Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Large/complex Incremental Autoloader Job -- Seeking Experience on approach

lprevost
Contributor

I'm experimenting with several approaches to implement an incremental autoloader query either in DLT or in a pipeline job.   The complexities:

- Moving approximately 30B records from a nasty set of nested folders on S3 holding several thousand CSV files.  The table structures are very simple, however (two tables of 2 and 3 columns).  This CSV "swamp" grows very slowly, with a new subfolder each month.  These nested folders contain many types of files, including my files of interest and others that I don't want in my data lake.

- Through experimentation, I have found the magic combination of glob paths in the load statement and the pathGlobFilter option to target the files I want.

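crawl_path = BASE + "*/*/edges/"  # BASE is the S3 root prefix, ending in "/"

def edges_stream(tblname):  # wrapper name is illustrative; the original snippet only showed the return
    # Auto Loader stream over the gzipped, tab-separated files matched by crawl_path
    return (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.maxBytesPerTrigger", "50g")
        .option("pathGlobFilter", "*.gz")
        .option("sep", "\t")
        .schema(schemas[tblname])
        .load(crawl_path)
    )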

My questions revolve around how to take this bite in chunks, but I am struggling with how, due to limits of the readStream statement (it won't take a file list the way others such as AWS Glue do, or will it?   See below).   The underlying files are unsplittable .gz files ranging from about 300 MB to over 3 GB, which makes for very lumpy and skewed job processing.
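
One hedged way to take the backlog in chunks without a file list is to let Auto Loader rate-limit itself. With `cloudFiles.maxBytesPerTrigger` and/or `cloudFiles.maxFilesPerTrigger` plus an `availableNow` trigger, the backlog should be processed as a series of micro-batches, with the checkpoint recording progress so a stopped run can resume. The byte limit is documented as a soft maximum, so with unsplittable .gz files a file-count cap may give more even batches. This is a sketch under those assumptions, not a tested job; `rate_limit_options`, `run_backlog`, `target_table`, and `checkpoint_path` are hypothetical names.

```python
def rate_limit_options(max_bytes="50g", max_files=None):
    """Build Auto Loader options that cap how much each micro-batch reads."""
    opts = {
        "cloudFiles.format": "csv",
        "cloudFiles.maxBytesPerTrigger": max_bytes,  # soft limit per micro-batch
        "pathGlobFilter": "*.gz",
        "sep": "\t",
    }
    if max_files is not None:
        # Optional hard cap on the number of files per micro-batch.
        opts["cloudFiles.maxFilesPerTrigger"] = str(max_files)
    return opts


def run_backlog(spark, source_glob, schema, target_table, checkpoint_path, **limits):
    """Start a stream that drains the current backlog in capped micro-batches.

    Assumes a Databricks runtime where the "cloudFiles" source is available.
    """
    reader = (
        spark.readStream.format("cloudFiles")
        .options(**rate_limit_options(**limits))
        .schema(schema)
    )
    return (
        reader.load(source_glob)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # process what exists now, then stop
        .toTable(target_table)
    )
```

For example, `run_backlog(spark, crawl_path, schemas[tblname], "edges", checkpoint_path, max_files=20)` would cap each micro-batch at 20 files in addition to the 50g soft byte limit.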

Here is what I'm considering based on considerable experimentation:

  1. Big Bite: Just let it go and crawl all and take the resulting cost hit on what is probably a 24 hour run.
  2. Symlinks -- I wonder if Autoloader will crawl and checkpoint a directory full of symlinks?   That way, I could add symlinks to a folder incrementally and let autoloader process them.
  3. Incremental load path -- I've found through experimentation that I cannot filter the above statement on a subpath (readStream ... .where("filter statement")), because Auto Loader's checkpoint records every file discovered in the previous read and does not read them on a subsequent run, even though they were not all copied.   Unless I'm missing something, I've not found much documentation on how Auto Loader does checkpointing, nor is there much control over it.     So, I've considered incrementally extending the readStream load path (e.g. readStream ... .load(crawl_path + "/filterfolder/")).  I could pass "filterfolder" as a job parameter and run multiple jobs, each adding to the checkpoint, until all are complete.
  4. Multiple streams -- As I'm researching this question, I found the solution https://community.databricks.com/t5/data-engineering/configure-multiple-source-paths-for-auto-loader... that seems to indicate I can give Auto Loader a list of sources.   I assume this JSON is just input options to the readStream statement?
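
For option 3, one sketch that avoids pointing an existing checkpoint at a changed path is to give each subfolder its own stream and its own checkpoint directory, with every stream appending to the same table. This assumes the job parameter names one top-level subfolder (the first `*` in the glob above); `subfolder_source`, `subfolder_checkpoint`, `load_subfolder`, and `checkpoint_root` are hypothetical names, and whether this is cheaper overall than one big crawl is untested here.

```python
def subfolder_source(base, subfolder):
    """Glob for one top-level subfolder, mirroring BASE + "*/*/edges/"."""
    return base.rstrip("/") + "/" + subfolder.strip("/") + "/*/edges/"


def subfolder_checkpoint(checkpoint_root, tblname, subfolder):
    """Separate checkpoint directory per (table, subfolder) pair."""
    safe = subfolder.strip("/").replace("/", "_")
    return f"{checkpoint_root.rstrip('/')}/{tblname}/{safe}"


def load_subfolder(spark, base, subfolder, schema, tblname, checkpoint_root):
    """Run one availableNow stream for a single subfolder.

    Assumes a Databricks runtime where the "cloudFiles" source is available.
    """
    stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("pathGlobFilter", "*.gz")
        .option("sep", "\t")
        .schema(schema)
        .load(subfolder_source(base, subfolder))
    )
    return (
        stream.writeStream
        # Each subfolder keeps its own file-tracking state.
        .option("checkpointLocation",
                subfolder_checkpoint(checkpoint_root, tblname, subfolder))
        .trigger(availableNow=True)
        .toTable(tblname)
    )
```

Because each stream tracks only the files under its own path, rerunning the job for the same subfolder should pick up only files that stream has not yet seen.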

Would appreciate any voices of experience and wisdom on this one.  I have burned up a lot of time pondering this one.   As my Dad used to say, "Experience is the best teacher of them all but fools will learn no other way!"

4 REPLIES

lprevost
Contributor

This seems relevant to option 4:   change source path for autoloader 

A potential approach for #4, "multiple streams"?  If this works, it could be a game changer for me.

reader_args = {'cloudFiles': {'format': 'csv',
   'maxBytesPerTrigger': '50g',
   'source': [{'path': '/[mypath]/[subpath1]/*/edges/',
     'globPattern': '*.gz',
     'recursive': True},
    {'path': '/[mypath]/[subpath2]/*/edges/',
     'globPattern': '*.gz',
     'recursive': True}]}}

   
(spark.readStream
     .options(reader_args)
     .schema([myschema]) 
     .option("sep", "\t") 
     .load()
    )
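
I can't confirm that a nested `source` list like the one above is an Auto Loader option, and PySpark's `options()` takes keyword arguments rather than a positional dict. A hedged alternative for reading several subfolders in one stream is glob alternation (`{a,b}`) in the load path, which the glob syntax documented for Auto Loader includes. The sketch below assumes subfolder names contain no commas or braces; `multi_folder_glob`, `read_folders`, and the example folder names are hypothetical.

```python
def multi_folder_glob(base, subfolders, suffix="*/edges/"):
    """Build one load path covering several top-level subfolders."""
    names = [s.strip("/") for s in subfolders]
    if not names:
        raise ValueError("at least one subfolder is required")
    for name in names:
        if any(ch in name for ch in ",{}"):
            raise ValueError(f"unsupported character in subfolder name: {name!r}")
    # A single folder needs no braces; several become {a,b,...}.
    alternation = names[0] if len(names) == 1 else "{" + ",".join(names) + "}"
    return f"{base.rstrip('/')}/{alternation}/{suffix}"


def read_folders(spark, base, subfolders, schema):
    """Auto Loader reader over the listed subfolders only.

    Assumes a Databricks runtime where the "cloudFiles" source is available.
    """
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.maxBytesPerTrigger", "50g")
        .option("pathGlobFilter", "*.gz")
        .option("sep", "\t")
        .schema(schema)
        .load(multi_folder_glob(base, subfolders))
    )
```

Note that adding a folder to the list later changes the load path seen by an existing checkpoint; whether Auto Loader then picks up only the new folder's files is not something this sketch confirms.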

lprevost
Contributor

@Kaniz_Fatma - any ideas here? Specifically, is there a way to pass multiple folders to a load statement, similar to the thread linked in item 4 of my original question?


Multiple streams -- As I'm researching this question, I found the solution https://community.databricks.com/t5/data-engineering/configure-multiple-source-paths-for-auto-loader... that seems to indicate I can give autoloader a list of sources.   I assume this json is just input options to the readstream statement?   

Would appreciate any voices of experience and wisdom on this one. 



seems to indicate?

Status update:

I have been unsuccessful at getting anything to work with approaches 3 and 4.  I did not try 2.  On 3, I don't understand why it won't work, but when I changed subfolders, Auto Loader would not incrementally load them.

I have been successful with approach 1, using a slightly narrower glob pattern so as to avoid the really ugly, large unsplittable files.

I'm still wondering whether 3 or 4 is possible.
