seefoods
Valued Contributor

Hello everyone,
here is my source code:

 

def batch_writer(self, batch_df: DataFrame, batch_id: int):
    """Write one streaming micro-batch to the Delta table ``test``.

    Intended as a ``foreachBatch`` callback. Every write carries the
    ``txnVersion`` / ``txnAppId`` options (Delta's idempotent-write keys),
    whatever the save mode.

    Save mode chosen:
      * table does not exist yet        -> ``overwrite`` (creates it), for any
        ``self.write_mode``;
      * ``append``                      -> ``append``;
      * ``overwrite`` or ``complete``   -> ``overwrite`` (a ``complete``
        micro-batch contains the whole result, so it replaces the table).

    Args:
        batch_df: the micro-batch to persist.
        batch_id: micro-batch id from Structured Streaming, used as the Delta
            transaction version.

    Raises:
        ValueError: the table already exists and ``self.write_mode`` has no
            batch save-mode equivalent (e.g. ``update``).
    """
    table_name: str = "test"
    # write_mode value (lower-cased) -> DataFrameWriter save mode for an
    # existing table.
    save_modes = {"append": "append", "overwrite": "overwrite", "complete": "overwrite"}

    # NOTE(review): applicationId changes whenever the Spark application
    # restarts, so a micro-batch replayed after a restart gets a new txnAppId
    # and may not be detected as a duplicate. Delta's idempotent-write docs
    # recommend a stable, unique string here — confirm.
    app_id: str = self.spark.sparkContext.applicationId

    if not self.spark.catalog.tableExists(table_name):
        save_mode = "overwrite"
    else:
        mode = self.write_mode.value.lower()
        save_mode = save_modes.get(mode)
        if save_mode is None:
            # Without an explicit mode Spark falls back to "errorifexists" and
            # fails with an opaque "table already exists" error instead.
            raise ValueError(
                f"Unsupported write_mode {mode!r} for existing table {table_name!r}; "
                f"expected one of {sorted(save_modes)}"
            )

    writer = (
        batch_df.write.format("delta")
        .mode(save_mode)
        # (txnAppId, txnVersion) lets Delta skip a micro-batch it already committed.
        .option("txnVersion", batch_id)
        .option("txnAppId", app_id)
    )
    if self.partition_columns:
        writer = writer.partitionBy(*self.partition_columns)
    writer.saveAsTable(table_name)

 

 


 

def _write_streaming_to_delta(self, df: DataFrame, spark: SparkSession = None, *args, **kwargs):
   stream_writer = (df.writeStream.foreachBatch(self.batch_writer))
   if self.write_mode.value.lower() == "append":
       # Création de la configuration de base du stream
       (stream_writer.outputMode("append")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true").trigger(once=True))
   elif self.write_mode.value.lower() == "complete":
       # Création de la configuration de base du stream
       (stream_writer.outputMode("complete")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True))
   elif self.write_mode.value.lower() == "update":
       (stream_writer.outputMode("update")
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True))

   # Lancement du stream et capture de la référence
   query = stream_writer.start()
   query.awaitTermination()