Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark structured streaming - not working with checkpoint location set

skarpeck
New Contributor II

We have a Structured Streaming query that reads from an external Delta table, defined as follows:

 

try:
    df_silver = (
        spark.readStream
            .format("delta")
            .option("skipChangeCommits", True)
            .table(src_location)
    )

    (    
    df_silver.writeStream
        .trigger(availableNow = True)
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .foreachBatch(merge_silver_to_gold)
        .start()
        .awaitTermination()
    )
except Exception as e:
    ....

 

Only the initial load writes data. Subsequent runs load nothing. Once the checkpoint location is removed, data is loaded correctly. Here are the stats from the streaming query progress in the driver logs:

 

{
  "id" : "xxx",
  "runId" : "xxx",
  "name" : null,
  "timestamp" : "2024-05-22T14:32:41.183Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 6599,
    "commitOffsets" : 90,
    "getBatch" : 41,
    "latestOffset" : 129,
    "queryPlanning" : 27,
    "triggerExecution" : 7015,
    "walCommit" : 103
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[xxx]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 274,
      "index" : 4,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 275,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}

 

No rows are read, even though rows were added to the source table. The value of endOffset.reservoirVersion looks odd: it is 275, which equals the latest version of the source Delta table the query reads from. In all other similar queries we checked (a different Delta table as source, foreachBatch as sink), this value is one higher than the latest Delta version, that is, a version that does not exist yet.
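To make the comparison above repeatable, here is a minimal sketch that pulls the offset fields out of a progress event and checks whether endOffset.reservoirVersion is past the latest table version. The JSON is a trimmed copy of the event above; the function name and the `latest_table_version` argument are hypothetical, and the "one higher than latest" rule is only what we observed on our other queries, not documented behavior.

```python
import json

# Trimmed copy of the progress event from this post; only the fields used below.
PROGRESS_JSON = """
{
  "batchId": 1,
  "numInputRows": 0,
  "sources": [{
    "description": "DeltaSource[xxx]",
    "startOffset": {"reservoirVersion": 274, "index": 4, "isStartingVersion": true},
    "endOffset": {"reservoirVersion": 275, "index": -1, "isStartingVersion": false},
    "numInputRows": 0
  }]
}
"""


def summarize_delta_offsets(progress_json, latest_table_version):
    """Return the offset fields discussed above for the first source."""
    source = json.loads(progress_json)["sources"][0]
    start, end = source["startOffset"], source["endOffset"]
    return {
        "start_version": start["reservoirVersion"],
        "end_version": end["reservoirVersion"],
        "end_index": end["index"],
        "num_input_rows": source["numInputRows"],
        # Assumption from our other queries: end_version is normally latest + 1.
        "end_is_past_latest": end["reservoirVersion"] > latest_table_version,
    }


summary = summarize_delta_offsets(PROGRESS_JSON, latest_table_version=275)
print(summary)
```

For the event in this post, `end_is_past_latest` comes out false, which matches the oddity described above.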

The log line that sets the endOffset:

 

INFO DeltaSource: lastOffset for Trigger.AvailableNow has set to {"sourceVersion":1,"reservoirId":"dc87ece9-02eb-404f-b24f-dc61675a4a83","reservoirVersion":275,"index":-1,"isStartingVersion":false}

 

What we tried so far:

  1. Removed the checkpoint location directory: the initial load works fine, then all subsequent loads show the same issue as described above.
  2. Created a different query, with a new sink and checkpoint location, that reads from the same source: the same endOffset issue occurs.
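As an alternative to deleting the checkpoint, one can look at what it recorded. To my understanding, Spark writes one text file per batch under the `offsets/` subdirectory of the checkpoint location, consisting of a version line, a JSON metadata line, and then one line per source (a JSON offset, or `-` when there is none). The sketch below parses that layout; the function name is hypothetical and the sample content is illustrative, patterned on the endOffset shown above rather than copied from a real checkpoint.

```python
import json

# Illustrative file content only; the metadata values are placeholders.
SAMPLE_OFFSET_FILE = """v1
{"batchWatermarkMs":0,"batchTimestampMs":0}
{"sourceVersion":1,"reservoirId":"dc87ece9-02eb-404f-b24f-dc61675a4a83","reservoirVersion":275,"index":-1,"isStartingVersion":false}
"""


def parse_offset_file(text):
    """Split a checkpoint offsets file into version, metadata and source offsets."""
    lines = [line for line in text.splitlines() if line.strip()]
    log_version, metadata, *sources = lines
    return {
        "log_version": log_version,
        "metadata": json.loads(metadata),
        # A bare "-" marks a source with no offset recorded for this batch.
        "source_offsets": [None if s == "-" else json.loads(s) for s in sources],
    }


parsed = parse_offset_file(SAMPLE_OFFSET_FILE)
print(parsed["source_offsets"][0]["reservoirVersion"])
```

Comparing the recorded offsets of consecutive batches shows where the query believes it stopped reading.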
5 REPLIES

brockb
Valued Contributor

Hi,

I see you are using `Trigger.AvailableNow`. Is this intended to be a continuous stream, or an incremental batch triggered at an interval by Databricks Workflows?

From the docs (https://docs.databricks.com/en/structured-streaming/triggers.html#configure-structured-streaming-tri...

> Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.
> The available now trigger option consumes all available records as an incremental batch with the ability to configure batch size with options such as maxBytesPerTrigger (sizing options vary by data source).

If you intend to run in continuous mode, can you please try a different trigger interval?

Thanks.

skarpeck
New Contributor II

Hi. It runs as an incremental batch, triggered by a Workflow.

radothede
New Contributor III

Hi,

Have you tried changing the checkpoint location path?

I used the syntax below previously and it worked fine for me:

stream = (
    df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(...)
    .start()
)

 

skarpeck
New Contributor II

Of course. I even tried removing the checkpoint location entirely.

radothede
New Contributor III

What's the logic of the merge function, merge_silver_to_gold?

What's the output of DESCRIBE HISTORY against the destination Delta table after running the streaming query?
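A minimal sketch of collecting that history and listing the commits newer than a given version. The same check can be pointed at the source table: the reader above sets skipChangeCommits, which per the Databricks docs ignores commits that update or delete existing records, so it may be worth seeing which operations produced the versions after 274. Whether that applies here is not established in this thread. The helper names, the `CHANGE_OPERATIONS` set, and the example rows are all assumptions for illustration, not data from the poster's tables.

```python
# Operations assumed (for this sketch) to rewrite or remove existing data files.
CHANGE_OPERATIONS = {"MERGE", "UPDATE", "DELETE"}


def fetch_history(spark, table_name):
    """Run DESCRIBE HISTORY and return plain dicts (needs a live Spark session)."""
    rows = spark.sql(f"DESCRIBE HISTORY {table_name}").collect()
    return [row.asDict() for row in rows]


def commits_after(history, version):
    """Return (version, operation, is_change_commit) for commits newer than `version`."""
    newer = [h for h in history if h["version"] > version]
    return [
        (h["version"], h["operation"], h["operation"] in CHANGE_OPERATIONS)
        for h in sorted(newer, key=lambda h: h["version"])
    ]


# Made-up history rows purely to demonstrate the helper.
example_history = [
    {"version": 275, "operation": "MERGE"},
    {"version": 274, "operation": "WRITE"},
    {"version": 273, "operation": "WRITE"},
]
print(commits_after(example_history, 273))
```

On a cluster, `commits_after(fetch_history(spark, "<table name>"), 274)` would list what was committed after the version in startOffset.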
