
Slow performance loading checkpoint file?

Matt_L
New Contributor III

Using OSS Delta, hopefully this is the right forum for this question:

Hey all, I could use some help as I feel like I'm doing something wrong here.

I'm streaming from Kafka -> Delta on EMR/S3FS and am seeing increasingly slow batches. Looking at the stages, reading in the last Delta snapshot file takes upwards of 15 seconds for only a 30 MB file, which pushes my batch times into the 20+ second range.

That stage also constantly writes its results to shuffle, and all of the work seems to be picked up by a single executor, which I find interesting. Is this a known limitation of Delta, or is there some config I can tune to reduce the impact or parallelize reading the log file? Or is there something obvious I'm missing here?

Let me know if there's more info I can provide. I'm relatively new to Delta, so I'm hoping I'm just missing something obvious. Spark config as follows:

```
SparkConf().setAppName(NAME) \
    .set('spark.scheduler.mode', 'FAIR') \
    .set('spark.executor.cores', exec_cores) \
    .set('spark.dynamicAllocation.enabled', 'true') \
    .set('spark.sql.files.maxPartitionBytes', '1073741824') \
    .set('spark.dynamicAllocation.minExecutors', '3') \
    .set('spark.driver.maxResultSize', 0) \
    .set('spark.executor.heartbeatInterval', '25000') \
    .set('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true') \
    .set('spark.databricks.delta.retentionDurationCheck.enabled', 'false') \
    .set('spark.databricks.delta.checkpoint.partSize', '1000000') \
    .set('spark.databricks.delta.snapshotPartitions', '150')
```
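One way to confirm that the checkpoint itself is the slow part is to list `_delta_log` and look at the size of the newest checkpoint parquet file(s). A rough sketch, assuming boto3 and placeholder bucket/prefix values:

```
# Rough sketch: report checkpoint file sizes in the Delta log.
# Assumes boto3; the bucket and prefix below are placeholders, not real paths.
import boto3

s3 = boto3.client('s3')
bucket = 'my-bucket'                      # placeholder
prefix = 'path/to/table/_delta_log/'      # placeholder

paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):
        if 'checkpoint' in obj['Key']:
            print(f"{obj['Key']}: {obj['Size'] / (1024 * 1024):.1f} MB")
```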

1 ACCEPTED SOLUTION


Matt_L
New Contributor III

Found the answer through the Slack user group, courtesy of Adam Binford.

I had set `delta.logRetentionDuration='24 HOURS'` but did not set `delta.deletedFileRetentionDuration`, so the checkpoint file still contained every tombstone accumulated since the table was created.

Since I was running a compactor every 15 minutes, the table itself (and thus the checkpoint file) never consisted of too many data files; however, because the tombstones from every microbatch ever streamed in were still retained, the checkpoint file ballooned in size. Once I set the deleted-file retention to a lower interval, my batch time dropped from 20+ seconds to about 5.
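For reference, both retention settings are Delta table properties. A minimal sketch of setting them, where the table path and the '4 hours' value are placeholders (keep the deleted-file retention at least as long as any time travel or long-running readers you still need):

```
# Sketch only: shorten how long Delta retains tombstones so checkpoints stay small.
# Assumes an active SparkSession `spark` with Delta Lake configured; the table path
# and '4 hours' are placeholders, not the exact values from this thread.
spark.sql("""
    ALTER TABLE delta.`s3://my-bucket/path/to/table`
    SET TBLPROPERTIES (
        'delta.logRetentionDuration' = '24 hours',
        'delta.deletedFileRetentionDuration' = '4 hours'
    )
""")
```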


3 REPLIES

Matt_L
New Contributor III

Also related to the above: does each microbatch always have to reload and recompute the snapshot state? Is the last checkpoint file not cached/persisted between microbatches?


