06-15-2022 11:58 AM
Hi there,
Thank you for getting back to me.
You are correct that we typically set the upload path like this: upload_path = 's3://test-data/calendar/{}/{}/{}/*'.format(year, month, day), and each day we fill in the date programmatically to minimize the data read from S3.
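To make the daily pattern concrete, here is a minimal sketch of how such a path could be built from a date. The helper name daily_upload_path is hypothetical, and the zero-padded folder names (2022/06/15) are an assumption about the bucket layout rather than something confirmed above.

```python
from datetime import date


def daily_upload_path(day: date, base: str = "s3://test-data/calendar") -> str:
    """Build the glob for a single day's folder under a year/month/day layout."""
    # Assumption: folder names are zero-padded (e.g. 2022/06/15); adjust if not.
    return "{}/{:04d}/{:02d}/{:02d}/*".format(base, day.year, day.month, day.day)


upload_path = daily_upload_path(date(2022, 6, 15))
print(upload_path)  # s3://test-data/calendar/2022/06/15/*
```

In a daily job the argument would be something like date.today() instead of a fixed date.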
It appears that Autoloader has a better way of doing this, and for the backfill we did leave the date portion of the path as wildcards so Autoloader could identify all of the folders. My concern was that Autoloader was trying to load 2TB into memory rather than reading and writing incrementally.
My question is: if we leave the path as /* so Autoloader can determine the best way to read and write, why does it try to load all of the data at once?
With regard to repartitioning: each folder is 20GB compressed, and when Autoloader reads the data and writes it back out, it only writes it in 10MB chunks. We have autocompact and auto optimize turned on and it still does this.
%sql
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
Could the problem be that, when I'm doing the optimized write, I am partitioning by as_of_date, which makes Autoloader read all of the data in before it writes?
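To put the small-file concern in numbers, here is a rough sketch of the arithmetic. The 20GB and 10MB figures come from the description above. The 128MB target is only an illustrative assumption, and the calculation ignores the fact that the written Delta files will not be the same size as the compressed JSON input.

```python
GB = 1024 ** 3
MB = 1024 ** 2


def files_per_folder(folder_bytes: int, file_bytes: int) -> int:
    """Number of files needed to hold folder_bytes, rounding up."""
    return -(-folder_bytes // file_bytes)


folder_size = 20 * GB    # from the post: one daily folder, compressed
observed_file = 10 * MB  # from the post: size of the files being written
target_file = 128 * MB   # assumption: illustrative target size, not measured

print(files_per_folder(folder_size, observed_file))  # 2048
print(files_per_folder(folder_size, target_file))    # 160
```

So each as_of_date partition ends up with on the order of a couple of thousand files instead of a few hundred at most, which is what we are trying to avoid.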
Here is the current code we have in place:
# Wildcards for year/month/day; there are no {} placeholders, so .format() is not needed.
upload_path = 's3://test-data/calendar/*/*/*'
write_path = 's3://test-data-bronze/not_optimized/'
schema_location = 's3://test-autoloader/not_optimized/'
checkpoint_path = 's3://test-autoloader/not_optimized/'
# Set up the stream to begin reading incoming files from the upload path.
bronze_df = spark.readStream.format('cloudFiles') \
    .option('cloudFiles.schemaLocation', schema_location) \
    .option('cloudFiles.maxFilesPerTrigger', 16) \
    .option('cloudFiles.inferColumnTypes', 'true') \
    .option('cloudFiles.format', 'json') \
    .option('cloudFiles.backfillInterval', 1) \
    .load(upload_path)
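# Start the stream.
# Leftover fragment, commented out: foreachBatch expects a function taking
# (batch_df, batch_id), not a DataFrame, and the trailing backslash broke the next line.
# bronze_df.writeStream.foreachBatch(bronze_df)
checkpoint_path2 = 's3://test-autoloader/optimized/'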
from pyspark.sql.functions import col

# Derive the partition column from the event timestamp, then write to Delta.
bronze_df.withColumn('as_of_date', col('metadata.timestamp').cast('date')) \
    .writeStream \
    .format('delta') \
    .trigger(once=True) \
    .partitionBy('as_of_date') \
    .option('checkpointLocation', checkpoint_path2) \
    .option('path', 's3://test-data-bronze/optimized/') \
    .table('test-data.calendar_bronze')
Would it be possible to get on a 10-minute call where I can show you the issue via a screen share?
This will be an extremely expensive ETL for us, so we would like to get it right.