cancel
Showing results forĀ 
Search instead forĀ 
Did you mean:Ā 
Data Engineering
cancel
Showing results forĀ 
Search instead forĀ 
Did you mean:Ā 

Slow performance loading checkpoint file?

Matt_L
New Contributor III

Using OSS Delta, hopefully this is the right forum for this question:

Hey all, I could use some help as I feel like Iā€™m doing something wrong here.

Iā€™m streaming from Kafka -> Delta on EMR/S3FS, and am seeing ever-increasingly slow batches.When looking at the stages, it looks like reading the last delta-snapshot file in is taking upwards of 15 seconds for only a 30mb file, which pushes my batch times into the 20+ second range.

It also is constantly writing the results of that stage to Shuffle. All this work seems to only be picked up by 1 executor as well, which I find interesting. Is this a known limitation of delta, or is there some config I can tune to reduce the impact or parallelize reading the log file? Or is there something obvious I'm missing about this?

Let me know if thereā€™s more info I can provide. Iā€™m relatively new to delta so Iā€™m hoping Iā€™m just missing something obvious.Spark config as follows:

```

SparkConf().setAppName(NAME)\

.set('spark.scheduler.mode','FAIR') \

.set("spark.executor.cores", exec_cores) \

.set("spark.dynamicAllocation.enabled", "true") \

.set('spark.sql.files.maxPartitionBytes', '1073741824') \

.set('spark.dynamicAllocation.minExecutors','3')\

.set('spark.driver.maxResultSize', 0) \

.set('spark.executor.heartbeatInterval', '25000')\

.set('spark.databricks.delta.vacuum.parallelDelete.enabled','true')\

.set('spark.databricks.delta.retentionDurationCheck.enabled','false')\

.set('spark.databricks.delta.checkpoint.partSize','1000000')\

.set('spark.databricks.delta.snapshotPartitions','150')

```

1 ACCEPTED SOLUTION

Accepted Solutions

Matt_L
New Contributor III

Found the answer through the Slack user group, courtesy of an Adam Binford.

I had set `delta.logRetentionDuration='24 HOURS'` but did not set `delta.deletedFileRetentionDuration`, and so the checkpoint file still had all the accumulated tombstones since the table existed.

Since I was running a compactor every 15 minutes, the table itself (and thus the checkpoint file) would not consist of too many files, however since all the tombstones of every microbatch streamed in still existed, it allowed the checkpoint file to balloon in size. Once setting it to a lower interval, my batch time decreased from 20+ seconds down to about 5.

View solution in original post

3 REPLIES 3

Matt_L
New Contributor III

Also related to the above, does each microbatch always have to reload and recompute the state? Is the last checkpoint file not cached/persisted between micro batches?

Matt_L
New Contributor III

Found the answer through the Slack user group, courtesy of an Adam Binford.

I had set `delta.logRetentionDuration='24 HOURS'` but did not set `delta.deletedFileRetentionDuration`, and so the checkpoint file still had all the accumulated tombstones since the table existed.

Since I was running a compactor every 15 minutes, the table itself (and thus the checkpoint file) would not consist of too many files, however since all the tombstones of every microbatch streamed in still existed, it allowed the checkpoint file to balloon in size. Once setting it to a lower interval, my batch time decreased from 20+ seconds down to about 5.

100000

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.