Monday
Hello guys,
When I append, I get this error. Does anyone know how to fix it?
raise converted from None pyspark.errors.exceptions.captured.AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `s_test` because it already exists. Choose a different name, drop the existing object, add the IF NOT EXISTS clause to tolerate pre-existing objects, add the OR REPLACE clause to replace the existing materialized view, or add the OR REFRESH clause to refresh the existing streaming table. SQLSTATE: 42P07 SQLSTATE: 39000 SQLSTATE: XXKST
Cordially,
Monday
Root Cause:
- The table or view s_test already exists in the catalog.
- The code runs a CREATE TABLE without IF NOT EXISTS or without dropping the existing table first.
- The underlying Spark or SQL engine enforces uniqueness of table names and raises an error if the same name is reused improperly.
This usually happens when CREATE TABLE commands are repeated in code or in a pipeline without handling the table's existence.

Solution:
| Approach | Description | Code Example |
| --- | --- | --- |
| Use IF NOT EXISTS | Avoid the error by creating the table only if it does not already exist. | CREATE TABLE IF NOT EXISTS s_test (...) |
| Add OR REPLACE for views | For views, replace the existing definition with the new one. | CREATE OR REPLACE VIEW s_test AS ... |
| Drop table before create | Explicitly drop the existing table before creation to ensure a clean slate. | DROP TABLE IF EXISTS s_test; CREATE TABLE s_test (...) |
| Use Spark write modes | Use DataFrameWriter with mode("append") or mode("overwrite") depending on the use case. | df.write.mode("append").saveAsTable("s_test") |
| Use OR REFRESH for streaming tables | For a streaming table, use the OR REFRESH clause (where supported) to refresh it. | CREATE OR REFRESH STREAMING TABLE s_test (...) |
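A minimal end-to-end sketch of the write-mode approach (the schema and sample rows here are illustrative, not from the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Create the table on the first run only; later runs tolerate its existence
spark.sql("CREATE TABLE IF NOT EXISTS s_test (id INT, value STRING)")

# Append instead of the default errorIfExists mode, which raises
# TABLE_OR_VIEW_ALREADY_EXISTS when the table is already there
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.write.mode("append").saveAsTable("s_test")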
Monday
Solution:
Can you try the below?
1. Change the logic around the creation mode:
   - Only use .mode("append") or .mode("overwrite") when the table exists.
   - Use .mode("ignore") or .mode("errorIfExists") (the default) as appropriate.
   - Avoid .mode("overwrite") in streaming foreachBatch, to prevent dropping the table schema and causing errors.
2. Create the table explicitly before streaming starts (see the sketch after this list):
   - Manually create the Delta table once, outside the streaming foreachBatch logic.
   - Inside foreachBatch, only append data or perform merges on the existing table.
3. Check and clean up the checkpoint location if needed:
   - Checkpoints sometimes get corrupted, causing streaming retries to attempt table creation again.
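A minimal sketch of that pattern, assuming a Delta target named s_test, an illustrative schema and checkpoint path, and source_df standing in for your streaming DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1. Create the Delta table once, outside the streaming logic (illustrative schema)
spark.sql("CREATE TABLE IF NOT EXISTS s_test (id INT, value STRING)")

# 2. Inside foreachBatch, only append (or MERGE) into the existing table
def append_batch(batch_df, batch_id):
    batch_df.write.mode("append").saveAsTable("s_test")

query = (
    source_df.writeStream                                     # source_df: your streaming DataFrame
    .foreachBatch(append_batch)
    .option("checkpointLocation", "/tmp/checkpoints/s_test")  # illustrative path
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()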
Monday
Hello guys,
this is my source code
yesterday - last edited yesterday
Hi @seefoods,
I'm assuming that you're currently testing your code, so hardcoding table_name was done purposefully.
I guess you have a bug in your code. By default, saveAsTable will throw an exception if the table already exists. This can be changed using a different mode (append, overwrite, etc.), so that tells me something is wrong with the way you're setting the values below.
In my opinion, you're setting outputMode on the stream_writer object in an incorrect way. outputMode returns a new DataStreamWriter object, but you forgot to assign this new DataStreamWriter back to your stream_writer variable.
So basically the above series of if statements has no effect on the output mode, and hence the default mode is used (which is what causes the exception).
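The broken pattern presumably looks something like this (a reconstruction from the description above, not your exact code):

# Reconstructed buggy pattern: the configured writer is never assigned back
if self.write_mode.value.lower() == "append":
    stream_writer.outputMode("append")    # returned DataStreamWriter is discarded
elif self.write_mode.value.lower() == "complete":
    stream_writer.outputMode("complete")  # same issue in every branch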
Try to do it in the following way:
if self.write_mode.value.lower() == "append":
    stream_writer = (
        stream_writer.outputMode("append")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )
elif self.write_mode.value.lower() == "complete":
    stream_writer = (
        stream_writer.outputMode("complete")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )
elif self.write_mode.value.lower() == "update":
    stream_writer = (
        stream_writer.outputMode("update")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )

query = stream_writer.start()
query.awaitTermination()
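Side note: since the three branches differ only in the string passed to outputMode, they can be collapsed into one block with identical behavior:

# Equivalent, shorter form: only the outputMode argument differs between branches
mode = self.write_mode.value.lower()
if mode in ("append", "complete", "update"):
    stream_writer = (
        stream_writer.outputMode(mode)  # reassign so the configuration takes effect
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )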
Monday
Hi @seefoods, you've got a line in your code:
table_name : str = "test"
Surely hard coding that in is going to cause some issues 😀.
All the best,
BS
yesterday
Hi @BS_THE_ANALYST, it's just an example 😁 for sure!!
yesterday
I've been caught out more times than I'd like to admit with the hardcoded tests causing issues 🤣.
Glad your issue got resolved! Best of luck with the project. Would love to hear more about it once you've finished!
All the best,
BS
yesterday
Hello @szymon_dybczak ,
Thanks. I have a few questions about trigger options: what's the difference between trigger.Once and trigger.AvailableNow()?
yesterday
Conceptually they're the same: both will load all available data. But the implementation differs.
In the case of trigger.Once, Spark Structured Streaming will try to load all available data in a single micro-batch. As you can imagine, if there’s a very large amount of data, this can cause serious issues. That’s why this option is deprecated.
trigger.AvailableNow will also load all available data, but by using a series of micro-batches. In the end, the effect will be the same, but with AvailableNow you won’t risk crashing the cluster when trying to load a massive amount of data 🙂
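In PySpark these correspond to the following writer settings (a sketch; df stands for any streaming DataFrame, and availableNow requires Spark 3.3+):

# Trigger.Once: all available data in a single micro-batch (deprecated since Spark 3.3)
df.writeStream.trigger(once=True)

# Trigger.AvailableNow: all available data, split across multiple micro-batches
df.writeStream.trigger(availableNow=True)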
yesterday
Thanks a lot @szymon_dybczak 😊