6 hours ago - last edited 5 hours ago
Hello @Ashwin_DSA,
Thank you very much for this! Sorry for the delayed response, but I was on vacation for quite a long time. Auto Loader seems like a good direction. By the way, is there a way to run the job as soon as a file is uploaded? I assume what you have in mind is to put a file arrival trigger on the ingestion job and, inside that job, do:
from pyspark.sql.functions import current_timestamp, input_file_name

bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")  # csv, parquet, json, avro, etc.
    .option("cloudFiles.schemaLocation", schema_location)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(source_volume_path)
    .withColumn("ingest_ts", current_timestamp())
    .withColumn("source_file", input_file_name())
)
# Write stream to bronze table with Trigger.AvailableNow
query = (
    bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)  # process all new files, then stop
    .toTable(target_table)
)

What if we would like a job that runs immediately after a file is uploaded (without the ~60-second wait of the file arrival trigger)? I assume the only approach then is to have this job running constantly and, in the Python code, use .trigger(processingTime="30 seconds") so it processes changes every 30 seconds. Correct?
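To make the trade-off between the two trigger modes concrete, here is a small pure-Python helper (a hypothetical illustration, not a Databricks API) that maps a latency budget to the keyword arguments you would pass to DataStreamWriter.trigger() in the snippet above:

```python
def choose_trigger(max_latency_seconds=None):
    """Pick writeStream trigger kwargs for a given latency budget.

    Hypothetical helper: the returned dicts mirror the keyword
    arguments accepted by Structured Streaming's
    DataStreamWriter.trigger().
    """
    if max_latency_seconds is None:
        # Batch-style run (e.g. started by a file arrival trigger):
        # drain all pending files, then let the query stop.
        return {"availableNow": True}
    # Always-on query: keep the cluster running and poll on an interval.
    return {"processingTime": f"{max_latency_seconds} seconds"}

# File-arrival-triggered ingestion job that just drains the backlog:
print(choose_trigger())    # {'availableNow': True}
# Continuously running job micro-batching every 30 seconds:
print(choose_trigger(30))  # {'processingTime': '30 seconds'}
```

The design point this encodes: availableNow=True fits jobs that are started externally (cheap, no idle cluster), while processingTime fits a long-running query where you pay for an always-on cluster in exchange for bounded latency.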