This is my script. I enable these options when I read files from Volumes, before writing to the Delta table:
# Auto Loader ("cloudFiles") settings for the streaming reader, listed in the
# order they are applied. Each entry is forwarded unchanged to `.option`.
autoloader_options = {
    "cloudFiles.format": self.file_format,
    "cloudFiles.schemaLocation": self.schema_location,
    "cloudFiles.useNotifications": True,
    "cloudFiles.validateOptions": True,
    "cloudFiles.schemaEvolutionMode": "rescue",
    "cloudFiles.maxFilesPerTrigger": 1000,
}

# Chain the calls through the object returned by each `.option`, exactly as
# a fluent `reader.option(...).option(...)` expression would.
configured_reader = reader_stream
for option_name, option_value in autoloader_options.items():
    configured_reader = configured_reader.option(option_name, option_value)
# Streaming write path: taken only when Auto Loader ingestion is enabled.
if self.autoloader_config.use_autoloader:
    logger_file_ingestion.info("debut d'ecriture en mode streaming")
    if self.write_mode.value.lower() == "append":
        logger_file_ingestion.info("ecriture en mode %s", self.write_mode.value)

        # Build the base configuration of the stream writer: Delta sink,
        # append output mode, checkpointing, schema merging enabled, and an
        # availableNow trigger.
        stream_writer = (
            df.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", self.checkpoint_location)
            .option("mergeSchema", "true")
            .trigger(availableNow=True)
        )

        # Add partition columns where needed (parquet input only). The column
        # order differs between the two sources and is kept as written.
        if (self.source_name.lower() == "name"
                and self.file_format.lower() == "parquet"):
            stream_writer = stream_writer.partitionBy("year", "day", "month")
        elif (self.source_name.lower() == "test"
                and self.file_format.lower() == "parquet"):
            stream_writer = stream_writer.partitionBy("day", "month", "year")

        # Start the stream, writing into the "bronze" table.
        # NOTE(review): the object returned by toTable() is not kept here, so
        # nothing in this snippet waits for the run to finish or checks for
        # failure -- confirm whether the caller should capture it and call
        # awaitTermination().
        stream_writer.toTable("bronze")