08-07-2024 06:39 AM
I'm experimenting with several approaches to implement an incremental autoloader query either in DLT or in a pipeline job. The complexities:
- Moving approximately 30B records from a nasty set of nested folders on S3 in several thousand CSV files. The table structures are very simple, however (two tables of 2 and 3 columns). This CSV "swamp" grows incrementally and very slowly -- with a new subfolder each month. These nested folders hold many types of files, including my files of interest and others that I don't want in my data lake.
- Through experimentation, I have found the magic combination of glob paths in the load statement and the pathGlobFilter option to target the files I want.
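# Glob over the nested folders; only the "edges" subfolders are of interest.
crawl_path = BASE + "*/*/edges/"
# Inside the table/stream function: incremental Auto Loader read of tab-separated .gz CSVs.
return (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.maxBytesPerTrigger", "50g")
    .option("pathGlobFilter", "*.gz")
    .option("sep", "\t")
    .schema(schemas[tblname])
    .load(crawl_path)
)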
My questions revolve around how to take this bite in chunks, but I am struggling with how because of the limits of the readStream statement (it won't take a file list the way others such as AWS Glue do - or will it? See below). The underlying files are unsplittable .gz files ranging in size from about 300 MB to over 3 GB, which makes for very lumpy and skewed job processing.
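To put rough numbers on that lumpiness, here is a back-of-the-envelope sketch of how many files one micro-batch would hold under the 50g budget above. It assumes "50g" is read as binary gigabytes and that the byte budget alone decides the batch (as far as I know the limit is a soft maximum and a files-per-trigger cap can also apply); `files_per_trigger` is a made-up helper name, not an Auto Loader API.

```python
# Rough estimate only: how many whole files fit into one trigger's byte budget.
MB = 1024 ** 2
GB = 1024 ** 3


def files_per_trigger(max_bytes: int, file_size_bytes: int) -> int:
    """Whole files that fit in the budget; at least one file is always taken."""
    return max(1, max_bytes // file_size_bytes)


budget = 50 * GB  # assumption: "50g" means 50 * 1024**3 bytes

small_files = files_per_trigger(budget, 300 * MB)  # smallest files in the swamp
large_files = files_per_trigger(budget, 3 * GB)    # largest files in the swamp

print(small_files, large_files)
```

Since a .gz file cannot be split, each file ends up on a single task, so a batch made of the largest files offers roughly a tenth as many parallel tasks as a batch of the smallest ones for the same byte budget.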
Here is what I'm considering based on considerable experimentation:
Would appreciate any voices of experience and wisdom on this one. I have burned up a lot of time pondering this one. As my Dad used to say, "Experience is the best teacher of them all but fools will learn no other way!"
08-07-2024 06:50 AM
This seems important for item 4 option: change source path for autoloader
08-07-2024 08:50 AM
Potential option for #4 "multiple streams" ?? if this works, could be a game changer for me.
reader_args = {'cloudFiles': {'format': 'csv',
'maxBytesPerTrigger': '50g',
'source': [{'path': '/[mypath]/[subpath1]/*/edges/',
'globPattern': '*.gz',
'recursive': True},
{'path': '/[mypath]/[subpath2]/*/edges/',
'globPattern': '*.gz',
'recursive': True}]}}
(spark.readStream
.options(reader_args)
.schema([myschema])
.option("sep", "\t")
.load()
)
08-08-2024 10:12 AM
@Kaniz_Fatma - any ideas here? Specifically, is there a way to pass multiple folders to a load statement, similar to the thread posted above in item 4 of my original question?
Multiple streams -- As I'm researching this question, I found the solution https://community.databricks.com/t5/data-engineering/configure-multiple-source-paths-for-auto-loader... that seems to indicate I can give Auto Loader a list of sources. I assume this JSON is just input options to the readStream statement? Would appreciate any voices of experience and wisdom on this one.
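One possible way to point a single load statement at several folders is to fold them into one glob with {a,b} alternation, which I believe the Hadoop-style globbing used for load paths accepts. The sketch below is untested against a real bucket; `brace_glob` and `read_edges` are hypothetical helper names and the bucket and folder names are placeholders. It also does not settle whether an existing checkpoint picks up a changed path.

```python
def brace_glob(base: str, subfolders: list[str], suffix: str = "*/edges/") -> str:
    """Fold several top-level subfolders into one glob using {a,b} alternation."""
    if not subfolders:
        raise ValueError("need at least one subfolder")
    names = [s.strip("/") for s in subfolders]
    # A single folder needs no braces; several are joined as {a,b,c}.
    middle = names[0] if len(names) == 1 else "{" + ",".join(names) + "}"
    return f"{base.rstrip('/')}/{middle}/{suffix}"


def read_edges(spark, path: str, schema):
    """Same reader as in the original post, pointed at the combined glob (not run here)."""
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.maxBytesPerTrigger", "50g")
            .option("pathGlobFilter", "*.gz")
            .option("sep", "\t")
            .schema(schema)
            .load(path))


# Placeholder bucket and folder names, for illustration only.
combined = brace_glob("s3://my-bucket/root", ["subpath1", "subpath2"])
print(combined)
```

The appeal of this form is that it stays a plain string passed to load(), so nothing beyond the options already used in the original reader is required.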
seems to indicate?
a month ago
Status update:
I have been unsuccessful at getting anything to work on approaches 3 and 4. I did not try 2. On 3, I don't understand why that won't work, but when I changed subfolders, Auto Loader would not incrementally load them.
I have been successful on approach 1 using a slightly narrower glob pattern so as to avoid the really ugly large unsplittable files.
I'm still wondering whether 3 or 4 is possible?