maikel
Contributor II

Hello @Ashwin_DSA 

Thank you very much for this! Sorry for the delayed response, but I was on vacation for quite a long time. Auto Loader seems like a good direction, I believe. By the way, is there a way to run the job as soon as a file is uploaded? I assume what you have in mind is to put a file arrival trigger on the ingestion job and, inside this job, do:

from pyspark.sql.functions import current_timestamp, input_file_name

bronze_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")              # csv, parquet, json, avro, etc.
        .option("cloudFiles.schemaLocation", schema_location)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(source_volume_path)
        .withColumn("ingest_ts", current_timestamp())
        .withColumn("source_file", input_file_name())
)

# Write stream to bronze table with Trigger.AvailableNow
query = (
    bronze_df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .outputMode("append")
        .trigger(availableNow=True)                       # process all new files, then stop
        .toTable(target_table)
)

What if we would like a job that runs immediately after a file is uploaded (without the 60-second wait)? I assume the only approach is to keep this job running continuously and use .trigger(processingTime="30 seconds") in the Python code to process changes every 30 seconds, correct?
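To make the second option concrete, here is a rough sketch of what I mean by a constantly running job. The helper name start_continuous_bronze_stream and its interval parameter are placeholders I made up for illustration; the sketch assumes bronze_df is the Auto Loader stream from the snippet above and that checkpoint_path and target_table are the same values as before.

```python
def start_continuous_bronze_stream(bronze_df, checkpoint_path, target_table,
                                   interval="30 seconds"):
    """Start a long-running write that polls for new files every `interval`.

    Hypothetical helper: same writer as the availableNow version, except the
    trigger keeps the stream alive instead of stopping after one pass.
    """
    return (
        bronze_df.writeStream
            .format("delta")
            .option("checkpointLocation", checkpoint_path)
            .option("mergeSchema", "true")
            .outputMode("append")
            .trigger(processingTime=interval)   # micro-batch every `interval`, never stops on its own
            .toTable(target_table)
    )
```

I would then call query = start_continuous_bronze_stream(bronze_df, checkpoint_path, target_table) followed by query.awaitTermination(), so the job task stays alive. As far as I understand, the trade-off is that the cluster has to stay up the whole time, whereas the availableNow version only runs when it is triggered.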