06-26-2024 07:18 AM
The Databricks guide outlines several use cases for loading data via Delta Live Tables:
https://docs.databricks.com/en/delta-live-tables/load.html
This includes Auto Loader from cloudFiles, Kafka messages, small static datasets, etc.
But one use case it doesn't cover is incremental loading from a Hive catalog table whose partitions are being incrementally added by another process. This table points to partitioned raw CSV files comprising terabytes of incrementally growing data; the partitions are added as the source data arrive.
I'd like to create a DLT pipeline that handles this similarly to how it would via Auto Loader, using the triggers available now and a max-bytes parameter to ingest it one batch at a time. So, instead of checkpointing filenames, which is how Auto Loader would handle this, I would prefer to checkpoint partitions and batch-load each new partition after initially loading them all.
I've considered just trying:
spark.readStream.table("my_big_catalog_table").option("maxBytesPerTrigger", "20g") with an availableNow trigger, to use pseudocode (sketched more fully below). But I'm unsure how the checkpointing would work.
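A fuller sketch of what I have in mind, assuming a maxBytesPerTrigger-style cap and an availableNow trigger are honored when streaming from this table (the checkpoint path and target table below are made up for illustration):

# Stream from the catalog table, capping how much data each micro-batch pulls in.
# Note: maxBytesPerTrigger is a Delta streaming source option; whether it applies
# to an external Hive table over raw CSV is exactly what I'm unsure about.
stream = (
    spark.readStream
    .option("maxBytesPerTrigger", "20g")
    .table("my_big_catalog_table")
)

# Drain the available backlog in size-capped batches, then stop.
query = (
    stream.writeStream
    .option("checkpointLocation", "s3://my-bucket/checkpoints/my_big_table")  # hypothetical path
    .trigger(availableNow=True)
    .toTable("my_bronze_table")  # hypothetical target table
)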
I realize that I could use Auto Loader to read the source CSV files, but my problem in this case is that the source files are in a deeply nested directory structure, and the logic to read them via globbing patterns is complex and has already been solved by another process that updates the catalog table partitions and source directory information.
06-26-2024 12:51 PM
Hello @lprevost ,
By default, Auto Loader will discover and read files in the lexical order of their paths, no matter how deeply nested your folder structure is.
If you're interested in loading all .csv files, then a common cloudFiles (Auto Loader) read or readStream operation should suffice in your case, along the lines of the sketch below.
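A minimal sketch, with the bucket path and schema location made up for illustration:

# Auto Loader discovers new files under the input path incrementally,
# including files in nested subdirectories.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "s3://my-bucket/_schemas/raw_csv")  # hypothetical
    .option("header", "true")
    .load("s3://my-bucket/raw/")  # hypothetical root of the nested CSV tree
)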
06-26-2024 05:01 PM
Thank you for your reply. I've got some other reasons I don't want to do this with Auto Loader, since some upstream processes are already appending partitions to the catalog table.
Is there a way to do an incremental load from a catalog table with the partitions being the thing that is checkpointed?
I'm also having some problems with Autoloader as follows:
- I have a large number of gzipped CSV files. Most are manageable in size, but a few are very large, and Spark gets hung on a few of them at the end of a group of tasks, presumably because of skew towards the larger files. This takes an hour on some files, with only one task running and all other nodes idle. I'm not sure that doing this via the catalog instead of Auto Loader would solve it, but I'm scratching my head as to how to.
I've read this is because gzip is unsplittable, so it ties up one node until the file has been completely read. The autoscaler has also dropped nodes that were almost finished reading, so they start over.
I have about 650 files ranging in size from 30MB to 1.2GB.
06-26-2024 05:58 PM
A bit more on this. Looking at the time when my job was really dragging, I see a high percentage of CPU time going to "iowait", which I assume has to do with S3 and disk latency. See the yellow areas in the CPU utilization graph below. I am reading from a common S3 location into my own region and instance, so I am taking steps to isolate myself from public AWS traffic. So it could be file size, or it could be AWS latency issues.
06-27-2024 07:13 AM - edited 06-27-2024 07:16 AM
Hi @lprevost,
I don't think there's a way to "checkpoint partitions" as you said.
For the gzip files, your executor is probably running out of memory during the decompression process. One of the few solutions that doesn't require changing your source files would be to increase the executors' memory.
To enable parallel processing of gzip files, this library might be of interest, although I don't think it will address any memory issues, given the way it works: https://github.com/nielsbasjes/splittablegzip
06-28-2024 08:30 AM
Thank you very much. I had read about that clever splittable codec elsewhere. I assume it can be installed using a Spark conf setting on the compute? Via --packages, or some other way?
07-01-2024 10:22 AM
Hello @lprevost, yes, this library is a .jar file. You can install it as a library on your cluster and then point the CSV reader at the splittable codec, roughly as sketched below.
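A minimal sketch, assuming the codec class name and Maven coordinates from the project's README (nl.basjes.hadoop:splittablegzip; check the README for the current version), with the S3 path made up for illustration:

# After installing the jar on the cluster, register the splittable gzip codec
# for this read so large .csv.gz files can be split across multiple tasks.
df = (
    spark.read
    .option("io.compression.codecs", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
    .option("header", "true")
    .csv("s3://my-bucket/raw/")  # hypothetical path to the gzipped CSVs
)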
07-01-2024 11:09 AM
Hello @lprevost , I hope this message finds you well.
In order to install this library, you can follow any of the steps below:
- First step - search for libraries and versions:
- Second step - quick install if you already know the "Coordinates":
Done.
Best regards,
Lucas Rocha