06-09-2022 06:49 AM
Hi there,
I was wondering if I could get your advice.
We would like to create a bronze Delta table from GZ JSON data stored in S3, but each time we attempt to read and write it, our cluster's CPU spikes to 100%. We are not doing any transformations, just reading from S3, adding an as_of_date column, and writing back to S3 in Delta format.
Currently it takes over an hour to read and write 20GB of GZ JSON from S3 using a server with 122GB of memory and 16 cores, which is not efficient.
When we ETL the data, it also writes tiny files (6.7MB-10MB).
Things we've tried:
from pyspark.sql.functions import col

source_path = ('s3://test-data/source/2022/06/03/*'.format(year, month, day))
test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date'))
test_data.createOrReplaceTempView('test_data')
test_data.write.format('delta') \
.partitionBy('as_of_date') \
.option("mergeSchema", "true") \
.option("path",'s3://test-data-bronze/').mode("append") \
.saveAsTable('test_data.bronze')
06-15-2022 05:47 AM
Hi Kaniz,
Thanks for the note, and thank you everyone for the suggestions and help. @Joseph Kambourakis I added your suggestion to our load, but I did not see any change in how our data loads or the time it takes to load.
I've done some additional research, and one option for anyone having trouble with GZ files is to read the data as plain text, which is very fast, and then use Spark to infer the schema.
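Roughly, a sketch of what I mean (the path and sample size here are placeholders, not our real values):

from pyspark.sql import functions as F

# Read the .json.gz files as plain text first; this is much faster than
# letting spark.read.json infer the schema over every compressed file.
raw_text = spark.read.text('s3://test-data/source/2022/06/03/*')

# Infer the schema from a sample of rows, then apply it to the full read.
sample_rdd = raw_text.limit(1000).rdd.map(lambda r: r.value)
inferred_schema = spark.read.json(sample_rdd).schema

test_data = (
    spark.read.schema(inferred_schema)
    .json('s3://test-data/source/2022/06/03/*')
    .withColumn('as_of_date', F.col('metadata.timestamp').cast('date'))
)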
However, in the end we decided to use Auto Loader to load our data in... but we're still waiting for help on how to backfill a year's worth of data incrementally (day by day).
Let me know if you have any suggestions. I've also added more context to my post above for User16460565755155528764 (Databricks).
Thanks,
K
06-09-2022 10:17 AM
"server with 122GB of memory and 16 cores"
Is this a single node cluster?
06-09-2022 11:04 AM
Yes, in this example it was a single-node cluster, but we also tried scaling this out to five 122GB servers; the CPU problem carried over to each server and the job did not speed up at all.
We also tried mixing compute-optimized instances for the drivers and Delta-cache-accelerated instances for the workers.
06-09-2022 03:02 PM
Try using Auto Loader and enabling the auto-optimize table properties.
First, try using Auto Loader within Delta Live Tables to manage your ETL pipeline for you.
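For example, a Delta Live Tables version of the same bronze ingest could look roughly like this (just a sketch; the table name and source path are placeholders, not your actual setup):

import dlt
from pyspark.sql.functions import col

@dlt.table(name="calendar_bronze", comment="Raw GZ JSON ingested with Auto Loader")
def calendar_bronze():
    # Auto Loader handles incremental file discovery; DLT manages the
    # checkpoint and target table for you.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://test-data/source/")
        .withColumn("as_of_date", col("metadata.timestamp").cast("date"))
    )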
Otherwise, you can write it out in a notebook.
Below is a way to get your very small GZ JSON files streamed efficiently into Databricks from your S3 bucket and then written into a compact form, so the rest of your pipeline should show a performance improvement. I suggest you run this on more than one node. Fewer, larger machines are better than many small ones - see "Complex batch ETL" in the Best Practices for Cluster Configuration documentation.
schema_location = <file_path_defined>
upload_path = <file_path_defined>
checkpoint_path = <file_path_defined>
write_path = <file_path_defined>
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
.option('cloudFiles.schemaLocation', schema_location) \
.option("cloudFiles.maxFilesPerTrigger", <set to the number of cores in your cluster>) \
.option('cloudFiles.format', 'json') \
.load(upload_path)
# Start the stream.
bronze_df.writeStream \
.format("delta")
.option('checkpointLocation', checkpoint_path) \
.table("user_data_bronze_not_compact")
If there are still performance issues, try cloudFiles.maxBytesPerTrigger instead of cloudFiles.maxFilesPerTrigger; it may help since you are using GZ JSON rather than plain JSON.
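For example (the '10g' value is only a starting point to tune, not a recommendation for your data):

# Cap each micro-batch by input size instead of file count.
bronze_df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.schemaLocation', schema_location) \
    .option('cloudFiles.maxBytesPerTrigger', '10g') \
    .option('cloudFiles.format', 'json') \
    .load(upload_path)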
Now make sure that all NEW Delta tables will be compacted automatically:
%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
Now make the compact table:
checkpoint_path2 = <file_path>
bronze_df.writeStream \
.format("delta")
.option('checkpointLocation', checkpoint_path2) \
.table("user_data_bronze_compact")
From here you can read the compact Delta table, and you should see better performance.
06-10-2022 09:04 AM
Hi there,
Thank you for the advice!
I set up the Auto Loader script this morning in a notebook, and it appears to transfer files over fairly quickly. I added .option("cloudFiles.inferColumnTypes", "true") in order to detect the schema.
Questions:
Thank you for the help!
Kash
06-10-2022 01:27 PM
Also, turn off auto-compact if latency is an issue
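That is, something like this, mirroring the session defaults shown earlier:

# Disable auto-compaction for new Delta tables if the extra compaction pass
# adds too much latency to the stream.
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = false")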
06-14-2022 10:57 AM
Hi there,
I was able to set up a notebook, but I'm having difficulty getting it to backfill incrementally, day by day.
I would like to process all of our 2022 data, which is stored in a year/month/day layout, incrementally to reduce the load. I have a notebook set up that iterates between a start_date_param and end_date_param so it runs one job per day to backfill.
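Roughly, the loop looks like this (the parameter values are just examples):

from datetime import date, timedelta

start_date_param = date(2022, 1, 1)    # example values
end_date_param = date(2022, 12, 31)

current = start_date_param
while current <= end_date_param:
    day_path = 's3://test-data/calendar/{:04d}/{:02d}/{:02d}/*'.format(
        current.year, current.month, current.day)
    # ...run the load for this single day against day_path, then advance...
    current += timedelta(days=1)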
When I use a specific upload path in Auto Loader, for example
upload_path = ('s3://test-data/calendar/2022/01/01/*'.format(year, month, day))
I get this error:
java.lang.IllegalStateException: Found mismatched event: key calendar/2022/01/02/00-11-6a088a39-4180-4efe-852d-11d09e6c2eb8.json.gz doesn't have the prefix: calendar/2022/01/01/
When I do not specify the year/month/day, Auto Loader tries to load the entire directory for 2022 rather than doing it incrementally. I can see in the Spark UI that it's trying to load 49K files.
How do we set it up so it loads the data for the first day, writes it and partitions it by day, and then moves on to the next day?
I saw that you mentioned we should not partition by year/month/day as that slows down the read, but then our S3 directory will have tons of small files.
Lastly, how do we set it up to write ~1GB files rather than the 10MB chunks it currently writes with auto-optimize and auto-compact?
I've also set .option('cloudFiles.backfillInterval', '1 day') \
and also tried .option('cloudFiles.backfillInterval', 1) \
Any thoughts?
Thank you again for your help!
Avkash
06-15-2022 11:44 AM
I would suggest letting Auto Loader figure out the best way to backfill all your files instead of trying to do the increment yourself with your own backfill scheme. upload_path = ('s3://test-data/calendar/2022/01/01/*'.format(year, month, day)) <-- the problem I see here is that you have nowhere to put the year, month, or day. Maybe you mean this?
upload_path = ('s3://test-data/calendar/{}/{}/{}/*'.format(year, month, day))
I mentioned that you should not repartition because, with the code I specified before, you will end up with a compact version of the Delta table made up of very large files. So the fact that you have many small files after the auto-optimize step, the Auto Loader stream read, and the last checkpoint I mentioned is unusual. Did you do a
DESCRIBE TABLE EXTENDED user_data_bronze_compact
and get the location? Did you then in the next cell do an
%fs
ls file_path_of_delta_table
to see the size of the files? What are the sizes you are seeing?
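For reference, the same check can be done from a single Python cell using DESCRIBE DETAIL (assuming the table name from my earlier example):

# Look up the table's storage location, then list the underlying files
# to see how large they actually are.
location = spark.sql("DESCRIBE DETAIL user_data_bronze_compact") \
                .select("location").first()[0]
display(dbutils.fs.ls(location))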
06-15-2022 11:58 AM
Hi there,
Thank you for getting back to me.
You are correct that we typically leave the upload path like this: upload_path = ('s3://test-data/calendar/{}/{}/{}/*'.format(year, month, day)), and each day we fill in the date programmatically in order to minimize the data read from S3.
It appears that Auto Loader has a better way of doing this, and for the backfill we did leave the path open so Auto Loader could identify all of the buckets, but my concern was that Auto Loader was trying to load 2TB into memory rather than reading and writing incrementally.
My question is: if we leave the path as /* so Auto Loader can determine the best way to read/write, why does it try to load all of the data at once?
With regard to repartitioning: each folder is 20GB compressed, and when Auto Loader reads the data and writes it back out, it only writes 10MB chunks. We have auto-compact and auto-optimize turned on and it still does this.
%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
Could the problem be that when I'm doing the optimized write I am partitioning by as_of_date, which is making Auto Loader read all the data in before writing?
Here is the current code we have in place:
upload_path = ('s3://test-data/calendar/*/*/*').format(year,month,day)
write_path = 's3://test-data-bronze/not_optimized/'
schema_location = 's3://test-autoloader/not_optimized/'
checkpoint_path = 's3://test-autoloader/not_optimized/'
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
.option('cloudFiles.schemaLocation', schema_location) \
.option("cloudFiles.maxFilesPerTrigger", 16) \
.option("cloudFiles.inferColumnTypes", "true") \
.option('cloudFiles.format', 'json') \
.option('cloudFiles.backfillInterval', 1) \
.load(upload_path)
checkpoint_path2 = 's3://test-autoloader/optimized/'

# Start the stream.
bronze_df.withColumn("as_of_date",col('metadata.timestamp').cast('date')) \
.writeStream \
.format('delta') \
.trigger(once=True) \
.partitionBy('as_of_date') \
.option('checkpointLocation', checkpoint_path2) \
.option("path",'s3://test-data-bronze/optimized/') \
.table('test-data.calendar_bronze')
Would it be possible to get on a 10-minute call where I can show you the issue via screen share?
This will be an extremely expensive ETL for us, so we would like to get it right.
06-21-2022 08:19 AM
@Sara Dooley any thoughts?
06-27-2022 08:50 AM
So yes, Auto Loader writes smaller files, and then when you do that second checkpointed write it compacts those small files into larger files so your queries and processing will be more efficient. You have to set those flags at the beginning for that write to compact the data into larger files. This is necessary for the rest of your pipeline to be efficient with that data.
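In other words, the ordering matters; here is a sketch reusing the names from my earlier example:

# 1. Set the session defaults *before* creating the compact table, so the new
#    Delta table picks up optimized writes and auto-compaction.
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")
spark.sql("set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true")

# 2. Only then start the stream that writes the compact table.
bronze_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path2) \
    .table("user_data_bronze_compact")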
Did I answer your question?
06-27-2022 10:59 AM
Hi there,
Thanks for getting back to me.
My question is about backfilling and loading HISTORICAL data incrementally (day by day) using Auto Loader.
I would like to run Auto Loader on data that is partitioned by year/month/day. I would like Auto Loader to read this data incrementally and then write it incrementally to prevent CPU overload and other memory issues.
When I run Auto Loader today using the setup above, I see in the Spark UI that it is trying to load the entire 1TB S3 bucket into memory rather than reading it day by day (incrementally).
Do I have the backfill set up incorrectly, or am I missing something that can make Auto Loader backfill daily first?
Thanks,
Avkash
06-27-2022 02:35 PM
Are you still running this latest run on a single node?
Do you have 2TB in the S3 bucket that you want fed into Databricks for processing, plus any new data that lands in that bucket afterward?
07-06-2022 07:34 AM
Yes, exactly. Do we have the backfill set up correctly for Auto Loader?