<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Spark structured streaming - not working with checkpoint location set in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70679#M34121</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;Have you tried changing the checkpoint location path?&lt;/P&gt;&lt;P&gt;I have used the syntax below before and it worked fine for me:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;stream = (
    df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(...)
    .start()
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Mon, 27 May 2024 07:54:16 GMT</pubDate>
    <dc:creator>radothede</dc:creator>
    <dc:date>2024-05-27T07:54:16Z</dc:date>
    <item>
      <title>Spark structured streaming - not working with checkpoint location set</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70238#M34036</link>
      <description>&lt;P&gt;We have a structured streaming query that reads from an external Delta table, defined in the following way:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;try:
    df_silver = (
        spark.readStream
            .format("delta")
            .option("skipChangeCommits", True)
            .table(src_location)
    )

    (    
    df_silver.writeStream
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .foreachBatch(merge_silver_to_gold)
        .start()
        .awaitTermination()
    )
except Exception as e:
    ....&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Only the initial load writes data; all subsequent runs don't load anything. Once the checkpoint location is removed, data is loaded correctly. Here are the stats from the streaming query progress in the driver logs:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;{
  "id" : "xxx",
  "runId" : "xxx",
  "name" : null,
  "timestamp" : "2024-05-22T14:32:41.183Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 6599,
    "commitOffsets" : 90,
    "getBatch" : 41,
    "latestOffset" : 129,
    "queryPlanning" : 27,
    "triggerExecution" : 7015,
    "walCommit" : 103
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[xxx]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 274,
      "index" : 4,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 275,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;No rows are read, even though rows were added to the source table. The value of endOffset.reservoirVersion seems odd: it is 275, which equals the maximum version of the source Delta table the query reads from. In all other similar queries we checked (a different Delta table as source, foreachBatch as sink), this value is set one higher than the latest Delta version, i.e. a version that does not exist yet.&lt;/P&gt;&lt;P&gt;The log line setting the endOffset:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;INFO DeltaSource: lastOffset for Trigger.AvailableNow has set to {"sourceVersion":1,"reservoirId":"dc87ece9-02eb-404f-b24f-dc61675a4a83","reservoirVersion":275,"index":-1,"isStartingVersion":false}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;What we tried so far:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Removed the checkpoint location directory: the initial load works fine, then all subsequent loads show the same issue as described.&lt;/LI&gt;&lt;LI&gt;Created a different query, with a new sink and checkpoint location, reading from the same source: the endOffset issue still occurs.&lt;/LI&gt;&lt;/OL&gt;</description>
      <pubDate>Wed, 22 May 2024 10:26:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70238#M34036</guid>
      <dc:creator>skarpeck</dc:creator>
      <dc:date>2024-05-22T10:26:32Z</dc:date>
    </item>
    <item>
      <title>Re: Spark structured streaming - not working with checkpoint location set</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70601#M34104</link>
      <description>&lt;P&gt;Hi,&lt;BR /&gt;&lt;BR /&gt;I see you are using `Trigger.AvailableNow`. Is this intended to be a continuous stream or an incremental batch trigger at an interval with Databricks Workflows?&lt;BR /&gt;&lt;BR /&gt;From the docs (&lt;A href="https://docs.databricks.com/en/structured-streaming/triggers.html#configure-structured-streaming-trigger-intervals" target="_blank"&gt;https://docs.databricks.com/en/structured-streaming/triggers.html#configure-structured-streaming-trigger-intervals&lt;/A&gt;):&lt;BR /&gt;&lt;BR /&gt;&lt;EM&gt;&amp;gt; Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;&amp;gt; The available now trigger option consumes all available records as an incremental batch with the ability to configure batch size with options such as maxBytesPerTrigger (sizing options vary by data source).&lt;/EM&gt;&lt;/P&gt;
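&lt;P&gt;For reference, a minimal incremental batch pattern per those docs might look like the sketch below. This is only an illustration: the table names, checkpoint path, and the "1g" sizing value are placeholders, not taken from your job.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;(
    spark.readStream
        .format("delta")
        .option("maxBytesPerTrigger", "1g")  # optional batch sizing; placeholder value
        .table("silver_table")               # placeholder source table name
        .writeStream
        .trigger(availableNow=True)          # consume all available records, then stop
        .option("checkpointLocation", "/path/to/checkpoint")  # placeholder path
        .toTable("gold_table")               # placeholder target table name
)&lt;/LI-CODE&gt;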
&lt;P&gt;If intending to run in continuous mode, can you please try with a different trigger interval?&lt;BR /&gt;&lt;BR /&gt;Thanks.&lt;/P&gt;</description>
      <pubDate>Fri, 24 May 2024 18:39:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70601#M34104</guid>
      <dc:creator>brockb</dc:creator>
      <dc:date>2024-05-24T18:39:12Z</dc:date>
    </item>
    <item>
      <title>Re: Spark structured streaming - not working with checkpoint location set</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70673#M34117</link>
      <description>&lt;P&gt;Hi. It runs as an incremental batch trigger with a Workflow.&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 07:07:41 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70673#M34117</guid>
      <dc:creator>skarpeck</dc:creator>
      <dc:date>2024-05-27T07:07:41Z</dc:date>
    </item>
    <item>
      <title>Re: Spark structured streaming - not working with checkpoint location set</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70679#M34121</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;Have you tried changing the checkpoint location path?&lt;/P&gt;&lt;P&gt;I have used the syntax below before and it worked fine for me:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;stream = (
    df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(...)
    .start()
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 07:54:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70679#M34121</guid>
      <dc:creator>radothede</dc:creator>
      <dc:date>2024-05-27T07:54:16Z</dc:date>
    </item>
    <item>
      <title>Re: Spark structured streaming - not working with checkpoint location set</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70680#M34122</link>
      <description>&lt;P&gt;Of course. I even tried completely removing the checkpoint location itself.&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 07:55:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70680#M34122</guid>
      <dc:creator>skarpeck</dc:creator>
      <dc:date>2024-05-27T07:55:54Z</dc:date>
    </item>
    <item>
      <title>Re: Spark structured streaming - not working with checkpoint location set</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70681#M34123</link>
      <description>&lt;P&gt;What's the logic of the merge function?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;PRE&gt;merge_silver_to_gold&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;What's the output of DESCRIBE HISTORY against the destination Delta table after running the streaming query?&lt;/P&gt;</description>
      <pubDate>Mon, 27 May 2024 08:07:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-structured-streaming-not-working-with-checkpoint-location/m-p/70681#M34123</guid>
      <dc:creator>radothede</dc:creator>
      <dc:date>2024-05-27T08:07:50Z</dc:date>
    </item>
  </channel>
</rss>

