Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Append using foreachBatch with Auto Loader

seefoods
Valued Contributor

Hello guys,

When I append, I get this error. Does anyone know how to fix it?

raise converted from None pyspark.errors.exceptions.captured.AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `s_test` because it already exists. Choose a different name, drop the existing object, add the IF NOT EXISTS clause to tolerate pre-existing objects, add the OR REPLACE clause to replace the existing materialized view, or add the OR REFRESH clause to refresh the existing streaming table. SQLSTATE: 42P07 SQLSTATE: 39000 SQLSTATE: XXKST



Cordially, 






2 ACCEPTED SOLUTIONS (both by szymon_dybczak, shown in the replies below)

10 REPLIES

ManojkMohan
Valued Contributor III

Root Cause:
The table or view s_test already exists in the catalog.

The code tries a CREATE TABLE without IF NOT EXISTS or without dropping the existing table first.

The underlying Spark or SQL engine enforces uniqueness of table names and raises an error if the same name is reused improperly.

This usually happens when CREATE TABLE commands are repeated in code or a pipeline without handling existence.

Solution:

Approach | Description | Code Example
Use IF NOT EXISTS | Create the table only if it does not already exist. | CREATE TABLE IF NOT EXISTS s_test (...)
Add OR REPLACE for views | For materialized views, replace the existing definition with the new one. | CREATE OR REPLACE VIEW s_test AS ...
Drop table before create | Explicitly drop the existing table first to ensure a clean slate. | DROP TABLE IF EXISTS s_test; CREATE TABLE s_test (...)
Use Spark write modes | Use DataFrameWriter mode("append") or mode("overwrite") depending on the use case. | df.write.mode("append").saveAsTable("s_test")
Use OR REFRESH for streaming tables | Refresh an existing streaming table instead of recreating it. | CREATE OR REFRESH STREAMING TABLE s_test (...)
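For example, a minimal PySpark sketch combining the first and fourth approaches (the s_test schema and sample data here are hypothetical, just to illustrate):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already defined in a Databricks notebook

# Create the table once; IF NOT EXISTS tolerates a pre-existing table.
spark.sql("""
    CREATE TABLE IF NOT EXISTS s_test (
        id BIGINT,
        payload STRING
    ) USING DELTA
""")

# Subsequent writes append instead of recreating the table,
# so TABLE_OR_VIEW_ALREADY_EXISTS is never raised.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "payload"])
df.write.format("delta").mode("append").saveAsTable("s_test")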

ManojkMohan
Valued Contributor III

Solution:

Can you try the following:


Change the logic around the creation mode:

Only use .mode("append") or .mode("overwrite") when the table exists.

Use .mode("ignore") or .mode("errorIfExists") (default) appropriately.

Avoid .mode("overwrite") in streaming foreachBatch to prevent dropping the table schema and causing errors.

Create the table explicitly before streaming starts:

Manually create the Delta table once outside the streaming foreachBatch logic.

Inside foreachBatch, only append data or perform merges on the existing table (see the sketch after this list).

Check and clean up the checkpoint location if needed:

Sometimes checkpoints get corrupted, causing streaming retries to attempt table creation again.
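A rough sketch of that pattern, assuming a hypothetical Auto Loader source, table name, and paths:

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

TARGET = "s_test"                        # hypothetical table name
CHECKPOINT = "/tmp/checkpoints/s_test"   # hypothetical checkpoint path

# 1. Create the Delta table once, outside the streaming foreachBatch logic.
spark.sql(f"CREATE TABLE IF NOT EXISTS {TARGET} (id BIGINT, payload STRING) USING DELTA")

# 2. Inside foreachBatch, only append to the existing table.
def append_batch(batch_df: DataFrame, batch_id: int):
    batch_df.write.format("delta").mode("append").saveAsTable(TARGET)

# 3. Hypothetical Auto Loader source stream.
stream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/tmp/schemas/s_test")
    .load("/tmp/landing/s_test")
)

query = (
    stream_df.writeStream
    .foreachBatch(append_batch)
    .option("checkpointLocation", CHECKPOINT)
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()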

seefoods
Valued Contributor

Hello guys,

Here is my source code:

 

def batch_writer(self, batch_df: DataFrame, batch_id: int):
    app_id: str = self.spark.sparkContext.applicationId
    writer = batch_df.write.format("delta")
    table_name: str = "test"

    if self.spark.catalog.tableExists(table_name):
        if self.write_mode.value.lower() == "append":
            writer = writer.mode("append").option("txnVersion", batch_id).option("txnAppId", app_id)
        elif self.write_mode.value.lower() == "overwrite":
            writer = writer.mode("overwrite").option("txnVersion", batch_id).option("txnAppId", app_id)
    else:
        writer = writer.mode("overwrite").option("txnVersion", batch_id).option("txnAppId", app_id)

    if self.partition_columns:
        writer = writer.partitionBy(*self.partition_columns)
    writer.saveAsTable(f"test")


def _write_streaming_to_delta(self, df: DataFrame, spark: SparkSession = None, *args, **kwargs):
    stream_writer = (df.writeStream.foreachBatch(self.batch_writer))
    if self.write_mode.value.lower() == "append":
        # Base stream configuration
        (stream_writer.outputMode("append")
         .option("checkpointLocation", self.checkpoint_location)
         .option("mergeSchema", "true").trigger(once=True))
    elif self.write_mode.value.lower() == "complete":
        # Base stream configuration
        (stream_writer.outputMode("complete")
         .option("checkpointLocation", self.checkpoint_location)
         .option("mergeSchema", "true")
         .trigger(once=True))
    elif self.write_mode.value.lower() == "update":
        (stream_writer.outputMode("update")
         .option("checkpointLocation", self.checkpoint_location)
         .option("mergeSchema", "true")
         .trigger(once=True))

    # Start the stream and capture the reference
    query = stream_writer.start()
    query.awaitTermination()





szymon_dybczak
Esteemed Contributor III

Hi @seefoods ,

I'm assuming that you're currently testing your code, so hardcoding table_name was done purposefully.

I guess you have a bug in your code. By default, saveAsTable will throw an exception if the table already exists. This can be changed using a different mode (append, overwrite, etc.). So that tells me that something is wrong with the way you're setting the values below.



In my opinion, you're setting outputMode on the stream_writer object incorrectly. outputMode returns a new DataStreamWriter object, but you forgot to assign that new DataStreamWriter back to your stream_writer variable.
So, basically, the series of if statements in your code has no effect on the output mode, and hence the default mode is used (which is causing the exception).

Try to do it in the following way:

 

if self.write_mode.value.lower() == "append":
    stream_writer = (
        stream_writer.outputMode("append")
          .option("checkpointLocation", self.checkpoint_location)
          .option("mergeSchema", "true")
          .trigger(once=True)
    )
elif self.write_mode.value.lower() == "complete":
    stream_writer = (
        stream_writer.outputMode("complete")
          .option("checkpointLocation", self.checkpoint_location)
          .option("mergeSchema", "true")
          .trigger(once=True)
    )
elif self.write_mode.value.lower() == "update":
    stream_writer = (
        stream_writer.outputMode("update")
          .option("checkpointLocation", self.checkpoint_location)
          .option("mergeSchema", "true")
          .trigger(once=True)
    )

query = stream_writer.start()
query.awaitTermination()
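The same fix can also be written more compactly, since the three branches differ only in the outputMode argument (a sketch, assuming write_mode.value only ever takes these three values):

mode = self.write_mode.value.lower()
if mode in ("append", "complete", "update"):
    stream_writer = (
        stream_writer.outputMode(mode)
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )

query = stream_writer.start()
query.awaitTermination()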



BS_THE_ANALYST
Esteemed Contributor

Hi @seefoods, you've got a line in your code:

table_name : str = "test"

Surely hardcoding that is going to cause some issues 😀.

All the best,

BS

Hi @BS_THE_ANALYST, it's just an example 😁 for sure!!

I've been caught out more times than I'd like to admit with the hardcoded tests causing issues 🤣

Glad your issue got resolved! Best of luck with the project. Would love to hear more about it once you've finished!

All the best,
BS

seefoods
Valued Contributor

Hello @szymon_dybczak , 

Thanks. I have a few questions about trigger options: what's the difference between Trigger.Once and Trigger.AvailableNow()?

szymon_dybczak
Esteemed Contributor III

Conceptually they're the same. So they will load all available data. But the implementation differs.
In the case of trigger.Once, Spark Structured Streaming will try to load all available data in a single micro-batch. As you can imagine, if there’s a very large amount of data, this can cause serious issues. That’s why this option is deprecated.
trigger.AvailableNow will also load all available data, but by using a series of micro-batches. In the end, the effect will be the same, but with AvailableNow you won’t risk crashing the cluster when trying to load a massive amount of data 🙂
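In PySpark the difference is just the trigger argument (a sketch with a hypothetical streaming DataFrame stream_df and checkpoint path):

# Deprecated: everything is processed in one, possibly huge, micro-batch.
query = (
    stream_df.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/s_test")  # hypothetical path
    .trigger(once=True)
    .toTable("s_test")
)

# Preferred: all available data is processed as a series of micro-batches.
query = (
    stream_df.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/s_test")
    .trigger(availableNow=True)
    .toTable("s_test")
)
query.awaitTermination()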

seefoods
Valued Contributor

Thanks a lot @szymon_dybczak 😊
