06-09-2022 03:02 PM
Try using Auto Loader and enabling the auto-optimize table properties.
First, try using Auto Loader within Delta Live Tables to manage your ETL pipeline for you.
Otherwise, you can write it out in a notebook.
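As a rough sketch of the Delta Live Tables route (the table name and S3 path below are hypothetical placeholders; the `dlt` module and `spark` session are only provided inside a DLT pipeline):

```python
# Illustrative source location only - replace with your real bucket/prefix.
source_path = "s3://<your-bucket>/<prefix>"

try:
    import dlt

    # Declare a bronze table fed by Auto Loader; DLT manages the
    # checkpointing and orchestration for you.
    @dlt.table(comment="Small GZ JSON files ingested with Auto Loader")
    def user_data_bronze():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load(source_path)
        )
except ImportError:
    pass  # `dlt` is only importable inside a Delta Live Tables pipeline
```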
Below is a way to stream your very small GZ JSON files efficiently into Databricks from your S3 bucket and then write them out in a compact form, so the rest of your pipeline should show a performance improvement. I suggest you run this on more than one node. Fewer, larger machines are better than many small ones; see "Complex batch ETL" in the Best Practices for Cluster Configuration documentation.
schema_location = <file_path_defined>
upload_path = <file_path_defined>
checkpoint_path = <file_path_defined>
write_path = <file_path_defined>
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
.option('cloudFiles.schemaLocation', schema_location) \
.option("cloudFiles.maxFilesPerTrigger", <set to the number of cores in your cluster>) \
.option('cloudFiles.format', 'json') \
.load(upload_path)
# Start the stream.
bronze_df.writeStream \
.format("delta") \
.option('checkpointLocation', checkpoint_path) \
.table("user_data_bronze_not_compact")

If there are still performance issues, try cloudFiles.maxBytesPerTrigger instead of cloudFiles.maxFilesPerTrigger; it may help since you are using GZ JSON rather than plain JSON (gzip files are not splittable, so per-file cost varies).
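A sketch of the same read with the per-batch cap switched from file count to bytes. The "1g" value is an illustrative guess to tune for your cluster; `spark`, `schema_location`, and `upload_path` come from the notebook context above.

```python
# Cap each micro-batch by total bytes rather than number of files.
byte_cap_option = ("cloudFiles.maxBytesPerTrigger", "1g")  # illustrative value

try:
    bronze_df = spark.readStream.format("cloudFiles") \
        .option("cloudFiles.schemaLocation", schema_location) \
        .option(*byte_cap_option) \
        .option("cloudFiles.format", "json") \
        .load(upload_path)
except NameError:
    pass  # `spark` is only predefined inside a Databricks notebook
```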
Now make sure that all new Delta tables will be compacted automatically.
%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

Now make the compact table:
checkpoint_path2 = <file_path>
bronze_df.writeStream \
.format("delta") \
.option('checkpointLocation', checkpoint_path2) \
.table("user_data_bronze_compact")

From here you can read the compact Delta table, and you should see better performance.
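For the downstream steps, a minimal sketch of reading the compacted table back, either as a batch DataFrame or as a continuing stream (the table name matches the one created above):

```python
# The compacted bronze table written by the stream above.
table_name = "user_data_bronze_compact"

try:
    batch_df = spark.read.table(table_name)         # one-off batch read
    stream_df = spark.readStream.table(table_name)  # or keep streaming downstream
except NameError:
    pass  # `spark` is only predefined inside a Databricks notebook
```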