Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Incremental Loads from a Catalog/DLT

lprevost
New Contributor III

The Databricks guide outlines several use cases for loading data with Delta Live Tables:

https://docs.databricks.com/en/delta-live-tables/load.html 

These include Auto Loader over cloud files, Kafka messages, small static datasets, etc.

But one use case it doesn't cover is incremental loading from a Hive catalog table whose partitions are being incrementally added by another process. This table points to partitioned raw CSV files, terabytes of incrementally growing data. The partitions are added as the source data arrive.

I'd like to create a DLT pipeline that handles this similarly to how Auto Loader would, using an availableNow trigger and a max-bytes setting to ingest it one batch at a time. So, instead of checkpointing filenames, which is how Auto Loader would handle this, I would prefer to checkpoint partitions and batch-load each new partition after initially loading them all.

I've considered just trying:

spark.readStream.table("my_big_catalog_table") with a maxBytesPerTrigger option and an availableNow trigger, to use pseudocode. But I'm unsure how the checkpointing would work.
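
Spelled out a little more concretely, the shape I have in mind is below. This is only a sketch: the checkpoint and target names are placeholders, and it assumes the catalog table were a Delta table, since streaming reads via .table() and the maxBytesPerTrigger option generally require Delta.

    # Sketch only: assumes "my_big_catalog_table" supports Delta streaming reads.
    # maxBytesPerTrigger caps how much data each micro-batch pulls in.
    df = (spark.readStream
          .option("maxBytesPerTrigger", "20g")
          .table("my_big_catalog_table"))

    (df.writeStream
       .option("checkpointLocation", "s3://my-bucket/_checkpoints/my_big_table")  # placeholder path
       .trigger(availableNow=True)
       .toTable("bronze.my_big_table"))  # placeholder target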

I realize I could use Auto Loader to read the source CSV files, but the problem in this case is that the source files are in a deeply nested directory structure, and the logic to read them via globbing patterns is complex and has already been solved by another process that updates the catalog table's partitions and source directory information.

5 REPLIES

raphaelblg
Contributor III

Hello @lprevost ,

By default, Auto Loader will discover and read files in the lexical order of their paths. It doesn't matter how nested your folder structure is.

If you're interested in loading all the .csv files, then a common cloudFiles (Auto Loader) read or readStream operation should suffice in your case.
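
For reference, a minimal sketch of what I mean is below (the paths, schema location, and target table name are placeholders you would replace with your own):

    # Auto Loader records the files it has already ingested in the checkpoint,
    # so new CSV files are picked up incrementally regardless of directory depth.
    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.schemaLocation", "s3://my-bucket/_schemas/raw_csv")  # placeholder
          .load("s3://my-bucket/raw/"))  # placeholder root path; nested folders are discovered

    (df.writeStream
       .option("checkpointLocation", "s3://my-bucket/_checkpoints/raw_csv")  # placeholder
       .trigger(availableNow=True)
       .toTable("bronze.raw_csv"))  # placeholder target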

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

lprevost
New Contributor III

Thank you for your reply. I have other reasons for not wanting to do this with Auto Loader, since upstream processes are already appending partitions to the catalog table.

Is there a way to do an incremental load from a catalog table with the partitions being the thing that is checkpointed?   
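
For concreteness, the kind of bookkeeping I'm imagining would look roughly like the sketch below (the bookkeeping table and column names are made up):

    # Hypothetical sketch: keep a small table of partitions already ingested and
    # anti-join it against the partitions currently registered in the catalog.
    seen = spark.table("bookkeeping.ingested_partitions")        # hypothetical table with a "partition" column
    current = spark.sql("SHOW PARTITIONS my_big_catalog_table")  # partitions registered by the upstream process
    new_parts = current.join(seen, on="partition", how="left_anti")  # partitions not yet loaded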

I'm also having some problems with Autoloader as follows:

- I have a large number of gzipped CSV files. Most are manageable in size, but a few are very large, and Spark gets hung up on them at the end of a group of tasks, presumably because of skew toward the larger files. Some files take an hour with only one task running and all other nodes idle. I'm not sure that doing this via the catalog instead of Auto Loader would solve it, but I'm scratching my head over how to.

I've read this is because gzip is unsplittable, so a file ties up one node until it has been read completely. The autoscaler has also dropped nodes that were almost finished reading, so they had to start over.

I have about 650 files ranging in size from 30MB to 1.2GB.  

lprevost
New Contributor III

A bit more on this. Looking at the period when my job was really dragging, I see a high percentage of CPU time going to "iowait", which I assume has to do with S3 and disk latency. See the yellow areas in the CPU utilization graph below. I am reading from a shared public S3 location into my own region and instance, so I am taking steps to isolate myself from public AWS traffic. It could be file size, or it could be AWS latency issues:

[CPU utilization graph: yellow areas show CPU time spent in iowait]

Hi @lprevost

I don't think there's a way to "checkpoint partitions" as you said. 

For the gzip files, your executors are probably running out of memory during decompression. One of the few solutions that doesn't require changing your source files would be to increase executor memory.

To enable parallel processing of gzip files, this library might be of interest, although given how it works I don't think it would address any memory issues: https://github.com/nielsbasjes/splittablegzip
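
If you want to try it, the wiring looks roughly like the sketch below. Treat it as a sketch based on that project's documentation: you attach the library to the cluster (e.g. its Maven coordinate nl.basjes.hadoop:splittablegzip) and point the reader at the splittable codec.

    # Sketch: with the splittablegzip library attached to the cluster, ask the CSV reader
    # to use the splittable codec so one large .csv.gz can be processed by multiple tasks.
    df = (spark.read
          .option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
          .csv("s3://my-bucket/raw/"))  # placeholder path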

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks

lprevost
New Contributor III

Thank you very much. I had read about that clever splittable codec elsewhere. I assume it can be 'installed' via a Spark conf setting on the compute? --packages or some other way?
