10-15-2024 04:22 PM
Hi team,
Kinesis -> delta table raw -> job with trigger=availableNow -> delta table target.
The Kinesis->delta table raw is running continuously. The job is daily with trigger=availableNow.
The job reads from raw, does some transformation, and runs a MERGE in foreachBatch.
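For readers who want to picture the job, here is a minimal PySpark sketch of the shape described above: a Delta streaming read of the raw table, drained once with trigger=availableNow, with the MERGE issued from foreachBatch. The function name, the path parameters and the merge condition are placeholders made up for illustration; they are not taken from the actual job.

```python
def run_daily_merge(spark, raw_path, target_path, checkpoint_path, merge_condition):
    """Drain the raw Delta table once and MERGE each micro-batch into the target."""

    def upsert(batch_df, batch_id):
        # Imported lazily so the sketch can be read without delta-spark installed.
        from delta.tables import DeltaTable

        target = DeltaTable.forPath(spark, target_path)
        (
            target.alias("t")
            .merge(batch_df.alias("s"), merge_condition)  # hypothetical condition
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    query = (
        spark.readStream.format("delta")
        .load(raw_path)
        # The transformation step mentioned in the post would go here.
        .writeStream.foreachBatch(upsert)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # process what is available, then stop
        .start()
    )
    query.awaitTermination()
    return query
```

It would be called once per day, e.g. run_daily_merge(spark, raw_path, target_path, checkpoint_path, "t.id = s.id"), where the key column id is again only an assumption.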
When the job starts, it spends quite a long time on addBatch and latestOffset. For example:
[queryId = 3e0d2] [batchId = 290] Streaming query made progress: {
"id" : "3e0...92",
"runId" : "8c...6c",
"name" : null,
"timestamp" : "2024-10-15T18:45:28.123Z",
"batchId" : 290,
"batchDuration" : 1779275,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 61397,
"commitOffsets" : 95,
"getBatch" : 548418,
"latestOffset" : 1168003,
"queryPlanning" : 803,
"triggerExecution" : 1779263,
"walCommit" : 367
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[s3://..]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "...",
"reservoirVersion" : 3207727,
"index" : -1,
"isStartingVersion" : false
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "...",
"reservoirVersion" : 3223576,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "13445289",
"numFilesOutstanding" : "217"
}
} ],
"sink" : {
"description" : "ForeachBatchSink",
"numOutputRows" : -1
}
}
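To see where the time goes in an event like this, the durationMs entries can be ranked by their share of triggerExecution. A small Python sketch using the numbers copied from the event above (the helper name phase_shares is made up for illustration):

```python
# durationMs values copied from the progress event above (milliseconds).
duration_ms = {
    "addBatch": 61397,
    "commitOffsets": 95,
    "getBatch": 548418,
    "latestOffset": 1168003,
    "queryPlanning": 803,
    "walCommit": 367,
}
trigger_execution_ms = 1779263


def phase_shares(durations, total):
    """Return (phase, ms, fraction of total) tuples, largest phase first."""
    ranked = sorted(durations.items(), key=lambda kv: kv[1], reverse=True)
    return [(name, ms, ms / total) for name, ms in ranked]


shares = phase_shares(duration_ms, trigger_execution_ms)
for name, ms, frac in shares:
    print(f"{name:>14}: {ms / 1000:8.1f} s  ({frac:6.1%})")
```

For this event the ranking puts latestOffset first and getBatch second, with addBatch a distant third.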
The MERGE seems very fast once it gets the data from raw. In the Spark UI there is a big gap (roughly the batchDuration) where it seems to be waiting on the batches. Is the trigger supposed to take the version from the checkpoint offset and read all versions after it, or does it do something else? Why is it so slow?
Thanks
2 weeks ago
Your job is experiencing long durations for the addBatch and latestOffset phases, which suggests there are delays in discovering new data.
With availableNow, the job will read from the last known checkpoint offset and scan through all incremental changes. If your Delta table in the raw layer has high data volume or a large number of small files, this can slow down the latestOffset and addBatch phases, as it takes time to retrieve file metadata and determine what’s new. The numFilesOutstanding and numBytesOutstanding metrics indicate there are many small files waiting to be processed, which further contributes to the delay.
The latestOffset phase involves checking the latest offsets across multiple files, which can introduce significant latency, particularly if there are many small files due to continuous ingestion from Kinesis. This latency is common in streaming sources that use file-based checkpoints and Delta tables, as the source must calculate the cumulative offset of multiple files.
Can you please try enabling:
spark.databricks.delta.optimizeWrite.enabled
spark.databricks.delta.autoCompact.enabled
Ref.: https://docs.databricks.com/en/delta/table-properties.html
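A minimal sketch of turning both settings on from a notebook or job, assuming spark is an existing SparkSession. The helper name is made up for illustration; the setting names are the two listed above.

```python
# Setting names are the two listed in the reply above.
DELTA_WRITE_SETTINGS = {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
}


def apply_delta_write_settings(spark, settings=DELTA_WRITE_SETTINGS):
    """Apply each setting to the session; `spark` is an existing SparkSession."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return dict(settings)
```

Since both settings act on writes, they would presumably need to be set in the session that writes the raw table (the Kinesis ingestion), not in the daily MERGE job.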
2 weeks ago
Thanks. The trigger=availableNow job runs daily. Each trigger might see many versions (say, 18,000 versions). It doesn't look like OPTIMIZE, VACUUM or other configs can help. The bottleneck seems to be the way the Delta table applies each version. The time is roughly linear in the number of versions.
2 weeks ago
When a Delta table restores a version, it can find the closest checkpoint and apply versions up to the target. However, to get the changes between 2 versions, it needs to read and apply many versions. There is no compacted file it can use.
What is the way it reads many versions? Does it sequentially read every 1000 files (data is on S3)?
2 weeks ago
Agree.
TL;DR Control the Delta raw table updates frequency (batch incoming data), and/or reduce the log retention period. This is assuming you're currently on DBR 14.2+, where Delta Logs compaction is already enabled.
Only for clarity, I believe it'll be good to write down the process and pinpoint the bottleneck:
When a daily job with trigger=availableNow starts processing a Delta table, the latestOffset calculation follows these steps:
Identify the Latest Checkpoint:
Determine the Starting Version:
List Subsequent Log Files:
Apply Changes from Log Files:
Identify Active Data Files:
Calculate latestOffset:
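The first four steps can be illustrated with a toy Python sketch that works on a plain list of _delta_log file names. This is a simplified illustration, not Delta's implementation: it only recognizes single-file checkpoints and plain JSON commits, and the function name and sample listing are made up.

```python
import re

# Delta log entries are named with a 20-digit, zero-padded version number.
COMMIT_RE = re.compile(r"^(\d{20})\.json$")
CHECKPOINT_RE = re.compile(r"^(\d{20})\.checkpoint\.parquet$")


def plan_log_replay(filenames, target_version):
    """Pick the newest checkpoint <= target_version and the commits after it."""
    checkpoints, commits = [], []
    for name in filenames:
        if (m := CHECKPOINT_RE.match(name)):
            checkpoints.append(int(m.group(1)))
        elif (m := COMMIT_RE.match(name)):
            commits.append(int(m.group(1)))
    usable = [v for v in checkpoints if v <= target_version]
    base = max(usable) if usable else None
    first_commit = 0 if base is None else base + 1
    to_apply = sorted(v for v in commits if first_commit <= v <= target_version)
    return base, to_apply


# Hypothetical listing: commits 0..25 with checkpoints at versions 10 and 20.
listing = [f"{v:020d}.json" for v in range(26)]
listing += [f"{v:020d}.checkpoint.parquet" for v in (10, 20)]
base, to_apply = plan_log_replay(listing, target_version=25)
print(base, to_apply)
```

Note that the sketch has to look at every file name in the listing before it can pick the checkpoint, which is the cost discussed next.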
Bottleneck:
Even when utilizing the latest checkpoint and processing up to 100 versions (given the default delta.checkpointInterval of 100), the system may still need to list all files in the _delta_log directory to identify the latest checkpoint and subsequent JSON log files. This file listing operation can be time-consuming, especially in environments like AWS S3, where listing operations are relatively slow.
Rough calculation of file accumulation in the _delta_log directory, considering your estimate of 18k commits per day:
Daily Commits: With approximately 18,000 commits per day and a delta.checkpointInterval of 100, each day would generate:
Retention Period: With a log retention period of 7 days, the _delta_log directory would accumulate:
In total, this results in approximately 126,000 files in the _delta_log directory over a 7-day period. Note: there may be even more (or fewer), as I'm not considering checkpoint sidecar files (checkpoint v2) or delta log minorCompaction if available.
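The arithmetic behind that estimate can be sketched in Python as follows. It assumes one JSON file per commit and one file per checkpoint, and ignores checksum files, sidecars and log compaction; the function name is made up for illustration.

```python
def delta_log_file_estimate(commits_per_day, checkpoint_interval, retention_days):
    """Estimate commit JSON and checkpoint files retained in _delta_log."""
    commit_files = commits_per_day * retention_days
    checkpoint_files = (commits_per_day // checkpoint_interval) * retention_days
    return {
        "commit_json": commit_files,
        "checkpoints": checkpoint_files,
        "total": commit_files + checkpoint_files,
    }


# Figures from this post: ~18,000 commits/day, interval 100, 7-day retention.
estimate = delta_log_file_estimate(18_000, 100, 7)
print(estimate)
```

Under these assumptions the commit JSON files alone account for the 126,000 figure, and the checkpoints add a further 1,260 files.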
"...It doesn't look like OPTIMIZE, VACUUM or other configs can help..."
If your application can leverage optimizeWrite, then by generating fewer, larger files the system decreases the number of commit operations required. Each commit corresponds to a JSON log entry; thus, fewer commits result in fewer log entries. The same applies to autoCompact, but we agree that these features are designed to optimize write operations and compact small files, and their effectiveness can be limited when dealing with isolated high-frequency, minimal-size updates.
Optimization Strategies:
Control Update Frequency:
Adjust Log Retention Period:
2 weeks ago
Appreciate the input. Thanks.
We use the Delta table as a streaming sink, so we don't want to control the update frequency for the raw table; the goal is to load it as soon as possible. The default checkpointInterval is actually 10. I tried changing it to bigger values, but that didn't affect the performance much.
I also tried many other configs, but nothing affects the performance significantly. The duration is roughly linear in the number of versions. In fact, even when I got it down to 40K files under _delta_log it was still slow, and even if the readStream has a filter like where 1==2, i.e. it reads nothing, it is still slow.
From reading the log I can also see it tries to load version files from the last trigger for a schema compatibility check, which makes it worse.
My guess is this: v0, v1, ... v10, cp1, v11, ..., cp2, .... A checkpoint file every 10 transactions makes it easy to restore to any point, but if it needs the changes between 2 versions, there are no compacted files it can use. Should Delta be designed to compact incremental changes too, instead of only creating a full snapshot in the checkpoint?
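To put rough numbers on that guess, here is a small Python sketch comparing the two access patterns. It assumes a checkpoint exists at every multiple of the checkpoint interval, one commit file per version, and no compacted delta files; the function names are made up for illustration.

```python
def commits_for_snapshot(version, checkpoint_interval):
    """Commits replayed on top of the nearest checkpoint at or below `version`."""
    return version % checkpoint_interval


def commits_for_change_range(start_version, end_version):
    """Commits between the two versions that a change read has to go through."""
    return end_version - start_version


# reservoirVersion values from the progress event earlier in the thread.
start, end = 3207727, 3223576
snapshot_cost = commits_for_snapshot(end, 10)
range_cost = commits_for_change_range(start, end)
print(snapshot_cost, range_cost)
```

With a checkpoint interval of 10 the snapshot needs at most 9 commits on top of a checkpoint, while the change range grows with the number of versions between the two offsets, which would match the roughly linear behaviour described above.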
Also, I'm not sure whether there is a file listing. From the log I can only see `AWSCredentialProviderList:V3: Using credentials for bucket xxx`
a week ago
a week ago
Thanks. I traced through it with the logs but cannot figure out which part makes applying 18,000 versions slow. It is the same with CDF if I feed a big range to the table_changes function. Any ideas on this?
a week ago - last edited a week ago
I think the slow part comes after the latest checkpoint file is known: the process still has to list all version files to work out which versions it needs to read. I'm not good at reading this code, but I believe the slow part starts with createLogSegment, which calls listDeltaCompactedDeltaAndCheckpointFiles, then listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile, which finally calls listFromFileSystemInternal. That lists the files in the _delta_log directory and categorizes them as DeltaFile, CompactedDeltaFile, CheckpointFile, or ChecksumFile.
Notice "def listFromOrNone" calls "listFrom", which does take a startingVersion; but even with the prefix or other optimizations, many file systems (especially distributed ones like S3) don't allow partial listings based on complex version rules. The listing operation often retrieves all filenames, and only afterward does the code filter out the ones matching the criteria.
That's as much as I can tell or infer. It would be good to check from the storage standpoint whether a whole directory listing happens or not.
Edited: Actually we can also find this storage listing implementation in https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverL..., if we specifically focus on https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverL... you can see there's an improved listing which considers the startAfter, this is also discussed in https://github.com/delta-io/delta/issues/1191.
So it may be worth setting this property to "true" to see if that helps in your case. I haven't tried this, but I assume it should be possible via spark.conf.set("spark.hadoop.delta.enableFastS3AListFrom", "true"), or, to be sure, via the cluster Spark configuration "spark.hadoop.delta.enableFastS3AListFrom true". Please monitor the driver metrics: I suspect this may (or may not) have some CPU utilization impact. If it does, increasing the driver instance type would be a good tradeoff anyway.
Monday - last edited Monday
@Brad nvm about my previous comment, after further research, I can tell this feature has been in Databricks for years using https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html, so the listing will not be the problem in your case. If possible I'd like to suggest opening a support ticket to look closer into your use case, spark logs and metrics.
Wednesday
@VZLA , thanks for the input and suggestion. Will create a support ticket.