<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: why latestOffset and getBatch takes so long time in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/97101#M39423</link>
    <description>&lt;P&gt;Your job is experiencing long durations for the &lt;CODE&gt;addBatch&lt;/CODE&gt; and &lt;CODE&gt;latestOffset&lt;/CODE&gt; phases, which suggests there are delays in discovering new data. Can you please try:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Run OPTIMIZE on the raw Delta table to compact small files and improve metadata handling.&lt;/LI&gt;
&lt;LI&gt;Enable Delta Optimizations with optimizeWrite and autoCompact to maintain file sizes more effectively.&lt;/LI&gt;
&lt;LI&gt;Increase minBatchInterval if appropriate for your workload, to allow for larger, less frequent batches.&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;With &lt;STRONG&gt;availableNow&lt;/STRONG&gt;, the job will read from the last known checkpoint offset and scan through all incremental changes. If your Delta table in the raw layer has high data volume or a large number of small files, this can slow down the latestOffset and addBatch phases, as it takes time to retrieve file metadata and determine what’s new.&amp;nbsp;The &lt;STRONG&gt;numFilesOutstanding&lt;/STRONG&gt; and &lt;STRONG&gt;numBytesOutstanding&lt;/STRONG&gt; metrics indicate there are many small files waiting to be processed, which further contributes to the delay.&amp;nbsp;The &lt;STRONG&gt;latestOffset&lt;/STRONG&gt; phase involves checking the latest offsets across multiple files, which can introduce significant latency, particularly if there are many small files due to continuous ingestion from Kinesis. This latency is common in streaming sources that use file-based checkpoints and Delta tables, as it must calculate the cumulative offset of multiple files.&lt;/P&gt;
&lt;P&gt;spark.databricks.delta.optimizeWrite.enabled&lt;BR /&gt;spark.databricks.delta.autoCompact.enabled&lt;/P&gt;
&lt;P&gt;Ref.:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta/table-properties.html" target="_blank"&gt;https://docs.databricks.com/en/delta/table-properties.html&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 31 Oct 2024 18:27:48 GMT</pubDate>
    <dc:creator>VZLA</dc:creator>
    <dc:date>2024-10-31T18:27:48Z</dc:date>
    <item>
      <title>why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/94216#M38839</link>
      <description>&lt;P&gt;Hi team,&lt;/P&gt;&lt;P&gt;Kinesis -&amp;gt; delta table raw -&amp;gt; job with trigger=availableNow -&amp;gt; delta table target.&amp;nbsp;&lt;/P&gt;&lt;P&gt;The&amp;nbsp;Kinesis-&amp;gt;delta table raw is running continuously. The job is daily with&amp;nbsp;trigger=availableNow.&amp;nbsp;&lt;/P&gt;&lt;P&gt;The job reads from raw, do some transformation, and run a MERGE function in foreachUpdate.&lt;/P&gt;&lt;P&gt;When the job starts to run, it spends quite a long time on addBatch and lastestOffset. Like&lt;/P&gt;&lt;LI-CODE lang="markup"&gt; [queryId = 3e0d2] [batchId = 290] Streaming query made progress: {
  "id" : "3e0...92",
  "runId" : "8c...6c",
  "name" : null,
  "timestamp" : "2024-10-15T18:45:28.123Z",
  "batchId" : 290,
  "batchDuration" : 1779275,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 61397,
    "commitOffsets" : 95,
    "getBatch" : 548418,
    "latestOffset" : 1168003,
    "queryPlanning" : 803,
    "triggerExecution" : 1779263,
    "walCommit" : 367
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[s3://..]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3207727,
      "index" : -1,
      "isStartingVersion" : false
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "...",
      "reservoirVersion" : 3223576,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "13445289",
      "numFilesOutstanding" : "217"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}&lt;/LI-CODE&gt;&lt;P&gt;The MERGE seems very fast after getting data from raw. From the spark UI, there are a big gap there (around batchDuration time) seems waiting on the batches. Is the trigger supposed to get the version from checkpoint offset and read from all versions afterwards or something else? Why it is so slow?&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Brad_0-1729034240965.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/11972i09507094AA0B58C7/image-size/medium?v=v2&amp;amp;px=400" role="button" title="Brad_0-1729034240965.png" alt="Brad_0-1729034240965.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;Thanks&lt;/P&gt;</description>
      <pubDate>Tue, 15 Oct 2024 23:22:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/94216#M38839</guid>
      <dc:creator>MikeGo</dc:creator>
      <dc:date>2024-10-15T23:22:07Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/97101#M39423</link>
      <description>&lt;P&gt;Your job is experiencing long durations for the &lt;CODE&gt;addBatch&lt;/CODE&gt; and &lt;CODE&gt;latestOffset&lt;/CODE&gt; phases, which suggests there are delays in discovering new data. Can you please try:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Run OPTIMIZE on the raw Delta table to compact small files and improve metadata handling.&lt;/LI&gt;
&lt;LI&gt;Enable Delta Optimizations with optimizeWrite and autoCompact to maintain file sizes more effectively.&lt;/LI&gt;
&lt;LI&gt;Increase minBatchInterval if appropriate for your workload, to allow for larger, less frequent batches.&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;With &lt;STRONG&gt;availableNow&lt;/STRONG&gt;, the job will read from the last known checkpoint offset and scan through all incremental changes. If your Delta table in the raw layer has high data volume or a large number of small files, this can slow down the latestOffset and addBatch phases, as it takes time to retrieve file metadata and determine what’s new.&amp;nbsp;The &lt;STRONG&gt;numFilesOutstanding&lt;/STRONG&gt; and &lt;STRONG&gt;numBytesOutstanding&lt;/STRONG&gt; metrics indicate there are many small files waiting to be processed, which further contributes to the delay.&amp;nbsp;The &lt;STRONG&gt;latestOffset&lt;/STRONG&gt; phase involves checking the latest offsets across multiple files, which can introduce significant latency, particularly if there are many small files due to continuous ingestion from Kinesis. This latency is common in streaming sources that use file-based checkpoints and Delta tables, as it must calculate the cumulative offset of multiple files.&lt;/P&gt;
&lt;P&gt;spark.databricks.delta.optimizeWrite.enabled&lt;BR /&gt;spark.databricks.delta.autoCompact.enabled&lt;/P&gt;
&lt;P&gt;Ref.:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta/table-properties.html" target="_blank"&gt;https://docs.databricks.com/en/delta/table-properties.html&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 31 Oct 2024 18:27:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/97101#M39423</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-10-31T18:27:48Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98022#M39586</link>
      <description>&lt;P&gt;Thanks. The&amp;nbsp;&lt;SPAN&gt;trigger=availableNow runs daily. Each trigger might see many versions (say, 18000 versions). It doesn't look like OPTIMIZE, VACUUM or other configs can help. It seems to be&amp;nbsp;the bottleneck is the way delta table to apply each version. The time is sort of linear to version num.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 06 Nov 2024 22:51:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98022#M39586</guid>
      <dc:creator>MikeGo</dc:creator>
      <dc:date>2024-11-06T22:51:31Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98023#M39587</link>
      <description>&lt;P&gt;When delta table tries to restore a version, it can find the closest checkpoint and apply versions up to the target. However, delta table needs to read and apply many versions to get changes between 2 versions. There are no compact file can be used.&amp;nbsp;&lt;BR /&gt;What is the way to read many versions? Sequentially read every 1000 files (data is on S3)?&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 06 Nov 2024 22:55:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98023#M39587</guid>
      <dc:creator>MikeGo</dc:creator>
      <dc:date>2024-11-06T22:55:52Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98067#M39595</link>
      <description>&lt;P&gt;Agree.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;TL;DR&amp;nbsp;&lt;/STRONG&gt;Control the Delta raw table updates frequency (batch incoming data), and/or reduce the log retention period. This is assuming you're currently on DBR 14.2+, where&amp;nbsp;&lt;A href="https://docs.delta.io/latest/optimizations-oss.html#log-compactions" target="_self"&gt;Delta Logs compaction&lt;/A&gt; is already enabled.&lt;/P&gt;
&lt;P&gt;Only for clarity, I believe it'll be good to write down the process and pinpoint the bottleneck:&lt;/P&gt;
&lt;P&gt;When a daily job with &lt;STRONG&gt;trigger=availableNow&lt;/STRONG&gt; initiates to process a Delta table, the &lt;STRONG&gt;latestOffset&lt;/STRONG&gt; calculation follows these steps:&lt;/P&gt;
&lt;OL&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Identify the Latest Checkpoint:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;The system locates the most recent checkpoint file in the &lt;STRONG&gt;_delta_log&lt;/STRONG&gt; directory. Checkpoints are Parquet files that capture the table's state at specific versions, enabling efficient state reconstruction.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Determine the Starting Version:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Based on the latest checkpoint, the system identifies the starting version for processing. If the checkpoint corresponds to version 1000, the starting version would be 1001.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;List Subsequent Log Files:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;The system lists all JSON log files from the starting version up to the current latest version. Each JSON file represents a commit that records changes to the table.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Apply Changes from Log Files:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;The system sequentially applies the changes recorded in each JSON log file to reconstruct the table's current state. This process includes adding or removing data files as specified in the logs.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Identify Active Data Files:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;After processing the log files, the system identifies the set of active data files that constitute the latest state of the table.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Calculate latestOffset:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;The system determines the &lt;STRONG&gt;latestOffset&lt;/STRONG&gt; by identifying the most recent data available for processing, based on the active data files and the latest version processed.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;/OL&gt;
&lt;P&gt;&lt;STRONG&gt;Bottleneck:&lt;/STRONG&gt;&lt;/P&gt;
&lt;P&gt;Even when utilizing the latest checkpoint and processing up to 100 versions (given the default &lt;STRONG&gt;delta.checkpointInterval&lt;/STRONG&gt; of 100), the system may still need to list all files in the &lt;STRONG&gt;_delta_log&lt;/STRONG&gt; directory to identify the latest checkpoint and subsequent JSON log files. This file listing operation can be time-consuming, especially in environments like AWS S3, where listing operations are relatively slow.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Rough calculation of file accumulation in the _delta_log directory, considering your estimate of 18k commits per day:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Daily Commits:&lt;/STRONG&gt; With approximately 18,000 commits per day and a &lt;STRONG&gt;delta.checkpointInterval&lt;/STRONG&gt; of 100, each day would generate:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;STRONG&gt;Checkpoint Files:&lt;/STRONG&gt; 18,000 commits / 100 = &lt;STRONG&gt;180 checkpoint files&lt;/STRONG&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt;JSON Log Files:&lt;/STRONG&gt; 18,000 commits - 180 checkpoints = &lt;STRONG&gt;17,820 JSON log files&lt;/STRONG&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Retention Period:&lt;/STRONG&gt; With a log retention period of 7 days, the &lt;STRONG&gt;_delta_log&lt;/STRONG&gt; directory would accumulate:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;STRONG&gt;Checkpoint Files:&lt;/STRONG&gt; 180 checkpoints/day * 7 days = &lt;STRONG&gt;1,260 checkpoint files&lt;/STRONG&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt;JSON Log Files:&lt;/STRONG&gt; 17,820 JSON logs/day * 7 days = &lt;STRONG&gt;124,740 JSON log files&lt;/STRONG&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;In total, this results in approximately ~&lt;STRONG&gt;126,000 files&lt;/STRONG&gt; in the &lt;STRONG&gt;_delta_log&lt;/STRONG&gt; directory over a 7-day period. &lt;STRONG&gt;Note:&amp;nbsp;&lt;/STRONG&gt;there may be even more (or less), as I'm not considering checkpoint sidecar files (checkpoint v2) or delta log minorCompaction if available.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;"...It doesn't look like OPTIMIZE, VACUUM or other configs can help..."&lt;/STRONG&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;&lt;SPAN&gt;OptimizeWrite,&amp;nbsp;&lt;/SPAN&gt;&lt;/STRONG&gt;&lt;SPAN&gt;if your application implementation can leverage this optimization, then by generating fewer larger files, the system decreases the number of commit operations required. Each commit file corresponds to a json log entry; thus, fewer commits result in fewer log entries. Same applies for &lt;STRONG&gt;autoCompact,&amp;nbsp;&lt;/STRONG&gt;but we agree that these features are designed to optimize write operations and compact small files, and their effectiveness can be limited when dealing with isolated&amp;nbsp;&lt;STRONG&gt;high-frequency, minimal-size updates&lt;/STRONG&gt;.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Optimization Strategies:&lt;/STRONG&gt;&lt;/P&gt;
&lt;OL&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Control Update Frequency:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Batch incoming data to reduce the frequency of updates to the Delta raw table. This approach decreases the number of commits and, consequently, the number of JSON log files generated.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P&gt;&lt;STRONG&gt;Adjust Log Retention Period:&lt;/STRONG&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;Reduce the log retention period to limit the accumulation of log files. This change ensures that older log files are purged more frequently, reducing the total number of files in the &lt;STRONG&gt;_delta_log&lt;/STRONG&gt; directory.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt;Log Compaction:&amp;nbsp;&lt;/STRONG&gt;Make sure&amp;nbsp;&lt;STRONG&gt;deltaLog.minorCompaction.useForReads&amp;nbsp;&lt;/STRONG&gt;is set to true. &lt;STRONG&gt;Note:&amp;nbsp;&lt;/STRONG&gt;If I'm not wrong, this is only available starting from DBR 14.2+ and recent versions, where it indeed defaults to &lt;STRONG&gt;True&lt;/STRONG&gt;.&amp;nbsp;&lt;/LI&gt;
&lt;/OL&gt;</description>
      <pubDate>Thu, 07 Nov 2024 10:55:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98067#M39595</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-11-07T10:55:07Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98125#M39616</link>
      <description>&lt;P&gt;Appreciate for the input. Thanks.&lt;/P&gt;&lt;P&gt;We try to use delta table as a streaming sink, so we don't want to control the update frequency for the raw table and target to load it asap. The default checkpointinterval is actually 10. I tried to change it to bigger values but that didn't impact the perf too much.&amp;nbsp;&lt;/P&gt;&lt;P&gt;I also tried many other configs, but nothing can impact the perf significantly. The duration is sort of linear to version num. The fact is even if I made it down to 40K files under _delta_log, it is still slow, and even if the readStream has a filter like where 1==2, viz. read nothing it is still slow.&amp;nbsp;&lt;/P&gt;&lt;P&gt;From reading the log I can also see it tries to load version files from last trigger for schema incompatible check, which makes it worse.&lt;/P&gt;&lt;P&gt;I'm guessing this. v0, v1, ... v10, cp1, v11..., cp2, .... every 10 transactions with a checkpoint file can make it restore to any point easily, but if it needs to get changes between 2 versions, there is no compact files it can use. Is the delta table supposed to design something to compact incremental changes too instead of only creating a full snapshot in checkpoint?&lt;BR /&gt;Also I'm not sure whether there is file listing. From log I can only see `&lt;SPAN&gt;AWSCredentialProviderList:V3: Using credentials for bucket xxx&lt;/SPAN&gt;&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;from com.databricks.backend.daemon.driver.credentials.CredentialScopeS3TokenProvider&lt;SPAN&gt;`. I understand the steps to find closest checkpoint, apply changes, schema check etc., but can you please share how the 18000 version files are processed to apply the changes? Does it really need file listing? E.g. from&amp;nbsp;&lt;A href="https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala#L451," target="_blank"&gt;https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala#L451,&lt;/A&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;```&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;val DELTA_HISTORY_PAR_SEARCH_THRESHOLD =&lt;BR /&gt;buildConf("history.maxKeysPerList")&lt;BR /&gt;.internal()&lt;BR /&gt;.doc("How many commits to list when performing a parallel search. Currently set to 1000, " +&lt;BR /&gt;"which is the maximum keys returned by S3 per list call. Azure can return 5000, " +&lt;BR /&gt;"therefore we choose 1000.")&lt;BR /&gt;.intConf&lt;BR /&gt;.createWithDefault(1000)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;```&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Is there any bottleneck in this process? (as mentioned, I tried to make _delta_log has around 40K files, but still slow).&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Thanks.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 07 Nov 2024 18:12:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98125#M39616</guid>
      <dc:creator>MikeGo</dc:creator>
      <dc:date>2024-11-07T18:12:49Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98205#M39647</link>
      <description>&lt;P&gt;The process is explained in&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;A href="https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala#L410," target="_blank"&gt;https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala#L410&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;and&lt;/P&gt;
&lt;P&gt;&lt;A href="https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala#L396" target="_blank"&gt;https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala#L396&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 08 Nov 2024 19:44:18 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98205#M39647</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-11-08T19:44:18Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98207#M39648</link>
      <description>&lt;P&gt;Thanks. I tracked there with log but cannot figure out which parts make the 18000 version apply slow. It is the same with CDF if I feed a big range to table_changes function. Any idea on this?&lt;/P&gt;</description>
      <pubDate>Fri, 08 Nov 2024 19:47:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98207#M39648</guid>
      <dc:creator>MikeGo</dc:creator>
      <dc:date>2024-11-08T19:47:08Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98269#M39667</link>
      <description>&lt;P&gt;I think it is in after knowing which is the latest checkpoint file, the process which has to &lt;STRONG&gt;list&lt;/STRONG&gt; all version files to finally know what are the diff versions that it needs to read. I'm not good at reading this, but I believe the slow part is starting with&amp;nbsp;createLogSegment, which calls&amp;nbsp;listDeltaCompactedDeltaAndCheckpointFiles, then&amp;nbsp;listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile finally calls&amp;nbsp;listFromFileSystemInternal which lists the files in the _delta_log directory and categorizes them as DeltaFile, CompactedDeltaFile, CheckpointFile, or ChecksumFile.&lt;/P&gt;
&lt;P&gt;Notice "def listFromOrNone" calls "listFrom" which does have a startingVersion; but even with the prefix or other optimizations, many file systems (especially distributed ones like S3) don't allow partial listings based on complex version rules. The listing operation often retrieves all filenames, and only afterward does the code filter out the ones matching the criteria.&lt;/P&gt;
&lt;P&gt;&lt;STRIKE&gt;That's as much as I can tell or inferr, it'll be good to check from the storage standpoint, if there is a whole directory listing or not.&lt;/STRIKE&gt;&lt;/P&gt;
&lt;P&gt;Edited: Actually we can also find this storage listing implementation in &lt;A href="https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java" target="_blank"&gt;https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java&lt;/A&gt;, if we specifically focus on &lt;A href="https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java#L53-L64," target="_blank"&gt;https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java#L53-L64,&lt;/A&gt;&amp;nbsp;you can see there's an improved listing which considers the startAfter, this is also discussed in &lt;A href="https://github.com/delta-io/delta/issues/1191" target="_blank"&gt;https://github.com/delta-io/delta/issues/1191&lt;/A&gt;.&lt;/P&gt;
&lt;P&gt;So it may be worth setting this property to "True" and see if that helps in your case. I haven't tried this, but I assume it should be possible via&amp;nbsp;spark.conf.set("spark.hadoop.delta.enableFastS3AListFrom", "true"), or to be sure, via Cluster Spark configuration "spark.hadoop.delta.enableFastS3AListFrom true". Please monitor the driver metrics, I'm suspecting this may (or not) have some sort of CPU utilization impact, if that is the case, increasing the Driver instance type would be a good tradeoff anyways.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Sat, 09 Nov 2024 10:16:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98269#M39667</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-11-09T10:16:49Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98353#M39705</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/100643"&gt;@MikeGo&lt;/a&gt;&amp;nbsp;nvm about my previous comment, after further research, I can tell this feature has been in Databricks for years using &lt;A href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html" target="_blank"&gt;https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html&lt;/A&gt;, so the listing will not be the problem in your case. If possible I'd like to suggest opening a support ticket to look closer into your use case, spark logs and metrics.&lt;/P&gt;</description>
      <pubDate>Mon, 11 Nov 2024 16:58:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98353#M39705</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-11-11T16:58:55Z</dc:date>
    </item>
    <item>
      <title>Re: why latestOffset and getBatch takes so long time</title>
      <link>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98723#M39818</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/34618"&gt;@VZLA&lt;/a&gt;&amp;nbsp;, thanks for the input and suggestion. Will create a support ticket.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 13 Nov 2024 22:58:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/why-latestoffset-and-getbatch-takes-so-long-time/m-p/98723#M39818</guid>
      <dc:creator>MikeGo</dc:creator>
      <dc:date>2024-11-13T22:58:25Z</dc:date>
    </item>
  </channel>
</rss>

