09-04-2024 12:28 PM
Hey guys,
I've been looking for docs on how Auto Loader handles a source outage. I'm currently running the following code:
from pyspark.sql.functions import col, current_timestamp

dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("myS3Source")
    .withColumn("file_path", col("_metadata.file_path"))
    .withColumn("ingestion_time", current_timestamp())
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    # I want to change this to .trigger(processingTime="1 second")
    .trigger(availableNow=True)
    .start(bronze_table)
)
My question is: if I run this code, will it stay attached to the cluster and wait indefinitely for file arrivals, even if the streaming source has an outage?
Does the last screenshot mean that it will not run again unless I trigger it?
If I stop/detach the Auto Loader, will it sync all the files that arrived during the "Auto Loader outage" once it runs again?
I know the last question is technically answered, but I just want to make sure I'm understanding correctly.
Thanks for the help
Labels:
- Delta Lake
- Spark
- Workflows
Accepted Solutions
09-05-2024 11:22 AM
Hi @sakuraDev,
1. The availableNow trigger processes all the data that is available when the query starts and then stops the query. As you noticed, your data was processed once, and you now need to trigger the process again to pick up new files.
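For reference, here is a minimal sketch of that pattern, reusing the names from your snippet (json_schema_bronze, checkpoint_dir_path_bronze, bronze_table): the query drains whatever files are currently in the source, terminates, and only picks up later files when it is started again (e.g., from a scheduled job).

query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("myS3Source")
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    .trigger(availableNow=True)  # process everything currently available
    .start(bronze_table))

query.awaitTermination()  # returns once the available backlog is drained
# Files that land after this point are NOT ingested until the query is
# started again, e.g. by a scheduled Databricks job.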
2. Changing the trigger to .trigger(processingTime="1 second") means the streaming query will attempt to process new files every second. If there are no new files due to a source outage, the query will not terminate; it will keep checking for new files at the specified interval (see the sketch after the note below).
Important consideration: the cluster would be running continuously, which means significantly higher costs compared to running the process with the availableNow trigger.
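As a sketch, the only change relative to your original snippet is the trigger line; everything else, including the checkpoint location, stays the same:

query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("myS3Source")
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    .trigger(processingTime="1 second")  # poll the source every second
    .start(bronze_table))
# The query never terminates on its own; it (and the cluster) keeps
# running until you call query.stop() or the cluster shuts down.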
3. Don't worry about the outage. The core of Auto Loader is its checkpointing mechanism. If you stop or detach the Auto Loader job and then restart it, it will resume processing from where it left off. The checkpointLocation option you've specified allows Auto Loader to keep track of which files have already been processed. When the job is restarted, it will process any new files that arrived during the outage, ensuring no data is missed.
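A minimal sketch of what that looks like across runs (start_bronze_stream is a hypothetical helper wrapping the code from your post, not a built-in API; the key point is that both runs use the same checkpointLocation):

# Run 1: ingests the files currently in myS3Source and records each
# processed file in the checkpoint at checkpoint_dir_path_bronze.
query = start_bronze_stream()  # hypothetical helper, not a built-in API
query.stop()                   # simulate stopping/detaching Auto Loader

# ...new files arrive in myS3Source while nothing is running...

# Run 2: same checkpointLocation, so Auto Loader only ingests the files
# not yet recorded in the checkpoint -- i.e. exactly the files that
# arrived during the "outage".
query = start_bronze_stream()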