
HELP! Converting GZ JSON to Delta causes massive CPU spikes and ETLs take days!

Kash
Contributor III

Hi there,

I was wondering if I could get your advice.

We would like to create a bronze Delta table from GZ JSON data stored in S3, but each time we attempt to read and write it, our cluster's CPU spikes to 100%. We are not doing any transformations; we simply read from S3, create an as_of_date column, and write to S3 in Delta.

Currently it takes over 1 hour to read and write 20GB of GZ JSON from S3 using a server with 122GB of memory and 16 cores, which is not efficient.

When we run the ETL, it also writes the data in tiny files (6.7MB-10MB).

Things we've tried:

  1. I initially thought the problem was that our data is GZ JSON, which is not splittable, so Spark has a tough time processing it, but we don't have this issue with other pipelines. (Still unclear if this is it.)
  2. I then thought it was a skew problem due to the .withColumn("as_of_date", col('metadata.timestamp').cast('date')), but even after I removed it the problem continued.
  3. I even tried to add a SALT hint, but no luck.
  4. We tried defining the schema of the JSON, since it is nested, which helped it load faster, but the write took just as long. (This is not ideal since our schemas change over time and defining the schema here causes us to lose data.)
  5. We've tried .repartition(1000).
  6. We've also tried letting Databricks dynamically detect what is skewed and then setting that programmatically as the skew hint, but no luck.
  7. We turned on auto compaction and auto optimize to have it write larger files, but it did not do this and again wrote smaller 10MB files.

from pyspark.sql.functions import col

# Read one day of gzipped JSON and derive the partition column.
source_path = 's3://test-data/source/2022/06/03/*'
test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date'))
test_data.createOrReplaceTempView('test_data')

# Append to the bronze Delta table, partitioned by as_of_date.
test_data.write.format('delta') \
  .partitionBy('as_of_date') \
  .option("mergeSchema", "true") \
  .option("path", 's3://test-data-bronze/') \
  .mode("append") \
  .saveAsTable('test_data.bronze')

1 ACCEPTED SOLUTION

Accepted Solutions

Kash
Contributor III

Hi Kaniz,

Thanks for the note, and thank you everyone for the suggestions and help. @Joseph Kambourakis I added your suggestion to our load, but I did not see any change in how our data loads or the time it takes to load.

I've done some additional research, and one option for those who are having GZ problems could be to read the data as a text file, which reads very quickly, and then use Spark to infer the schema.
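The post does not show how the text-first read was done, so the following is only a minimal sketch of one way it might look. It assumes a SparkSession is passed in as `spark` and that the files hold one JSON document per line; the function name `read_gz_json_via_text` is hypothetical.

```python
def read_gz_json_via_text(spark, path):
    """Read gzipped JSON-lines files as plain text, then infer the schema.

    Assumption: `spark` is an active SparkSession and each line of every
    file under `path` is one complete JSON document.
    """
    # spark.read.text yields a DataFrame with one string column named "value".
    raw = spark.read.text(path)
    # DataFrameReader.json also accepts an RDD of JSON strings, so the
    # schema is inferred from the lines that were just read as text.
    return spark.read.json(raw.rdd.map(lambda row: row.value))
```

Whether this is actually faster than spark.read.json on the same path depends on the data and the cluster, so it is worth timing both on a single day before relying on it.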

However, in the end we decided to use Autoloader to load our data... but we're still waiting for help on how to backfill a year's worth of data incrementally (day by day).

Let me know if you have any suggestions. I also added more context to my post above for User16460565755155528764 (Databricks).

Thanks,

K


19 REPLIES

Dooley
Valued Contributor

"server with 122GB of memory and 16 cores"

Is this a single node cluster?

Kash
Contributor III

Yes, in this example it was a single node cluster, but we also tried scaling this to five 122GB servers; the CPU problem carried over to each server and the job did not speed up at all.

We also tried mixing compute instances for the drivers and delta instances for the workers.

Dooley
Valued Contributor

Try using Autoloader and enabling auto-optimize as a table property.

First try using Autoloader within Delta Live Tables to manage your ETL pipeline for you.

Otherwise, you can write it out in a notebook.

Below is a way to get your very small GZ JSON files streamed efficiently into Databricks from your S3 bucket and then written in a compact form, so the rest of your pipeline should show a performance improvement. I suggest you run this on more than one node. Fewer, larger machines are better than many small ones; see "Complex batch ETL" in the Best Practices for Cluster Configuration documentation.

schema_location = <file_path_defined>
upload_path = <file_path_defined>
checkpoint_path = <file_path_defined>
write_path = <file_path_defined>
 
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.schemaLocation', schema_location) \
  .option("cloudFiles.maxFilesPerTrigger", <set to the number of cores in your cluster>) \
  .option('cloudFiles.format', 'json') \
  .load(upload_path)
 
# Start the stream.
bronze_df.writeStream \
  .format("delta") \
  .option('checkpointLocation', checkpoint_path) \
  .table("user_data_bronze_not_compact")

If there are still performance issues, try cloudFiles.maxBytesPerTrigger instead of cloudFiles.maxFilesPerTrigger; it may help since you are using GZ JSON and not plain JSON.

Now make sure that all NEW Delta tables will be compacted automatically.
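As a rough sketch of switching between the two limits, the options can be built in one place and passed to the reader. The helper names `autoloader_options` and `read_bronze_stream` and the "10g" value are hypothetical; `spark` is assumed to be an active SparkSession on a Databricks cluster.

```python
def autoloader_options(schema_location, max_files=None, max_bytes=None):
    """Build the option dict for a cloudFiles (Auto Loader) JSON stream."""
    opts = {
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": schema_location,
    }
    if max_files is not None:
        # Cap on the number of new files picked up per micro-batch.
        opts["cloudFiles.maxFilesPerTrigger"] = str(max_files)
    if max_bytes is not None:
        # Cap on the number of new bytes per micro-batch, e.g. "10g".
        opts["cloudFiles.maxBytesPerTrigger"] = max_bytes
    return opts


def read_bronze_stream(spark, upload_path, options):
    """Open the Auto Loader stream with the given options."""
    return spark.readStream.format("cloudFiles").options(**options).load(upload_path)
```

For example, autoloader_options(schema_location, max_bytes="10g") would limit each micro-batch by size rather than by file count.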

%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

Now make the compact table:

checkpoint_path2 = <file_path>
 
bronze_df.writeStream \
  .format("delta") \
  .option('checkpointLocation', checkpoint_path2) \
  .table("user_data_bronze_compact")

From here you can read the compact Delta table and you should see better performance.

Kash
Contributor III

Hi there,

Thank you for the advice!

I set up the Autoloader script this morning in a notebook and it appears to transfer files over fairly quickly. I added .option("cloudFiles.inferColumnTypes", "true") in order to detect the schema.

Questions:

  1. How do we save user_data_bronze_not_compact to an S3 path and partition it by date (yyyy/mm/dd)?
  2. How can we set it up so the Autoloader job triggers only once and stops when it has loaded all of the data in the S3 folder?
  3. We want to run Autoloader once a day to process the previous day's data. At the moment we use upload_path = ('s3://test-data/calendar/{}/{}/{}'.format(year, month, day)) to load data for a specific day. Is there a better way to do this with Autoloader? Backfill? Incremental?
  4. In this query we load data from GZ JSON into Delta and store it in a table (not optimized). Since we do not specify the location of the table in S3, where is this table stored?
  5. When we optimize this data and store it in S3, we rewrite it again, so in essence we now have 3 copies of this data, right? If so, do we need to run number 3, or can we optimize step 2?
    1. JSON
    2. JSON to Delta (not optimized)
    3. Delta to optimized Delta (optimized)

Thank you for the help!

Kash

Dooley
Valued Contributor

  1. I would suggest not writing the multitude of small Parquet files to S3, since performance will be horrible compared to writing the Delta-format version of the same data with fewer, larger files; in our example that was called user_data_bronze_compact. I would not suggest partitioning any table smaller than 1TB, or having any partition smaller than 1GB, for performance reasons. Your write to S3 will be more efficient with the compact version of the table. You can try writing using foreachBatch() or foreach().
  2. Then take that bronze dataframe and use the trigger once option. See Triggers here.
  3. Autoloader can backfill at an interval using "cloudFiles.backfillInterval".
  4. You can find the location of the table with DESCRIBE TABLE EXTENDED user_data_bronze_compact; at the bottom it says "Location". You can then see the files that make up that table using %fs and then ls file_path_you_grabbed_from_describe_table_extended_step.
  5. You can do the "turn on auto optimize" step before you start the stream and skip the middle checkpoint.
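The sizing rule of thumb in point 1 can be written down as a quick check. This is only an illustration of that rule as stated (table of at least 1TB, partitions of at least 1GB); the function name `partitioning_advised` is hypothetical, and it assumes data is spread evenly across partitions.

```python
GB = 1024 ** 3
TB = 1024 ** 4


def partitioning_advised(table_bytes, expected_partitions):
    """Return True only if the table and its partitions are large enough.

    Assumes expected_partitions > 0 and an even spread of data.
    """
    if table_bytes < TB:
        # Tables under 1 TB: leave unpartitioned.
        return False
    # Otherwise require an average partition size of at least 1 GB.
    return table_bytes / expected_partitions >= GB


# A 2 TB table split into 365 daily partitions averages about 5.6 GB each.
print(partitioning_advised(2 * TB, 365))
# A single 20 GB day on its own is well under the 1 TB threshold.
print(partitioning_advised(20 * GB, 1))
```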

Dooley
Valued Contributor

Also, turn off auto-compact if latency is an issue.

Kash
Contributor III

Hi there,

I was able to set up a notebook but I am having difficulty getting it to backfill incrementally day by day.

I would like to process all of our 2022 data, which is stored in a year/month/day format, incrementally to reduce the load. I have a notebook set up that will iterate between a start_date_param and an end_date_param so that it runs one job for each day to backfill.
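For illustration only, a day-by-day loop like the one described might generate its paths as follows. The function name `daily_paths` and the default template are hypothetical; the zero-padded layout is assumed from the calendar/2022/01/01 paths quoted in this thread.

```python
from datetime import date, timedelta


def daily_paths(start, end, template="s3://test-data/calendar/{:04d}/{:02d}/{:02d}/*"):
    """Yield one source path per calendar day from start to end, inclusive."""
    day = start
    while day <= end:
        # Zero-pad month and day so the path matches the yyyy/mm/dd folders.
        yield template.format(day.year, day.month, day.day)
        day += timedelta(days=1)


for path in daily_paths(date(2022, 1, 1), date(2022, 1, 3)):
    print(path)
```

This only builds the list of folders; it says nothing about how Autoloader itself tracks or backfills files.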

When I use a specific upload path in AutoLoader for example

upload_path = ('s3://test-data/calendar/2022/01/01/*'.format(year, month, day)) I get this error.

java.lang.IllegalStateException: Found mismatched event: key calendar/2022/01/02/00-11-6a088a39-4180-4efe-852d-11d09e6c2eb8.json.gz doesn't have the prefix: calendar/2022/01/01/

When I do not specify the year/month/day, Autoloader tries to load the entire directory for 2022 rather than doing it incrementally. I see in the Spark UI that it's trying to load 49K files.

How do we set it up so it loads data for the first day, writes it and partitions it by day, and then goes on to the next day?

I saw that you mentioned that we should not partition by year/month/day as that slows down the read, but then our S3 directory will have tons of small files.

Lastly, how do we set it up to partition by 1GB rather than optimize and write in 10MB chunks, which is what it's doing now with auto-optimize and auto-compact?

I've also set .option('cloudFiles.backfillInterval', '1 day')

and also tried .option('cloudFiles.backfillInterval', 1)

Any thoughts?

Thank you again for your help!

Avkash

Dooley
Valued Contributor

I would suggest leaving it to Autoloader to figure out the best way to backfill all your files, instead of trying to increment through them yourself with your own backfill scheme. upload_path = ('s3://test-data/calendar/2022/01/01/*'.format(year, month, day)) <-- the problem I see here is that the string has no placeholders for the year, month, or day. Maybe you mean this?

upload_path = ('s3://test-data/calendar/{}/{}/{}/*'.format(year, month, day))
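To make the difference concrete: Python's str.format silently ignores extra positional arguments when the string has no placeholders, so the first form always returns the hard-coded date. A small sketch with assumed example values for year, month and day (the {:02d} zero-padding is an addition for integer inputs, not something from the thread):

```python
year, month, day = 2022, 1, 2

# No placeholders: the arguments are ignored and the literal path comes back.
fixed = 's3://test-data/calendar/2022/01/01/*'.format(year, month, day)

# With placeholders the values are substituted; {:02d} zero-pads integers.
templated = 's3://test-data/calendar/{}/{:02d}/{:02d}/*'.format(year, month, day)

print(fixed)      # s3://test-data/calendar/2022/01/01/*
print(templated)  # s3://test-data/calendar/2022/01/02/*
```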

I mentioned that you should not repartition because, with the code I specified before, you will have a compact version of the Delta table that is made up of very large files. So the fact that you have many small files after the auto-optimize step, the Autoloader stream read, and the last checkpoint I mentioned is unusual. Did you do a

DESCRIBE TABLE EXTENDED user_data_bronze_compact

and get the location? Did you then in the next cell do an

%fs 
ls file_path_of_delta_table

to see the size of the files? What are the sizes you are seeing?
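If the listing is long, a small helper can summarize the sizes instead of reading them by eye. This is a sketch; the name `summarize_file_sizes` is hypothetical, and it expects a non-empty collection of sizes in bytes.

```python
def summarize_file_sizes(sizes_bytes):
    """Return file count plus min, max and average size in MB."""
    mb = 1024 * 1024
    sizes = sorted(sizes_bytes)  # also accepts a generator
    return {
        "files": len(sizes),
        "min_mb": sizes[0] / mb,
        "max_mb": sizes[-1] / mb,
        "avg_mb": sum(sizes) / len(sizes) / mb,
    }


# In a Databricks notebook the sizes could come from dbutils.fs.ls, whose
# entries expose .name and .size (the listing is not recursive, so a
# partitioned table needs one call per partition folder):
#   summarize_file_sizes(f.size for f in dbutils.fs.ls(table_location)
#                        if f.name.endswith(".parquet"))
print(summarize_file_sizes([10 * 1024 * 1024, 30 * 1024 * 1024]))
```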

Kash
Contributor III

Hi there,

Thank you for getting back to me.

You are correct that typically we leave the upload path like this: upload_path = ('s3://test-data/calendar/{}/{}/{}/*'.format(year, month, day)), and each day we fill in the date programmatically in order to minimize the data read from S3.

It appears that Autoloader has a better way of doing this, and for the backfill we did leave the path empty so Autoloader could identify all buckets, but my concern was that Autoloader was trying to load 2TB into memory rather than reading and writing incrementally.

My question is: if we leave the path as /* so Autoloader can determine the best way to read and write, then why does it try to load all of the data at once?

With regard to repartitioning: each folder is 20GB compressed, and when Autoloader reads the data and writes it back out, it only writes it in 10MB chunks. We have auto-compact and auto-optimize turned on and it still does this.

%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

Could the problem be that when I'm doing the optimized write I am partitioning by as_of_date, which makes Autoloader read all the data in before it writes?

Here is the current code we have in place:

upload_path = ('s3://test-data/calendar/*/*/*').format(year,month,day)
write_path = 's3://test-data-bronze/not_optimized/'
 
schema_location = 's3://test-autoloader/not_optimized/'
checkpoint_path = 's3://test-autoloader/not_optimized/'
 
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.schemaLocation', schema_location) \
  .option("cloudFiles.maxFilesPerTrigger", 16) \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option('cloudFiles.format', 'json') \
  .option('cloudFiles.backfillInterval', 1) \
  .load(upload_path) 
 
# Start the stream.
bronze_df.writeStream.foreachBatch(bronze_df) \
 
checkpoint_path2 = 's3://test-autoloader/optimized/'
bronze_df.withColumn("as_of_date",col('metadata.timestamp').cast('date')) \
  .writeStream \
  .format('delta') \
  .trigger(once=True) \
  .partitionBy('as_of_date') \
  .option('checkpointLocation', checkpoint_path2) \
  .option("path",'s3://test-data-bronze/optimized/') \
  .table('test-data.calendar_bronze')
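For reference, foreachBatch expects a function that takes a micro-batch DataFrame and a batch id, not a DataFrame. A minimal sketch of how a trigger-once write through foreachBatch might look, assuming the streaming DataFrame already carries the as_of_date column; the names `write_batch` and `start_bronze_write` are hypothetical and the paths are the ones quoted above.

```python
def write_batch(batch_df, batch_id, target_path="s3://test-data-bronze/optimized/"):
    """Append one micro-batch to the Delta location.

    Spark calls this once per micro-batch with (DataFrame, batch id).
    """
    (batch_df.write
        .format("delta")
        .mode("append")
        .partitionBy("as_of_date")
        .save(target_path))


def start_bronze_write(stream_df, checkpoint_path):
    """Start the stream, process what is available once, then stop."""
    return (stream_df.writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
        .start())
```

How much data lands in each micro-batch is still governed by the reader's maxFilesPerTrigger or maxBytesPerTrigger settings and by the trigger in use, so this sketch by itself does not guarantee day-by-day processing.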

Would it be possible to get on a 10 min call where I can show you the issue via a screen share?

This will be an extremely expensive ETL for us so we would like to get it right.

Kash
Contributor III

@Sara Dooley​ any thoughts?

Dooley
Valued Contributor

So yes, Autoloader writes smaller files, and then when you do that checkpoint it will compact those small files into larger files so your queries and processing will be more efficient. You have to set those flags at the beginning for that checkpoint to compact the data into larger files. This is necessary for the rest of your pipeline to work efficiently with that data.

Did I answer your question?

Kash
Contributor III

Hi there,

Thanks for getting back to me.

My question is regarding backfilling and loading HISTORICAL data incrementally (day by day) using AutoLoader.

I would like to run Autoloader on data that is partitioned by year/month/day. I would like Autoloader to read this data incrementally and then write it incrementally to prevent CPU overloading and other memory issues.

When I run Autoloader today using the setup above, I see in the Spark UI that it is trying to load the entire 1TB S3 bucket into memory rather than reading it day by day (incrementally).

Do I have the backfill set up incorrectly, or am I missing something that can make Autoloader backfill daily first?

Thanks,

Avkash

Dooley
Valued Contributor

Are you running this latest run on a single node still?

Do you have 2 TB in the S3 bucket that you want fed into Databricks for processing, plus any new data that ends up in that S3 bucket afterwards?

Kash
Contributor III

Yes, exactly. Do we have the backfill set up correctly for Autoloader?
