
Pipeline workflow doubt

apiury
New Contributor III

Hi! I have a problem. I'm using Auto Loader to ingest data from raw into a Delta Lake, but when my pipeline starts, I want to apply it only to the new data. Auto Loader ingests the data into the Delta Lake, but how can I distinguish the new data from the old?


9 REPLIES

etsyal1e2r3
Honored Contributor

You can add a column to the newly ingested data and set it to the date/timestamp of the run using the selectExpr() function on the Auto Loader stream. It'd look something like this...

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # The schema location directory keeps track of your data schema over time
    .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
    .load("<source-data-with-nested-json>")
    .selectExpr(
        "*",
        "current_timestamp() as `Date_Pulled`",
    )
)
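
A streaming read on its own doesn't run anything until you attach a sink. A minimal sketch of writing it into a bronze table might look like the following (bronze_events is just a placeholder name I'm assuming, and trigger(availableNow=True) needs a recent runtime; trigger(once=True) is the older equivalent):

# Sketch only: write the stream above into an assumed bronze Delta table.
# The checkpoint location is what lets Auto Loader remember which files it
# has already ingested, so reruns only pick up new files.
(df.writeStream
   .option("checkpointLocation", "<path-to-checkpoint>")
   .trigger(availableNow=True)  # process whatever is pending, then stop
   .toTable("bronze_events"))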

apiury
New Contributor III

But why add a date column? I thought Auto Loader keeps track of new files. My problem is: how can I process only the new files? My data is binary and I have to apply a transformation, but I don't want to apply it to all of the data.

etsyal1e2r3
Honored Contributor

Auto Loader keeps track of files, yes, so that it only reads them once to prevent duplicates. If you do a count before and after each Auto Loader run, you'll see that it only adds new data. Now, do you have a timestamp column? I'm not sure what your logic looks like in the pipeline, but if you have a timestamp or Date_Pulled column, you can filter the pipeline query to grab only the data that doesn't exist yet in the next table, by checking against the last timestamp/Date_Pulled value already in that table. But if you just grab all the data into a dataframe, you can do an upsert to the next table, which will update existing records (if you want) and insert new ones. I can only speculate what your logic looks like without more info 🙂
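
To make the filter idea concrete, a rough sketch assuming bronze_events/silver_events table names (placeholders) and the Date_Pulled column from the earlier snippet:

# Grab only the bronze rows that landed after the newest row already in silver.
last_pulled = spark.sql(
    "SELECT max(Date_Pulled) AS last_ts FROM silver_events"
).collect()[0]["last_ts"]

bronze_df = spark.read.table("bronze_events")
new_rows = (
    bronze_df.filter(bronze_df.Date_Pulled > last_pulled)
    if last_pulled is not None
    else bronze_df  # first run: silver is empty, so take everything
)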

apiury
New Contributor III

Isn't checking for the data that doesn't exist yet in the next table and applying the transformation to it the same as using Auto Loader again after the first ingest? For example, I have binary data (pcap file format) in the bronze layer. I want to transform the pcap into CSV format and ingest it into the silver layer, but I don't want to process the whole dataset each time, only the new files that arrive.

etsyal1e2r3
Honored Contributor

Yeah, in that case you either have to do an upsert keyed on a generated checksum over all the data, or only grab data after a certain datetime.
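
A hedged sketch of the checksum variant, assuming a silver_events Delta table whose schema matches the staged data and some illustrative payload columns (source_file, payload); sha2 over the concatenated columns serves as the MERGE key:

from pyspark.sql import functions as F

# Stage the bronze rows with a derived checksum column (names are illustrative),
# then upsert into silver keyed on that checksum.
staged = spark.read.table("bronze_events").withColumn(
    "row_checksum",
    F.sha2(F.concat_ws("||", "source_file", "payload"), 256),
)
staged.createOrReplaceTempView("staged_updates")

spark.sql("""
    MERGE INTO silver_events AS t
    USING staged_updates AS s
      ON t.row_checksum = s.row_checksum
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")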

apiury
New Contributor III

Okay. Then, after ingesting data with Auto Loader, I check for the new data using, for example, a date-pulled column. My last doubt is: why use Auto Loader only for ingestion into the bronze layer, and not also for ingesting bronze data into silver?

etsyal1e2r3
Honored Contributor

I've had issues trying to ingest with Auto Loader as a single batch process into a dataframe. It's mainly for writing directly to a table or for streaming. I've concluded the best way is to autoload into bronze, then do a spark.read into a dataframe to transform, and then write/upsert to tables with spark.sql.

etsyal1e2r3
Honored Contributor

So in your case you would pull the data in as pcap, then pull from that table to write to CSV... I'm not sure how well pcap maps to a table because I've never looked into it. But as long as you can write the data to a table, you can save it as CSV or just export the data as CSV, depending on your requirements.
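
If the requirement is literally CSV files rather than a Delta table, the export is a one-liner; a minimal sketch, with the table name and output path as placeholders:

# Export the (assumed) silver table as CSV files; the output path is a placeholder.
(spark.read.table("silver_events")
    .write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
    .save("<output-path-for-csv>"))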

Anonymous
Not applicable

Hi @Alejandro Piury Pinzón,

We haven't heard from you since the last response from @Tyler Retzlaff, and I was checking back to see if the suggestions helped you.

If you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click on the "Select As Best" button whenever the information provided helps resolve your question.
