HELP! Converting GZ JSON to Delta causes massive CPU spikes and ETLs take days!

Kash
Contributor III

Hi there,

I was wondering if I could get your advice.

We would like to create a bronze Delta table from GZ JSON data stored in S3, but each time we attempt to read and write it, our cluster's CPU spikes to 100%. We are not doing any transformations, simply reading from S3, creating an as_of_date column, and writing back to S3 in Delta.

Currently it takes over 1 hour to read and write 20 GB of GZ JSON from S3 using a node with 122 GB of memory and 16 cores, which is not efficient.

When we ETL the data, it also writes tiny files (6.7 MB-10 MB).

Things we've tried:

  1. I initially thought the problem was that our data is GZ JSON, which is not splittable, so Spark has a tough time processing it, but we don't have this issue with other pipelines. (Still unclear if this is it.)
  2. I then thought it was a skew problem due to the .withColumn("as_of_date", col('metadata.timestamp').cast('date')), but even after I removed it the problem continued.
  3. I even tried to add a SALT hint, but no luck.
  4. We tried to define the schema of the JSON, as it is nested, which helped it load faster, but the write took just as long. (This is not ideal since our schemas change over time and defining the schema here causes us to lose data.)
  5. We've tried .repartition(1000).
  6. We've also tried to let Databricks dynamically detect what is skewed and then set that programmatically as the skew hint, but no luck.
  7. We turned on autoCompact and autoOptimize to have it write larger files, but it did not, and again wrote smaller ~10 MB files. (A sketch of these settings follows the code below.)

from pyspark.sql.functions import col

# year, month and day are assumed to be defined earlier in the notebook.
source_path = 's3://test-data/source/{}/{}/{}/*'.format(year, month, day)

test_data = spark.read.json(source_path) \
    .withColumn("as_of_date", col('metadata.timestamp').cast('date'))
test_data.createOrReplaceTempView('test_data')

test_data.write.format('delta') \
    .partitionBy('as_of_date') \
    .option("mergeSchema", "true") \
    .option("path", 's3://test-data-bronze/') \
    .mode("append") \
    .saveAsTable('test_data.bronze')
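
For reference, a minimal sketch (assuming a Databricks runtime where Delta auto-optimize is available) of the settings mentioned in item 7, both at the session level and as table properties on the bronze table; the OPTIMIZE call at the end is the manual way to compact small files that have already been written:

# Session-level settings: ask Delta to write larger files and compact small ones.
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# Or set them as table properties so every writer picks them up.
spark.sql("""
  ALTER TABLE test_data.bronze SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
  )
""")

# Compact the small files that already exist in the table.
spark.sql("OPTIMIZE test_data.bronze")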


Dooley
Valued Contributor

Just a quick check: did you try dbfs:/ paths instead of these S3 paths? Keep the upload path the way it is, but try changing these ones to a dbfs:/ path, maybe dbfs:/user/your_user_email/?

write_path = 's3://test-data-bronze/not_optimized/'
schema_location = 's3://test-autoloader/not_optimized/'
checkpoint_path = 's3://test-autoloader/not_optimized/'
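
In other words, something along these lines; the exact dbfs:/ folders below are placeholders, not your real ones:

write_path = 'dbfs:/user/your_user_email/bronze/not_optimized/'
schema_location = 'dbfs:/user/your_user_email/autoloader/not_optimized/'
checkpoint_path = 'dbfs:/user/your_user_email/autoloader/not_optimized/'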

Dooley
Valued Contributor
upload_path = '/mnt/test-data/calendar/'
 
schema_location = 'Users/your_email_address/project_directory_name/_checkpoints'
checkpoint_path = 'Users/your_email_address/project_directory_name/_checkpoints'
 
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.schemaLocation', schema_location) \
  .option("cloudFiles.maxFilesPerTrigger", 16) \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option('cloudFiles.format', 'json') \
  .option('cloudFiles.backfillInterval', "1 day") \
  .load(upload_path)
 
# Start the stream.
bronze_df.writeStream \
  .format("delta") \
  .option('checkpointLocation', checkpoint_path) \
  .table("user_data_bronze_compact")

Here is my thinking: assuming you mounted the S3 bucket you want to load from, it will be in the DBFS file system under /mnt, as you see in the code above. Then you save the data & schema in the DBFS file system as well, maybe under Users.

For the write, I took out all the other stuff you had in there so we keep processing to a minimum and just bring the data into Delta; THEN you can do the timestamp casting. I highly suggest you do not repartition during this process, since that is very expensive and unnecessary once the autoOptimize optimizeWrite and autoCompact flags are turned on before you run the code above.

Also, the backfill will be expensive at first because it needs to bring all of the existing data (which is larger) into Delta format, but after that point only NEW files will be ingested, so the processing will be lighter later on. However, you are going to have CPU overload issues if you stay with that single-node way of reading in data. You need to do a distributed read, so make a cluster with a driver and 2-8 workers, and watch your Ganglia metrics (under the cluster's "Metrics" tab) to help us troubleshoot moving forward. Once you update that cluster, update maxFilesPerTrigger to the total number of cores across the workers.
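
A rough sketch of that sizing, reusing the variables from the snippet above; defaultParallelism is only an approximation of the total worker core count, so treat the value as a starting point:

# Approximate the total number of cores across the workers.
total_worker_cores = spark.sparkContext.defaultParallelism

bronze_df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.schemaLocation', schema_location) \
  .option("cloudFiles.maxFilesPerTrigger", total_worker_cores) \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option('cloudFiles.format', 'json') \
  .load(upload_path)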

Let me know.

Anonymous
Not applicable

The problem is that gzip isn't splittable. It's going to be single threaded as it tries to uncompress it. Try the option:

.option("compression", "gzip")
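
Applied to the original read, that suggestion would look roughly like this; note that Spark usually detects gzip from the .gz file extension on its own, so treat the explicit option as a hint to try rather than a guaranteed fix:

from pyspark.sql.functions import col

test_data = spark.read \
    .option("compression", "gzip") \
    .json(source_path) \
    .withColumn("as_of_date", col('metadata.timestamp').cast('date'))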

Kaniz
Community Manager

Hi @Avkash Kana, we haven't heard from you on the last responses from @Joseph Kambourakis and @Sara Dooley, and I was checking back to see if their suggestions helped you. Or else, if you have a solution, please share it with the community, as it can be helpful to others. Otherwise, we will respond with more details and try to help.

Kash
Contributor III

Hi Kaniz,

Thanks for the note, and thank you everyone for the suggestions and help. @Joseph Kambourakis I added your suggestion to our load, but I did not see any change in how our data loads or the time it takes to load.

I've done some additional research, and one option for those having GZ problems could be to read the data as a text file, which makes the read very quick, and then use Spark to infer the schema.
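
A minimal sketch of that approach, assuming one JSON object per line; inferring the schema from a single sampled record is illustrative and can miss fields that only appear in other records:

from pyspark.sql.functions import col, from_json, schema_of_json

# Reading gzipped JSON as plain text is fast; each record lands in a 'value' column.
raw = spark.read.text('s3://test-data/source/2022/06/03/*')

# Infer the schema from one sampled record instead of scanning every file.
sample_record = raw.limit(1).collect()[0]['value']
inferred_schema = schema_of_json(sample_record)

# Parse the JSON strings with the inferred schema and flatten the struct.
parsed = raw.select(from_json(col('value'), inferred_schema).alias('data')).select('data.*')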

However, in the end we decided to use Auto Loader to load our data in... but we're still waiting for help on how to backfill a year's worth of data incrementally (day by day).

Let me know if you have any suggestions. I added more context to my post above for User16460565755155528764 (Databricks).

Thanks,

K
