Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Append using foreachBatch with Auto Loader

seefoods
Contributor III

Hello guys,

When I append, I get the following error. Does anyone know how to fix it?

raise converted from None pyspark.errors.exceptions.captured.AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `s_test` because it already exists. Choose a different name, drop the existing object, add the IF NOT EXISTS clause to tolerate pre-existing objects, add the OR REPLACE clause to replace the existing materialized view, or add the OR REFRESH clause to refresh the existing streaming table. SQLSTATE: 42P07 SQLSTATE: 39000 SQLSTATE: XXKST



Cordially, 






4 REPLIES

ManojkMohan
Valued Contributor III

Root Cause:

The table or view s_test already exists in the catalog.

The code attempts a CREATE TABLE without IF NOT EXISTS and without dropping the existing table first.

The underlying Spark/SQL engine enforces uniqueness of table names and raises this error when the same name is reused improperly. It usually happens when CREATE TABLE commands are repeated in code or in a pipeline without handling table existence.

Solution:

Approach | Description | Code Example
Use IF NOT EXISTS | Avoid the error by creating the table only if it does not already exist. | CREATE TABLE IF NOT EXISTS s_test (...)
Add OR REPLACE for Views | For materialized views, replace the existing object with the new definition. | CREATE OR REPLACE VIEW s_test AS ...
Drop Table Before Create | Explicitly drop the existing table before creation to ensure a clean slate. | DROP TABLE IF EXISTS s_test; CREATE TABLE s_test (...)
Use Spark Write Modes | Use DataFrameWriter mode("append") or mode("overwrite"), depending on the use case. | df.write.mode("append").saveAsTable("s_test")
Use OR REFRESH for Streaming | For a streaming table, use the OR REFRESH clause to refresh it. | CREATE OR REFRESH STREAMING TABLE s_test (...)
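For the foreachBatch case specifically, here is a minimal sketch of the "Use Spark Write Modes" row, assuming Spark 3.3+ (for catalog.tableExists and trigger(availableNow=True)) and assuming df is the Auto Loader streaming DataFrame; the table name, checkpoint path, and function name are illustrative, not from the original post:

from pyspark.sql import DataFrame

# Minimal sketch: create the table on the first micro-batch, append afterwards.
def write_batch(batch_df: DataFrame, batch_id: int):
    table_name = "s_test"  # illustrative target table
    if batch_df.sparkSession.catalog.tableExists(table_name):
        # Table already exists: append instead of re-creating it,
        # which is what raises TABLE_OR_VIEW_ALREADY_EXISTS.
        batch_df.write.format("delta").mode("append").saveAsTable(table_name)
    else:
        # First micro-batch: create the table once.
        batch_df.write.format("delta").saveAsTable(table_name)

(df.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/tmp/checkpoints/s_test")  # illustrative path
    .trigger(availableNow=True)  # Spark 3.3+; trigger(once=True) on older versions
    .start())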

ManojkMohan
Valued Contributor III

Solution:

Can you try the below?

1. Change the logic in creation mode:

Only use .mode("append") or .mode("overwrite") when the table exists.

Use .mode("ignore") or .mode("errorIfExists") (the default) appropriately.

Avoid .mode("overwrite") in streaming foreachBatch, to prevent dropping the table schema and causing errors.

2. Create the table explicitly before streaming starts (see the sketch after this list):

Manually create the Delta table once, outside the streaming foreachBatch logic.

Inside foreachBatch, only append data or perform merges on the existing table.

3. Check and clean up the checkpoint location if needed:

Sometimes checkpoints get corrupted, causing streaming retries to attempt table creation again.
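A minimal sketch of point 2, assuming the delta-spark Python API (DeltaTable.createIfNotExists) is available; the table name and txnAppId value below are illustrative:

from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession

TABLE_NAME = "s_test"  # illustrative target table

def ensure_table(spark: SparkSession, schema):
    # Create the Delta table once, before the stream starts,
    # so foreachBatch never has to run a CREATE itself.
    (DeltaTable.createIfNotExists(spark)
        .tableName(TABLE_NAME)
        .addColumns(schema)  # reuse the source schema
        .execute())

def append_batch(batch_df: DataFrame, batch_id: int):
    # Append-only inside foreachBatch: the table is guaranteed to exist.
    (batch_df.write.format("delta")
        .mode("append")
        .option("txnAppId", "s_test_stream")  # illustrative idempotence key
        .option("txnVersion", batch_id)
        .saveAsTable(TABLE_NAME))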

seefoods
Contributor III

Hello guys,
this is my source code:

 

def batch_writer(self, batch_df: DataFrame, batch_id: int):
    app_id: str = self.spark.sparkContext.applicationId
    writer = batch_df.write.format("delta")
    table_name: str = "test"

    if self.spark.catalog.tableExists(table_name):
        if self.write_mode.value.lower() == "append":
            writer = writer.mode("append").option("txnVersion", batch_id).option("txnAppId", app_id)
        elif self.write_mode.value.lower() == "overwrite":
            writer = writer.mode("overwrite").option("txnVersion", batch_id).option("txnAppId", app_id)
    else:
        writer = writer.mode("overwrite").option("txnVersion", batch_id).option("txnAppId", app_id)

    if self.partition_columns:
        writer = writer.partitionBy(*self.partition_columns)
    writer.saveAsTable(table_name)

 

 


 

def _write_streaming_to_delta(self, df: DataFrame, spark: SparkSession = None, *args, **kwargs):
    stream_writer = df.writeStream.foreachBatch(self.batch_writer)
    if self.write_mode.value.lower() == "append":
        # Build the base configuration of the stream
        (stream_writer.outputMode("append")
         .option("checkpointLocation", self.checkpoint_location)
         .option("mergeSchema", "true")
         .trigger(once=True))
    elif self.write_mode.value.lower() == "complete":
        # Build the base configuration of the stream
        (stream_writer.outputMode("complete")
         .option("checkpointLocation", self.checkpoint_location)
         .option("mergeSchema", "true")
         .trigger(once=True))
    elif self.write_mode.value.lower() == "update":
        (stream_writer.outputMode("update")
         .option("checkpointLocation", self.checkpoint_location)
         .option("mergeSchema", "true")
         .trigger(once=True))

    # Start the stream and capture the reference
    query = stream_writer.start()
    query.awaitTermination()
                 
 





BS_THE_ANALYST
Esteemed Contributor

Hi @seefoods, you've got a line in your code:

table_name: str = "test"

Surely hard-coding that is going to cause some issues 😀.
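One hedged way around that, assuming the class receives its configuration in __init__ (the class, attribute, and parameter names below are illustrative):

from pyspark.sql import DataFrame

class DeltaStreamWriter:
    # Illustrative sketch: inject the target table name instead of hard-coding it.
    def __init__(self, spark, table_name: str, partition_columns=None):
        self.spark = spark
        self.table_name = table_name  # e.g. "catalog.schema.s_test"
        self.partition_columns = partition_columns or []

    def batch_writer(self, batch_df: DataFrame, batch_id: int):
        writer = batch_df.write.format("delta").mode("append")
        if self.partition_columns:
            writer = writer.partitionBy(*self.partition_columns)
        # The target comes from configuration, not a literal in the method body.
        writer.saveAsTable(self.table_name)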

All the best,

BS
