10-12-2021 10:19 AM
Using OSS Delta, hopefully this is the right forum for this question:
Hey all, I could use some help, as I feel like I'm doing something wrong here.
I'm streaming from Kafka -> Delta on EMR/S3FS and am seeing ever-increasingly slow batches. Looking at the stages, reading in the last Delta snapshot file is taking upwards of 15 seconds for only a 30 MB file, which pushes my batch times into the 20+ second range.
It also constantly writes the results of that stage to shuffle, and all of this work seems to be picked up by only one executor, which I find interesting. Is this a known limitation of Delta, or is there some config I can tune to reduce the impact or to parallelize reading the log file? Or is there something obvious I'm missing?
Let me know if there's more info I can provide. I'm relatively new to Delta, so I'm hoping I'm just missing something obvious. Spark config as follows:
```
# NAME and exec_cores are defined elsewhere in the job.
from pyspark import SparkConf

conf = SparkConf().setAppName(NAME) \
    .set('spark.scheduler.mode', 'FAIR') \
    .set('spark.executor.cores', exec_cores) \
    .set('spark.dynamicAllocation.enabled', 'true') \
    .set('spark.sql.files.maxPartitionBytes', '1073741824') \
    .set('spark.dynamicAllocation.minExecutors', '3') \
    .set('spark.driver.maxResultSize', '0') \
    .set('spark.executor.heartbeatInterval', '25000') \
    .set('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true') \
    .set('spark.databricks.delta.retentionDurationCheck.enabled', 'false') \
    .set('spark.databricks.delta.checkpoint.partSize', '1000000') \
    .set('spark.databricks.delta.snapshotPartitions', '150')
```
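For reference, the streaming write is roughly of the shape below. This is only a simplified sketch: the broker, topic, checkpoint location, and table path are placeholders, and the real job does its own transformations before writing.
```
# Simplified sketch of the Kafka -> Delta streaming write; all names/paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()

(spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'broker:9092')                # placeholder broker
    .option('subscribe', 'events')                                   # placeholder topic
    .load()
    .selectExpr('CAST(key AS STRING) AS key', 'CAST(value AS STRING) AS value')
    .writeStream
    .format('delta')
    .option('checkpointLocation', 's3://bucket/checkpoints/events')  # placeholder path
    .trigger(processingTime='15 seconds')
    .start('s3://bucket/delta/events'))                              # placeholder table path
```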
10-12-2021 01:35 PM
Also related to the above: does each microbatch always have to reload and recompute the state? Is the last checkpoint file not cached/persisted between microbatches?
10-13-2021 09:22 AM
Found the answer through the Slack user group, courtesy of Adam Binford.
I had set `delta.logRetentionDuration='24 HOURS'` but did not set `delta.deletedFileRetentionDuration`, so the checkpoint file still contained every tombstone accumulated since the table was created.
Since I was running a compactor every 15 minutes, the table itself (and thus the checkpoint's list of live files) never grew to too many files. However, because the tombstones from every microbatch ever streamed in were still being retained, the checkpoint file was able to balloon in size. After setting `delta.deletedFileRetentionDuration` to a lower interval, my batch times dropped from 20+ seconds to about 5.
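For anyone hitting the same thing, the fix looks roughly like the snippet below. The table path and the one-hour value are just examples; pick retention intervals that match your own time-travel and recovery needs.
```
# Example only: table path and interval values are placeholders.
spark.sql("""
    ALTER TABLE delta.`s3://bucket/delta/events`
    SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 24 hours',
        'delta.deletedFileRetentionDuration' = 'interval 1 hours'
    )
""")
```
Keep in mind that `delta.deletedFileRetentionDuration` also controls how soon `VACUUM` is allowed to physically delete the removed data files, so lowering it shortens the window for time traveling to older table versions.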