Dooley
Databricks Employee
Databricks Employee

Try using Autoloader and enabling the auto-optimize for the table property.

First try to use autoloader within Delta Live Tables to manage your ETL pipeline for you.

Otherwise, you can write it out in a notebook.

Below is a way to get your very small GZ JSON files streamed efficiently into Databricks from your S3 bucket & then written into a compact form so the rest of your pipeline should show performance improvement. I suggest you run this on more than one node. Larger, fewer machines is best than many small ones - see "Complex batch ETL" in this Best Practices for Cluster Configuration documentation.

schema_location = <file_path_defined>
upload_path = <file_path_defined>
checkpoint_path = <file_path_defined>
write_path = <file_path_defined>
 
# Set up the stream to begin reading incoming files from the
bronze_df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.schemaLocation', schema_location) \
  .option("cloudFiles.maxFilesPerTrigger", <set to the number of cores in your cluster>) \
  .option('cloudFiles.format', 'json') \
  .load(upload_path)
 
# Start the stream.
bronze_df.writeStream \
  .format("delta")
  .option('checkpointLocation', checkpoint_path) \
  .table("user_data_bronze_not_compact")

If there are still performance issues, try MaxBytesPerTrigger instead of MaxFilesPerTrigger; may help since you are using GZ JSON and not straight JSON.

Now make sure that all NEW delta tables will be compact automatically.

%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

Now make the compact table:

checkpoint_path2 = <file_path>
 
bronze_df.writeStream \
  .format("delta")
  .option('checkpointLocation', checkpoint_path2) \
  .table("user_data_bronze_compact")

From here you can read the compact delta table and you should see a better performance.