07-07-2022 08:29 AM
upload_path = '/mnt/test-data/calendar/'
schema_location = 'Users/your_email_address/project_directory_name/_checkpoints'
checkpoint_path = 'Users/your_email_address/project_directory_name/_checkpoints'
# Set up the stream to begin reading incoming files from the
bronze_df = spark.readStream.format('cloudFiles') \
.option('cloudFiles.schemaLocation', schema_location) \
.option("cloudFiles.maxFilesPerTrigger", 16) \
.option("cloudFiles.inferColumnTypes", "true") \
.option('cloudFiles.format', 'json') \
.option('cloudFiles.backfillInterval', "1 day") \
.load(upload_path)
# Start the stream.
bronze_df.writeStream \
.format("delta") \
.option('checkpointLocation', checkpoint_path) \
.table("user_data_bronze_compact")
Here is my thinking. Assuming you mounted the S3 bucket you want to load from, it will be in the DBFS file system under /mnt, as you see in the code above. You then save the data and schema in the DBFS file system as well, maybe under Users.
For the write, I took out everything else you had in there to keep processing to a minimum and just bring the data into Delta. Once the data is in Delta, you can do the timestamp casting. I strongly suggest you do not repartition during this step: it is very expensive and unnecessary because, as explained above, the autoOptimize optimizeWrite and autoCompact flags should already be turned on before you run the code above.
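As a rough sketch of turning those flags on before starting the stream, something like the following could work. The two conf keys are the session-level Delta settings as I understand them, and the helper names `auto_optimize_confs` and `apply_confs` are made up for illustration, so please check the keys against the docs for your runtime.

```python
def auto_optimize_confs():
    """Session confs for optimized writes and auto compaction (assumed keys)."""
    return {
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
    }


def apply_confs(spark, confs):
    """Set each conf on the given SparkSession and return the keys applied."""
    for key, value in confs.items():
        spark.conf.set(key, value)
    return sorted(confs)


# In a notebook, where `spark` already exists, run this before the stream:
# apply_confs(spark, auto_optimize_confs())
```

Setting these once at the top of the notebook keeps the write itself as plain as the code above.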
Also, the backfill will be expensive at first because it needs to bring all of the existing data (which is larger) into Delta format. After that, only NEW files will be ingested, so later runs will need less processing. However, you are going to have CPU overload issues if you stay with the single-node way of reading in data. You need a distributed read, so create a cluster with a driver and 2-8 workers, and watch your Ganglia metrics (under the cluster information, on the "Metrics" tab) to help us troubleshoot moving forward. Once you have updated the cluster, set maxFilesPerTrigger to the total number of cores across all of the workers.
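To make that last step concrete, here is a tiny sketch of the arithmetic. The function name and the 4 workers x 4 cores figures are only an example, not your actual cluster, so plug in your own numbers.

```python
def max_files_per_trigger(num_workers, cores_per_worker):
    """Total worker cores, used as the cloudFiles.maxFilesPerTrigger value."""
    if num_workers < 1 or cores_per_worker < 1:
        raise ValueError("need at least one worker and one core per worker")
    return num_workers * cores_per_worker


# Hypothetical cluster: 4 workers with 4 cores each.
files_per_trigger = max_files_per_trigger(4, 4)
print(files_per_trigger)  # 16
```

You would then pass that value to .option("cloudFiles.maxFilesPerTrigger", ...) in the read above.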
Let me know.