autoloader running task batch
07-02-2025 02:51 AM
07-03-2025 05:56 AM
Hi @seefoods ,
We usually use Trigger.AvailableNow when files arrive in batches rather than continuously. If your script keeps running even after processing, it could be that the job is still checking for any remaining files.
If it seems to hang for too long, try adding .awaitTermination() to the query.
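To make the suggestion above concrete, here is a minimal sketch of waiting on an availableNow query with a timeout instead of blocking indefinitely. The helper name `wait_for_query` and its parameters are hypothetical. It only assumes that `query` behaves like a PySpark StreamingQuery, meaning it has `awaitTermination(timeout)`, `status`, `lastProgress` and `stop()`.

```python
def wait_for_query(query, timeout_s=600, stop_on_timeout=False):
    """Wait for a streaming query to finish, up to timeout_s seconds.

    Assumption: `query` is the object returned by writeStream.start() or
    writeStream.toTable(), e.g. query = stream_writer.toTable("bronze").
    Returns True if the query terminated within the timeout, else False.
    """
    # With a timeout, awaitTermination returns a bool instead of blocking forever.
    finished = query.awaitTermination(timeout_s)
    if finished:
        return True

    # Still running: print what the query reports so the wait can be diagnosed.
    print("query still active, status:", query.status)
    print("last progress:", query.lastProgress)

    if stop_on_timeout:
        # Stopping is safe to retry later: progress is kept in the checkpoint.
        query.stop()
    return False
```

If the call returns False, the printed status and last progress usually give a hint as to whether the query is still listing files or actually processing data.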
07-03-2025 06:08 AM
When I use .awaitTermination(), it loops for a very long time.
07-03-2025 06:12 AM
This is my script. I enable these options when I read files from Volumes, before writing to the Delta table:
(reader_stream.option("cloudFiles.format", self.file_format)
    .option("cloudFiles.schemaLocation", self.schema_location)
    .option("cloudFiles.useNotifications", True)
    .option("cloudFiles.validateOptions", True)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("cloudFiles.maxFilesPerTrigger", 1000))
if self.autoloader_config.use_autoloader:
    logger_file_ingestion.info("starting write in streaming mode")
    if self.write_mode.value.lower() == "append":
        logger_file_ingestion.info("writing in %s mode", self.write_mode.value)
        # Build the base stream configuration
        stream_writer = (df.writeStream
                         .format("delta")
                         .outputMode("append")
                         .option("checkpointLocation", self.checkpoint_location)
                         .option("mergeSchema", "true")
                         .trigger(availableNow=True))
        # Add partitioning if needed
        if (self.source_name.lower() == "name") and (self.file_format.lower() == "parquet"):
            stream_writer = stream_writer.partitionBy("year", "day", "month")
        elif (self.source_name.lower() == "test") and (self.file_format.lower() == "parquet"):
            stream_writer = stream_writer.partitionBy("day", "month", "year")
        # Start the stream and capture the reference
        query = stream_writer.toTable("bronze")
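The two partitioning branches in the script differ only in column order, so one possible way to keep them manageable is a lookup table. This is a sketch only: `PARTITION_COLUMNS` and `partition_columns` are hypothetical names, and the entries simply mirror the two cases in the script.

```python
# Hypothetical lookup table: (source name, file format) -> partition columns.
# The two entries are copied from the if/elif branches in the script.
PARTITION_COLUMNS = {
    ("name", "parquet"): ["year", "day", "month"],
    ("test", "parquet"): ["day", "month", "year"],
}


def partition_columns(source_name, file_format):
    """Return the partition columns for a source, or [] if none apply."""
    key = (source_name.lower(), file_format.lower())
    return PARTITION_COLUMNS.get(key, [])
```

In the script this could be used as `cols = partition_columns(self.source_name, self.file_format)`, followed by `stream_writer = stream_writer.partitionBy(*cols)` only when `cols` is non-empty.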