def _write_streaming_to_delta(self, df: DataFrame, spark: SparkSession = None, *args, **kwargs):
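    """Write a streaming DataFrame to a Delta table via foreachBatch.

    Each micro-batch is handed to ``self.batch_writer``; the output mode is
    taken from ``self.write_mode``, checkpoints are kept under
    ``self.checkpoint_location``, and a one-time trigger processes the
    available data and then stops. The ``spark`` argument is not used here.
    """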
    # Route every micro-batch through the user-supplied batch writer.
    stream_writer = df.writeStream.foreachBatch(self.batch_writer)

    mode = self.write_mode.value.lower()
    if mode not in ("append", "complete", "update"):
        raise ValueError(f"Unsupported write mode: {self.write_mode.value}")

    # Base stream configuration: all modes share the same checkpoint,
    # schema-merge and trigger settings; only the output mode differs.
    # Note: with foreachBatch, Delta write options such as mergeSchema may
    # also need to be set inside batch_writer itself to take effect.
    stream_writer = (
        stream_writer
        .outputMode(mode)
        .option("checkpointLocation", self.checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(once=True)  # process the available data once, then stop
    )

    # Start the stream and wait for the triggered run to finish.
    query = stream_writer.start()
    query.awaitTermination()