szymon_dybczak
Esteemed Contributor III

Hi @seefoods ,

I'm assuming that you're currently testing your code, so hardcoding table_name was done on purpose.

I think you have a bug in your code. By default, saveAsTable throws an exception if the data already exists. You can change this with a different mode (append, overwrite, etc.), so the error tells me that something is wrong with the way you're setting the values below:

[Screenshot: the series of if statements from the original code that set the output mode]


In my opinion, you're setting outputMode on the stream_writer object incorrectly. outputMode returns a DataStreamWriter object, but you forgot to assign the returned writer back to your stream_writer variable.
So the series of if statements above has no effect on the output mode, and the default mode is used, which causes the exception.

Try it the following way:

 

if self.write_mode.value.lower() == "append":
    stream_writer = (
        stream_writer.outputMode("append")
          .option("checkpointLocation", self.checkpoint_location)
          .option("mergeSchema", "true")
          .trigger(once=True)
    )
elif self.write_mode.value.lower() == "complete":
    stream_writer = (
        stream_writer.outputMode("complete")
          .option("checkpointLocation", self.checkpoint_location)
          .option("mergeSchema", "true")
          .trigger(once=True)
    )
elif self.write_mode.value.lower() == "update":
    stream_writer = (
        stream_writer.outputMode("update")
          .option("checkpointLocation", self.checkpoint_location)
          .option("mergeSchema", "true")
          .trigger(once=True)
    )

query = stream_writer.start()
query.awaitTermination()
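
Since the three branches above differ only in the mode string, they could also be collapsed into one helper. This is just a sketch, not a drop-in replacement: configure_stream_writer and VALID_OUTPUT_MODES are names I made up for illustration, and I'm assuming stream_writer is a DataStreamWriter (or anything with the same chainable outputMode/option/trigger methods). It also raises on an unknown mode instead of silently leaving the writer unconfigured.

```python
# Hypothetical helper: the names below are illustrative, not part of PySpark.
VALID_OUTPUT_MODES = {"append", "complete", "update"}


def configure_stream_writer(stream_writer, write_mode, checkpoint_location):
    """Return a writer with output mode, checkpoint, and trigger applied.

    `stream_writer` is assumed to be a pyspark DataStreamWriter, or any object
    exposing the same chainable outputMode/option/trigger methods.
    """
    mode = write_mode.lower()
    if mode not in VALID_OUTPUT_MODES:
        # Fail loudly rather than continuing with an unconfigured writer.
        raise ValueError(f"Unsupported output mode: {write_mode!r}")
    # Return the result of the chain so the caller can assign it back.
    return (
        stream_writer.outputMode(mode)
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)
    )
```

In your class it would be called roughly like this, again assuming the same attribute names as in the snippet above: stream_writer = configure_stream_writer(stream_writer, self.write_mode.value, self.checkpoint_location), followed by stream_writer.start() as before.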


