3 weeks ago
Hello Community!
I am writing to ask for feedback on an idea for a data ingestion job that we have to implement in our project.
Our data arrives as CSV files whose layout differs slightly from case to case. Before uploading, we pivot the CSV files into a unified schema. Currently we use GitHub Actions to copy the data to a volume, and once all files are copied we start the ingestion job. The same can be done by uploading the files and running the job manually.
The ingestion job is responsible for validation, data transformation (let's say normalization), and merging the data into the final table.
We would like to automate our pipeline as much as we can. Our first thought is to have the job run automatically as soon as new files are added to the volume. However, is there a way to know which files have been uploaded? From what I found in the documentation, it seems there is not. So I guess we have to create something like an audit table to track which files have already been uploaded, correct?
If you have any suggestions on how to approach this data ingestion in general, I would be really thankful!
Thank you very much!
3 weeks ago
Hi @maikel,
You don't have to build a custom solution for this. Databricks now has native components that align very well with what you want.
If you want the job to start as soon as new files land in a volume, the recommended approach is to use file-arrival triggers on a Unity Catalog volume or external location, and have that trigger start your ingestion job or Lakehouse pipeline. You point the trigger at something like /Volumes/<catalog>/<schema>/<volume>/incoming/, and Databricks will poll for new files (roughly once a minute) and fire the job when it sees new arrivals, without needing GitHub Actions to orchestrate that part anymore. See the docs for file-arrival triggers and using volumes for ingestion.
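As a rough sketch of what such a trigger can look like when a job is defined through the Jobs API, the helper below builds the `trigger` fragment of a job definition. The catalog, schema, volume, and job names are placeholders, and the two timing values are illustrative assumptions; it is worth checking the field names against the current Jobs API reference before relying on them.

```python
import json


def file_arrival_trigger(volume_path: str,
                         min_seconds_between_runs: int = 60,
                         wait_after_last_change: int = 60) -> dict:
    """Build the `trigger` fragment of a job definition (sketch only)."""
    if not volume_path.startswith("/Volumes/"):
        raise ValueError("expected a Unity Catalog volume path under /Volumes/")
    return {
        "pause_status": "UNPAUSED",
        "file_arrival": {
            # Directory that Databricks polls for new files.
            "url": volume_path,
            # Lower bound on how often the trigger may start a run.
            "min_time_between_triggers_seconds": min_seconds_between_runs,
            # Quiet period after the last change, so a multi-file upload
            # is picked up as one run instead of several.
            "wait_after_last_change_seconds": wait_after_last_change,
        },
    }


# Hypothetical names: catalog "main", schema "ingest", volume "landing".
trigger = file_arrival_trigger("/Volumes/main/ingest/landing/incoming/")
print(json.dumps({"name": "csv-ingestion", "trigger": trigger}, indent=2))
```

The quiet-period setting is the part most relevant to a GitHub Actions upload of several files, since it lets the copy finish before the job starts.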
For “how do I know which files have been uploaded/processed?”, the key is to lean on Auto Loader rather than rolling your own state tracking. When you read from the volume with spark.readStream.format("cloudFiles") (for example, with cloudFiles.format = "csv"), Auto Loader persists file metadata in its checkpoint and uses that to guarantee that each file is processed exactly once and that the stream can resume safely after failures. You don’t need a separate audit table just to avoid reprocessing the same file. See What is Auto Loader? and the “How does Auto Loader track ingestion progress?” section there.
If you want human-readable observability ("which files, when, and in which batch?"), then yes, it’s common to add an ingestion log table on top, either by querying Auto Loader’s cloud_files_state metadata (which stores per-file state including commit_time) or by logging the path column from your stream in a small foreachBatch into a Delta table. That gives you a clean audit trail without owning the low-level dedup logic yourself. The heavy lifting still comes from Auto Loader’s internal state. The relevant options and the cloud_files_state TVF are documented under Auto Loader options.
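A minimal sketch of the first option, assuming the checkpoint path of the Auto Loader stream is known: the helper builds a query against the cloud_files_state table-valued function and a second function runs it on an existing Spark session. The function names and the example checkpoint path are hypothetical, and only the `path` and `commit_time` columns are selected here.

```python
def cloud_files_state_query(checkpoint_path: str) -> str:
    """Return SQL listing the files recorded in an Auto Loader checkpoint."""
    if "'" in checkpoint_path:
        # Keep the sketch simple: refuse paths that would break the literal.
        raise ValueError("checkpoint path must not contain single quotes")
    return (
        "SELECT path, commit_time "
        f"FROM cloud_files_state('{checkpoint_path}') "
        "ORDER BY commit_time"
    )


def list_ingested_files(spark, checkpoint_path: str):
    """Run the query on a Databricks Spark session and return a DataFrame."""
    return spark.sql(cloud_files_state_query(checkpoint_path))


# Hypothetical checkpoint location inside a volume.
print(cloud_files_state_query("/Volumes/main/ingest/landing/_checkpoints/bronze"))
```

In a notebook, `list_ingested_files(spark, checkpoint_path)` could be displayed directly or appended to an audit Delta table on a schedule.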
A robust pattern for your scenario is: land CSVs (from GitHub Actions or manual upload) into a Unity Catalog volume, trigger a job on file arrival, use Auto Loader from that volume into a bronze table, then do your validation/normalisation and any pivoting into silver, and finally MERGE into the final table. This keeps uploads simple, makes ingestion incremental and mostly self-driven, and still lets you add an explicit audit table if you want extra transparency for which files were processed when.
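For the last step of that pattern, one possible shape of the MERGE is sketched below. It is an illustration only: the table names and the key columns are invented for the example, and the upsert-everything semantics (`UPDATE SET *` / `INSERT *`) assume that the silver and final tables share the same columns.

```python
def build_merge_sql(target: str, source: str, key_columns: list) -> str:
    """Build a Delta Lake MERGE statement that upserts source rows into target."""
    if not key_columns:
        raise ValueError("at least one key column is required")
    # Rows match when every key column is equal in target (t) and source (s).
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_columns)
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON {on_clause}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Hypothetical table and key names.
print(build_merge_sql("main.gold.measurements",
                      "main.silver.measurements",
                      ["device_id", "measured_at"]))
```

On Databricks the resulting string could be passed to `spark.sql(...)` as the final task of the job, after validation and normalisation have produced the silver table.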
By the way, are you exporting data to CSV from an upstream system and then uploading it to the volume for any specific reason (governance, network, tooling, etc.)? If you have direct access to the source system, you might also look at pulling data straight into Databricks using Lakeflow Connect instead of going via CSV. Lakeflow Connect provides managed connectors for common SaaS apps and databases, with incremental ingestion into streaming tables, which can remove a lot of custom file-handling logic. If you are interested in learning more, see What is Lakeflow Connect? and Managed connectors in Lakeflow Connect.
If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***
9 hours ago - last edited 8 hours ago
Hello @Ashwin_DSA,
Thank you very much for this! Sorry for the delayed response, but I was on vacation for quite a long time. Auto Loader seems to be a good direction, I believe. By the way, is there a way to run the job as soon as a file is uploaded? I assume what you have in mind is to put a file arrival trigger on the ingestion job and, inside this job, do:
from pyspark.sql.functions import col, current_timestamp

# schema_location, source_volume_path, checkpoint_path and target_table
# are assumed to be defined earlier in the job.
bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")  # csv, parquet, json, avro, etc.
    .option("cloudFiles.schemaLocation", schema_location)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(source_volume_path)
    .withColumn("ingest_ts", current_timestamp())
    # _metadata.file_path is used instead of input_file_name(),
    # which is deprecated on recent Databricks runtimes
    .withColumn("source_file", col("_metadata.file_path"))
)

# Write stream to bronze table with Trigger.AvailableNow
query = (
    bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)  # process all new files, then stop
    .toTable(target_table)
)

What if we would like to have a job that runs immediately after a file is uploaded (without the 60-second wait)? I assume the only approach is to keep the job running constantly and use .trigger(processingTime="30 seconds") in the Python code to process changes every 30 seconds, correct?
7 hours ago
Hi @maikel,
Not exactly. If you're using a Databricks file arrival trigger, it doesn't fire instantly when a file is uploaded. It makes a best-effort check roughly every minute, so it's better to think of it as near-real-time rather than immediate execution. In that setup, the usual pattern is to let the file arrival trigger start the job, and then use Auto Loader inside the job with trigger(availableNow=True) so it processes everything that has arrived since the last run and then exits cleanly.
If you need lower latency than that, then yes, you're generally moving away from a file-arrival-triggered batch pattern and into a long-running streaming workload. That said, I wouldn't position trigger(processingTime="30 seconds") as the only option, or even the default recommendation. Databricks recommends file arrival triggers for event-driven pipelines, and if you do use time-based streaming triggers, the guidance is to start at around 1 minute or higher. For very latency-sensitive use cases, Databricks also suggests considering the classic file notification mode, since managed file events add an extra caching hop that can increase latency slightly.
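The two modes described above differ only in the trigger passed to the stream writer, which the following sketch tries to make explicit. The function names are hypothetical, the 1-minute default simply mirrors the guidance mentioned above, and `bronze_df` is assumed to be an Auto Loader streaming DataFrame created elsewhere in the job.

```python
def trigger_options(long_running: bool, interval: str = "1 minute") -> dict:
    """Pick keyword arguments for DataStreamWriter.trigger()."""
    if long_running:
        # Always-on stream: poll for new files on a fixed interval.
        return {"processingTime": interval}
    # Job started by a file arrival trigger: drain the backlog, then stop.
    return {"availableNow": True}


def start_bronze_write(bronze_df, checkpoint_path: str, target_table: str,
                       long_running: bool = False):
    """Start the bronze write in either mode and return the StreamingQuery."""
    return (
        bronze_df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .trigger(**trigger_options(long_running))
        .toTable(target_table)
    )


print(trigger_options(long_running=False))
print(trigger_options(long_running=True))
```

Because the checkpoint is the same in both modes, switching between them later should not cause files to be reprocessed.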
Hope this helps.
Ashwin | Delivery Solution Architect @ Databricks
6 hours ago
Yeah, understood. Thank you very much once again!