
Why do latestOffset and getBatch take so long?

Brad
Contributor II

Hi team,

Kinesis -> delta table raw -> job with trigger=availableNow -> delta table target. 

The Kinesis -> raw Delta table ingestion runs continuously. The job runs daily with trigger=availableNow.

The job reads from raw, does some transformations, and runs a MERGE inside foreachBatch.
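For context, a minimal sketch of the kind of job this is (table names, paths, and the merge condition are placeholders, not the actual code):

```
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Placeholder names/paths; assumes a Databricks notebook where `spark` is predefined.
val target = DeltaTable.forName(spark, "target_table")

def upsert(batch: DataFrame, batchId: Long): Unit = {
  target.as("t")
    .merge(batch.as("s"), "t.id = s.id")   // placeholder merge condition
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream
  .format("delta")
  .load("s3://bucket/raw_table")            // placeholder raw table path
  .writeStream
  .foreachBatch(upsert _)
  .option("checkpointLocation", "s3://bucket/checkpoints/daily_job")  // placeholder
  .trigger(Trigger.AvailableNow())
  .start()
```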

When the job runs, it spends quite a long time on addBatch and latestOffset, like:

 [queryId = 3e0d2] [batchId = 290] Streaming query made progress: {
  "id" : "3e0...92",
  "runId" : "8c...6c",
  "name" : null,
  "timestamp" : "2024-10-15T18:45:28.123Z",
  "batchId" : 290,
  "batchDuration" : 1779275,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 61397,
    "commitOffsets" : 95,
    "getBatch" : 548418,
    "latestOffset" : 1168003,
    "queryPlanning" : 803,
    "triggerExecution" : 1779263,
    "walCommit" : 367
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[s3://..]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3207727,
      "index" : -1,
      "isStartingVersion" : false
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3223576,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "13445289",
      "numFilesOutstanding" : "217"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}

The MERGE itself seems very fast once the data is read from raw. In the Spark UI there is a big gap (roughly the batchDuration) that seems to be spent waiting on the batches. Is the trigger supposed to get the version from the checkpoint offset and then read all versions after it, or something else? Why is it so slow?

(attached screenshot Brad_0-1729034240965.png: Spark UI showing the gap)

Thanks

10 REPLIES

VZLA
Databricks Employee

Your job is experiencing long durations for the addBatch and latestOffset phases, which suggests there are delays in discovering new data. Can you please try:

  • Run OPTIMIZE on the raw Delta table to compact small files and improve metadata handling.
  • Enable Delta Optimizations with optimizeWrite and autoCompact to maintain file sizes more effectively.
  • Increase minBatchInterval if appropriate for your workload, to allow for larger, less frequent batches.

With availableNow, the job will read from the last known checkpoint offset and scan through all incremental changes. If your Delta table in the raw layer has high data volume or a large number of small files, this can slow down the latestOffset and addBatch phases, as it takes time to retrieve file metadata and determine what's new. The numFilesOutstanding and numBytesOutstanding metrics indicate there are many small files waiting to be processed, which further contributes to the delay. The latestOffset phase involves checking the latest offsets across multiple files, which can introduce significant latency, particularly if there are many small files due to continuous ingestion from Kinesis. This latency is common in streaming sources that use file-based checkpoints and Delta tables, as it must calculate the cumulative offset of multiple files.

spark.databricks.delta.optimizeWrite.enabled
spark.databricks.delta.autoCompact.enabled
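For reference, a minimal sketch of applying these (the table name is a placeholder; the table properties delta.autoOptimize.optimizeWrite / delta.autoOptimize.autoCompact are the table-level equivalents):

```
// Session-level settings (placeholder table name below).
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

// Compact existing small files in the raw table.
spark.sql("OPTIMIZE raw_table")
```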

Ref.: https://docs.databricks.com/en/delta/table-properties.html

Brad
Contributor II

Thanks. The trigger=availableNow job runs daily, so each trigger might see many versions (say, 18,000). It doesn't look like OPTIMIZE, VACUUM, or other configs can help. The bottleneck seems to be the way the Delta table applies each version; the time is roughly linear in the number of versions.

Brad
Contributor II

When a Delta table restores a version, it can find the closest checkpoint and apply versions up to the target. However, to get the changes between two versions it needs to read and apply many version files, and there is no compacted file it can use.
What is the way to read that many versions? Sequentially, reading every 1,000 files (data is on S3)?

VZLA
Databricks Employee

Agree.

TL;DR Control the Delta raw table updates frequency (batch incoming data), and/or reduce the log retention period. This is assuming you're currently on DBR 14.2+, where Delta Logs compaction is already enabled.

Only for clarity, I believe it'll be good to write down the process and pinpoint the bottleneck:

When a daily job with trigger=availableNow starts processing a Delta table, the latestOffset calculation follows these steps (roughly sketched in code after the list):

  1. Identify the Latest Checkpoint:

    • The system locates the most recent checkpoint file in the _delta_log directory. Checkpoints are Parquet files that capture the table's state at specific versions, enabling efficient state reconstruction.
  2. Determine the Starting Version:

    • Based on the latest checkpoint, the system identifies the starting version for processing. If the checkpoint corresponds to version 1000, the starting version would be 1001.
  3. List Subsequent Log Files:

    • The system lists all JSON log files from the starting version up to the current latest version. Each JSON file represents a commit that records changes to the table.
  4. Apply Changes from Log Files:

    • The system sequentially applies the changes recorded in each JSON log file to reconstruct the table's current state. This process includes adding or removing data files as specified in the logs.
  5. Identify Active Data Files:

    • After processing the log files, the system identifies the set of active data files that constitute the latest state of the table.
  6. Calculate latestOffset:

    • The system determines the latestOffset by identifying the most recent data available for processing, based on the active data files and the latest version processed.
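A rough, illustrative sketch of that walk (not the actual Delta implementation; the path is a placeholder, and multi-part/v2 checkpoints and edge cases are ignored):

```
import org.apache.hadoop.fs.Path

// Placeholder path; assumes a Databricks notebook where `spark` is predefined.
val logDir = new Path("s3://bucket/raw_table/_delta_log")
val fs = logDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

// Steps 1-3: list the log directory, find the latest (single-part) checkpoint,
// and collect the JSON commits written after it.
val names = fs.listStatus(logDir).map(_.getPath.getName)
val checkpointVersions = names.collect {
  case n if n.endsWith(".checkpoint.parquet") => n.takeWhile(_.isDigit).toLong
}
val startVersion = if (checkpointVersions.isEmpty) 0L else checkpointVersions.max + 1
val commits = names
  .filter(n => n.endsWith(".json") && n.takeWhile(_.isDigit).toLong >= startVersion)
  .sorted

// Steps 4-6 would replay the add/remove actions in each of these commits to rebuild the
// active file set and compute latestOffset; with ~18k commits/day this replay, plus the
// listing above, is where the time goes.
println(s"startVersion=$startVersion, commits to replay=${commits.length}")
```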

Bottleneck:

Even when utilizing the latest checkpoint and processing up to 100 versions (given the default delta.checkpointInterval of 100), the system may still need to list all files in the _delta_log directory to identify the latest checkpoint and subsequent JSON log files. This file listing operation can be time-consuming, especially in environments like AWS S3, where listing operations are relatively slow.
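If you want to confirm how much the raw listing alone costs in your environment, a quick hypothetical check (placeholder path; this measures only the S3 listing, not the log replay):

```
import org.apache.hadoop.fs.Path

val logDir = new Path("s3://bucket/raw_table/_delta_log")  // placeholder
val fs = logDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val t0 = System.nanoTime()
val fileCount = fs.listStatus(logDir).length
println(f"Listed $fileCount files in ${(System.nanoTime() - t0) / 1e9}%.1f s")
```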

Rough calculation of file accumulation in the _delta_log directory, considering your estimate of 18k commits per day:

  • Daily Commits: With approximately 18,000 commits per day and a delta.checkpointInterval of 100, each day would generate:

    • Checkpoint Files: 18,000 commits / 100 = 180 checkpoint files
    • JSON Log Files: one per commit = 18,000 JSON log files (checkpoints are written in addition to the JSON commits, not instead of them)
  • Retention Period: With a log retention period of 7 days, the _delta_log directory would accumulate:

    • Checkpoint Files: 180 checkpoints/day * 7 days = 1,260 checkpoint files
    • JSON Log Files: 18,000 JSON logs/day * 7 days = 126,000 JSON log files

In total, this results in roughly ~127,000 files in the _delta_log directory over a 7-day period. Note: there may be even more (or fewer), as I'm not considering checkpoint sidecar files (checkpoint v2) or delta log minorCompaction if available.

"...It doesn't look like OPTIMIZE, VACUUM or other configs can help..."

OptimizeWrite: if your application can leverage this optimization, then by generating fewer, larger files it decreases the number of commit operations required. Each commit corresponds to a JSON log entry, so fewer commits result in fewer log entries. The same applies to autoCompact, but we agree that these features are designed to optimize write operations and compact small files, and their effectiveness is limited when dealing with isolated, high-frequency, minimal-size updates.

Optimization Strategies:

  1. Control Update Frequency:

    • Batch incoming data to reduce the frequency of updates to the Delta raw table. This approach decreases the number of commits and, consequently, the number of JSON log files generated.
  2. Adjust Log Retention Period:

    • Reduce the log retention period to limit the accumulation of log files. This change ensures that older log files are purged more frequently, reducing the total number of files in the _delta_log directory.
  3. Log Compaction: Make sure deltaLog.minorCompaction.useForReads is set to true (see the sketch below). Note: if I'm not wrong, this is only available starting from DBR 14.2+ and recent versions, where it indeed defaults to true.
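A minimal sketch of points 2 and 3 (the table name is a placeholder, and the full key for the minor-compaction setting is an assumption based on the usual spark.databricks.delta. prefix):

```
// Shorter log retention: older JSON commits get cleaned up sooner when checkpoints are
// written (the default is interval 30 days). Placeholder table name.
spark.sql("""
  ALTER TABLE raw_table
  SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 2 days')
""")

// Assumed full config key for point 3; verify against your DBR version.
spark.conf.set("spark.databricks.delta.deltaLog.minorCompaction.useForReads", "true")
```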

Brad
Contributor II

Appreciate the input. Thanks.

We are trying to use the Delta table as a streaming sink, so we don't want to throttle the update frequency of the raw table; we want to load into the target as soon as possible. The default checkpointInterval is actually 10. I tried changing it to bigger values, but that didn't impact the perf much.

I also tried many other configs, but nothing impacted the perf significantly. The duration is roughly linear in the number of versions. Even when I got _delta_log down to about 40K files it was still slow, and even when the readStream has a filter like where 1==2 (i.e. it reads nothing) it is still slow.

From reading the log I can also see it tries to load version files from the last trigger for the schema-incompatibility check, which makes it worse.

I'm guessing this: v0, v1, ... v10, cp1, v11 ..., cp2, .... With a checkpoint file every 10 transactions it can restore to any point easily, but when it needs the changes between two versions there are no compacted files it can use. Shouldn't Delta also have something to compact incremental changes, instead of only creating a full snapshot in each checkpoint?
Also, I'm not sure whether there is a file listing. From the log I can only see `AWSCredentialProviderList:V3: Using credentials for bucket xxx from com.databricks.backend.daemon.driver.credentials.CredentialScopeS3TokenProvider`. I understand the steps to find the closest checkpoint, apply changes, do the schema check, etc., but can you please share how the 18,000 version files are processed to apply the changes? Does it really need a file listing? E.g. from https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/source...
```
val DELTA_HISTORY_PAR_SEARCH_THRESHOLD =
  buildConf("history.maxKeysPerList")
    .internal()
    .doc("How many commits to list when performing a parallel search. Currently set to 1000, " +
      "which is the maximum keys returned by S3 per list call. Azure can return 5000, " +
      "therefore we choose 1000.")
    .intConf
    .createWithDefault(1000)
```
Is there any bottleneck in this process? (As mentioned, I tried to get _delta_log down to around 40K files, but it is still slow.)
 
Thanks.

VZLA
Databricks Employee

Brad
Contributor II

Thanks. I traced it in the logs but cannot figure out which part makes applying the 18,000 versions so slow. It is the same with CDF if I feed a big range to the table_changes function. Any idea on this?
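For reference, a sketch of the kind of CDF call meant here (table name and version range are placeholders; assumes delta.enableChangeDataFeed is set on the table):

```
// Placeholder table name and version range; a wide range is slow in the same way.
val changes = spark.sql("SELECT * FROM table_changes('raw_table', 3100000, 3220000)")
changes.count()
```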

VZLA
Databricks Employee

I think it is after finding the latest checkpoint file: the process then has to list all version files to know which versions it needs to read. I'm not the best at reading this code, but I believe the slow part starts with createLogSegment, which calls listDeltaCompactedDeltaAndCheckpointFiles, then listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile, which finally calls listFromFileSystemInternal to list the files in the _delta_log directory and categorize them as DeltaFile, CompactedDeltaFile, CheckpointFile, or ChecksumFile.

Notice "def listFromOrNone" calls "listFrom" which does have a startingVersion; but even with the prefix or other optimizations, many file systems (especially distributed ones like S3) don't allow partial listings based on complex version rules. The listing operation often retrieves all filenames, and only afterward does the code filter out the ones matching the criteria.

That's as much as I can tell or infer; it would be good to check from the storage standpoint whether there is a whole-directory listing or not.

Edited: Actually, we can also find this storage listing implementation in https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverL...; if we specifically focus on https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverL... you can see there's an improved listing that considers the startAfter; this is also discussed in https://github.com/delta-io/delta/issues/1191.

So it may be worth setting this property to "true" and seeing if that helps in your case. I haven't tried this, but I assume it should be possible via spark.conf.set("spark.hadoop.delta.enableFastS3AListFrom", "true"), or, to be sure, via the cluster Spark configuration: spark.hadoop.delta.enableFastS3AListFrom true. Please monitor the driver metrics; I suspect this may (or may not) have some CPU utilization impact, and if so, increasing the driver instance type would be a good tradeoff anyway.

 

VZLA
Databricks Employee

@Brad never mind my previous comment; after further research, I can tell this feature has been in Databricks for years, using https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html, so the listing will not be the problem in your case. If possible, I'd suggest opening a support ticket so we can look closer into your use case, Spark logs, and metrics.

Brad
Contributor II

@VZLA, thanks for the input and suggestions. Will create a support ticket.
