10-12-2021 10:19 AM
Using OSS Delta, hopefully this is the right forum for this question:
Hey all, I could use some help as I feel like I’m doing something wrong here.
I'm streaming from Kafka -> Delta on EMR/S3FS and am seeing ever-increasing batch times. Looking at the stages, reading in the last Delta snapshot file takes upwards of 15 seconds for a file of only ~30 MB, which pushes my batch times into the 20+ second range.
That stage also constantly writes its results to shuffle, and all of the work seems to be picked up by a single executor, which I find interesting. Is this a known limitation of Delta, or is there some config I can tune to reduce the impact or parallelize reading the log file? Or is there something obvious I'm missing here?
Let me know if there's more info I can provide. I'm relatively new to Delta, so I'm hoping I'm just missing something obvious. Spark config as follows:
```
from pyspark import SparkConf

# NAME and exec_cores are defined elsewhere in the job.
conf = (
    SparkConf()
    .setAppName(NAME)
    .set("spark.scheduler.mode", "FAIR")
    .set("spark.executor.cores", exec_cores)
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.sql.files.maxPartitionBytes", "1073741824")  # 1 GiB
    .set("spark.dynamicAllocation.minExecutors", "3")
    .set("spark.driver.maxResultSize", "0")  # unlimited
    .set("spark.executor.heartbeatInterval", "25000")
    .set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
    .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .set("spark.databricks.delta.checkpoint.partSize", "1000000")
    .set("spark.databricks.delta.snapshotPartitions", "150")
)
```
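For context, the streaming write itself is nothing fancy; it's roughly the shape below (broker, topic, and S3 paths are placeholders, not my actual values):
```
# Rough sketch of the Kafka -> Delta streaming write described above.
# Broker address, topic, checkpoint location, and table path are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()

kafka_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker-1:9092")
    .option("subscribe", "events-topic")
    .load()
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
)

(
    kafka_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/events")
    .outputMode("append")
    .start("s3://my-bucket/tables/events")
)
```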
Accepted Solutions
10-13-2021 09:22 AM
Found the answer through the Slack user group, courtesy of Adam Binford.
I had set `delta.logRetentionDuration='24 HOURS'` but had not set `delta.deletedFileRetentionDuration`, so the checkpoint file still carried every tombstone accumulated since the table was created.
Because I was running a compactor every 15 minutes, the table itself (and thus the checkpoint file) never contained too many data files; however, since the tombstones from every streamed-in microbatch were still retained, the checkpoint file ballooned in size. Once I set the deleted-file retention to a shorter interval, my batch times dropped from 20+ seconds to about 5.
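For anyone hitting the same thing, the two retention properties can be set together on the table. A rough sketch (the table path is a placeholder, the interval values are examples rather than recommendations, and this assumes the Delta SQL extensions are enabled on the session):
```
# Sketch: set both retention properties on the Delta table.
# Table path and interval values are placeholders; pick retention that fits
# your own vacuum and time-travel needs.
spark.sql("""
    ALTER TABLE delta.`s3://my-bucket/tables/events`
    SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 24 hours',
        'delta.deletedFileRetentionDuration' = 'interval 1 hours'
    )
""")
```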
10-12-2021 01:35 PM
Also related to the above: does each microbatch always have to reload and recompute the state? Is the last checkpoint file not cached/persisted between microbatches?