Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

autoloader running task batch

seefoods
Contributor II

Hello guys,
I run a task in batch mode with Auto Loader and enable the trigger option (availableNow=True). When my script finishes, the job continues running. Does anyone know why this happens?

Regards,

3 REPLIES

SP_6721
Contributor III

Hi @seefoods ,

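We usually use Trigger.AvailableNow when files arrive in batches rather than continuously. With this trigger, the query processes everything available at start time (possibly over several micro-batches) and then stops on its own. If your script keeps running after the write call, the query may still be working through the remaining files.

If it seems to hang for too long, try calling .awaitTermination() on the streaming query so the script blocks until the query has actually finished.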
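A minimal sketch of that suggestion with an upper bound on the wait, assuming `query` is the StreamingQuery returned when the stream is started. The helper name `wait_for_available_now` and the `timeout_s` / `poll_s` values are illustrative, not part of Spark; it only relies on `awaitTermination(timeout)`, `status`, and `stop()`.

```python
import logging

logger = logging.getLogger("file_ingestion")


def wait_for_available_now(query, timeout_s=3600, poll_s=60):
    """Wait for a streaming query to finish; stop it if timeout_s is exceeded.

    Returns True if the query terminated on its own, False if it was stopped.
    """
    waited = 0
    while waited < timeout_s:
        # awaitTermination(timeout) returns True once the query has terminated,
        # and False if it is still running when the timeout expires.
        if query.awaitTermination(poll_s):
            return True
        waited += poll_s
        logger.info("query still running after %s s, status=%s", waited, query.status)

    # Assumption: stopping is acceptable here because the checkpoint lets the
    # next run resume from where this one left off.
    logger.warning("query did not finish within %s s, stopping it", timeout_s)
    query.stop()
    return False
```

Polling in short intervals instead of a single blocking call makes it possible to log the query status while waiting, which helps tell a slow backlog apart from a stream that is genuinely stuck.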

seefoods
Contributor II

When I use .awaitTermination(), it keeps looping for a very long time.

seefoods
Contributor II

This is my script. I enable these options when I read files from Volumes, before writing to the Delta table.

(reader_stream
    .option("cloudFiles.format", self.file_format)
    .option("cloudFiles.schemaLocation", self.schema_location)
    .option("cloudFiles.useNotifications", True)
    .option("cloudFiles.validateOptions", True)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("cloudFiles.maxFilesPerTrigger", 1000))





if self.autoloader_config.use_autoloader:
    logger_file_ingestion.info("starting write in streaming mode")

    if self.write_mode.value.lower() == "append":
        logger_file_ingestion.info("writing in %s mode", self.write_mode.value)

        # Build the base stream configuration
        stream_writer = (df.writeStream
                         .format("delta")
                         .outputMode("append")
                         .option("checkpointLocation", self.checkpoint_location)
                         .option("mergeSchema", "true")
                         .trigger(availableNow=True))

        # Add partitioning where needed
        if (self.source_name.lower() == "name") and (self.file_format.lower() == "parquet"):
            stream_writer = stream_writer.partitionBy("year", "day", "month")
        elif (self.source_name.lower() == "test") and (self.file_format.lower() == "parquet"):
            stream_writer = stream_writer.partitionBy("day", "month", "year")

        # Start the stream and capture the query reference
        query = stream_writer.toTable("bronze")
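
One possible reason the job stays alive after the script returns is that a streaming query is still active on the session, since starting the stream with toTable() does not block. A minimal sketch for checking this, assuming `spark` is the active SparkSession; `stop_lingering_streams` is a hypothetical helper name, and it only uses `spark.streams.active` and each query's `id`, `name`, and `stop()`.

```python
import logging

logger = logging.getLogger("file_ingestion")


def stop_lingering_streams(spark):
    """Stop every streaming query still active on the session.

    Returns the ids of the queries that were stopped, as strings.
    """
    stopped = []
    for q in spark.streams.active:
        # name is None unless queryName(...) was set on the writer
        logger.info("stopping active stream id=%s name=%s", q.id, q.name)
        q.stop()
        stopped.append(str(q.id))
    return stopped
```

If this returns a non-empty list at the end of the task, the stream had not finished yet, so the backlog size or the file discovery mode is worth looking at before stopping streams unconditionally.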