HELP! Converting GZ JSON to Delta causes massive CPU spikes and ETLs take days!

Kash
Contributor III

Hi there,

I was wondering if I could get your advice.

We would like to create a bronze Delta table from GZ JSON data stored in S3, but each time we attempt to read and write it our cluster's CPU spikes to 100%. We are not doing any transformations; we simply read from S3, add an as_of_date column, and write to S3 in Delta.
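For reference, here is a rough sketch of why the compression format can matter on the read side. Spark cannot split a gzip file, so each `.gz` file is decompressed by one task from start to finish, whereas a splittable file is cut into chunks of at most `spark.sql.files.maxPartitionBytes` (128 MB by default). The file sizes below are hypothetical, not our actual layout, and `largest_single_file_chunk` is an illustrative helper, not a Spark API.

```python
# Default value of spark.sql.files.maxPartitionBytes (128 MiB).
MAX_PARTITION_BYTES = 128 * 1024 * 1024
GIB = 1024 ** 3


def largest_single_file_chunk(file_sizes, splittable,
                              max_partition_bytes=MAX_PARTITION_BYTES):
    """Largest number of bytes from one file that a single task must read.

    A gzip file is not splittable, so the whole file lands in one task.
    A splittable file is cut into chunks of at most max_partition_bytes.
    """
    biggest = max(file_sizes)
    if splittable:
        return min(biggest, max_partition_bytes)
    return biggest


# Hypothetical layout: 20 GiB stored as 20 files of 1 GiB each.
hypothetical_files = [1 * GIB] * 20

gzip_chunk = largest_single_file_chunk(hypothetical_files, splittable=False)
split_chunk = largest_single_file_chunk(hypothetical_files, splittable=True)

print(f"gzip: up to {gzip_chunk / GIB:.2f} GiB per task")
print(f"splittable: up to {split_chunk / GIB:.3f} GiB per task")
```

Under these assumed sizes, a few large `.gz` files would leave most of the 16 cores waiting on whichever tasks drew the biggest files.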

Currently it takes over 1 hour to read and write 20GB of GZ JSON from S3 on a server with 122GB of memory and 16 cores, which is not efficient.
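To put those numbers in perspective, the arithmetic below converts them into throughput. It treats 20GB as 20 GiB and the runtime as exactly one hour; since the job actually takes longer than an hour, the real figures are somewhat lower.

```python
# Figures from the run described above.
input_gib = 20          # compressed GZ JSON read from S3
runtime_seconds = 3600  # "over 1 hour", so this understates the runtime
cores = 16

input_mib = input_gib * 1024
cluster_mib_per_s = input_mib / runtime_seconds
per_core_mib_per_s = cluster_mib_per_s / cores

print(f"cluster: {cluster_mib_per_s:.2f} MiB/s")    # cluster: 5.69 MiB/s
print(f"per core: {per_core_mib_per_s:.2f} MiB/s")  # per core: 0.36 MiB/s
```

That is at most about 0.36 MiB/s of compressed input per core.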

When we ETL the data, it also writes tiny files of 6.7MB-10MB.

Things we've tried:

  1. I initially thought the problem was that our data is GZ JSON, which is not splittable, so Spark has a tough time processing it, but we don't have this issue with other pipelines. (Still unclear whether this is the cause.)
  2. I then thought it was a skew problem caused by .withColumn("as_of_date", col('metadata.timestamp').cast('date')), but even after I removed it the problem continued.
  3. I even tried adding a SALT hint, but no luck.
  4. We tried defining the schema of the JSON, as it is nested. This helped it load faster, but the write took just as long. (This is not ideal, since our schemas change over time and defining the schema here causes us to lose data.)
  5. We've tried .repartition(1000).
  6. We've also tried letting Databricks dynamically detect what is skewed and then setting that programmatically as the skew hint, but no luck.
  7. We turned on Auto Compact and Auto Optimize so that it would write larger files, but it did not; it again wrote small 10MB files.
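One interaction between items 5 and 7 that may be worth checking (an assumption on our part, not something we have confirmed): with `partitionBy('as_of_date')`, every write task emits a separate file for each distinct `as_of_date` it holds. After `.repartition(1000)`, rows are spread evenly, so each task tends to hold every date. The pure-Python simulation below mimics that. The row counts, the three dates, and the function names are hypothetical, and the code models Spark's behavior rather than calling it.

```python
def count_output_files(rows, assign_task):
    """Count files written when each task writes one file per partition value.

    rows: iterable of (row_id, as_of_date) pairs.
    assign_task: function mapping a row to the task that holds it.
    """
    return len({(assign_task(row), row[1]) for row in rows})


NUM_TASKS = 1000
dates = ["2022-06-01", "2022-06-02", "2022-06-03"]  # hypothetical values

# Hypothetical data: 10 rows for every (task slot, date) combination.
rows = [(i, dates[(i // NUM_TASKS) % len(dates)])
        for i in range(NUM_TASKS * len(dates) * 10)]


def round_robin(row):
    """Mimics .repartition(1000): rows are spread evenly over tasks."""
    return row[0] % NUM_TASKS


def by_date(row):
    """Mimics .repartition('as_of_date'): rows with one date share a task."""
    return dates.index(row[1]) % NUM_TASKS


files_round_robin = count_output_files(rows, round_robin)
files_by_date = count_output_files(rows, by_date)

print(files_round_robin, files_by_date)  # prints: 3000 3
```

The trade-off in this model is that grouping by date funnels each date through a single task, which gives fewer, larger files but limits write parallelism.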

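from pyspark.sql.functions import col

# Read one day of gzipped JSON and derive the partition column from the nested timestamp.
year, month, day = '2022', '06', '03'
source_path = 's3://test-data/source/{}/{}/{}/*'.format(year, month, day)
test_data = spark.read.json(source_path).withColumn("as_of_date", col('metadata.timestamp').cast('date'))
test_data.createOrReplaceTempView('test_data')

# Append to the bronze Delta table, partitioned by as_of_date.
test_data.write.format('delta') \
  .partitionBy('as_of_date') \
  .option("mergeSchema", "true") \
  .option("path", 's3://test-data-bronze/').mode("append") \
  .saveAsTable('test_data.bronze')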
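
Since the path above is tied to a single date, here is a minimal sketch of building it from a `datetime.date` instead. `build_source_path` is a hypothetical helper, and it assumes the year/month/day folders are zero-padded as in the path above.

```python
from datetime import date


def build_source_path(day, root="s3://test-data/source"):
    """Return the glob for one day's files, e.g. .../2022/06/03/*."""
    return f"{root}/{day:%Y/%m/%d}/*"


source_path = build_source_path(date(2022, 6, 3))
print(source_path)  # s3://test-data/source/2022/06/03/*
```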