09-23-2025 04:16 AM - edited 09-23-2025 04:24 AM
Hi @seefoods ,
I'm assuming that you're currently testing your code, so hardcoding table_name was done purposefully.
I think there is a bug in your code. By default, saveAsTable throws an exception if the table already exists. You can change this behavior with a different mode (append, overwrite, etc.). That tells me something is wrong with the way you're setting the values below:
In my opinion, you're setting outputMode on the stream_writer object incorrectly. outputMode returns a new DataStreamWriter object, but you forgot to assign that new DataStreamWriter back to your stream_writer variable.
So, basically, the series of if statements above has no effect on the output mode, and the default mode is used instead (which is what causes the exception):
Try it the following way:
if self.write_mode.value.lower() == "append":
    stream_writer = (
        stream_writer.outputMode("append")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )
elif self.write_mode.value.lower() == "complete":
    stream_writer = (
        stream_writer.outputMode("complete")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )
elif self.write_mode.value.lower() == "update":
    stream_writer = (
        stream_writer.outputMode("update")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )

query = stream_writer.start()
query.awaitTermination()
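Since the three branches differ only in the mode string, they could probably be collapsed into one helper. This is just a rough sketch: the function name configure_stream_writer and the VALID_OUTPUT_MODES set are my own inventions, not part of your code or of PySpark, and I'm assuming stream_writer behaves like the object you get from df.writeStream. It still reassigns the chained result, same as the snippet above.

```python
# Hypothetical helper (not a PySpark API): the name and the validation set
# are illustrative assumptions.
VALID_OUTPUT_MODES = {"append", "complete", "update"}


def configure_stream_writer(stream_writer, write_mode, checkpoint_location):
    """Return stream_writer configured with the requested output mode.

    stream_writer is assumed to expose the fluent outputMode/option/trigger
    methods, as a PySpark DataStreamWriter does.
    """
    mode = write_mode.lower()
    if mode not in VALID_OUTPUT_MODES:
        # Fail loudly instead of silently falling back to the default mode.
        raise ValueError(f"Unsupported output mode: {write_mode!r}")

    # Return the result of the chained calls so the caller can reassign it.
    return (
        stream_writer.outputMode(mode)
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )
```

In your class it would be called roughly as stream_writer = configure_stream_writer(stream_writer, self.write_mode.value, self.checkpoint_location), followed by the same start() and awaitTermination() calls. An unknown mode then raises a ValueError right away rather than leaving the writer unconfigured.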