cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Spark structured streaming - not working with checkpoint location set

skarpeck
New Contributor II

We have structured streaming that reads from external delta table defined in following way:

 

try:
    df_silver = (
        spark.readStream
            .format("delta")
            .option("skipChangeCommits", True)
            .table(src_location)
    )

    (    
    df_silver.writeStream
        .trigger(availableNow = True)
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .foreachBatch(merge_silver_to_gold)
        .start()
        .awaitTermination()
    )
except Exception as e:
    ....

 

Only the initial load write data. All other subsequent runs doesn't load anything. Once checkpoint location is removed data is loaded correctly. Here are stats from Streaming query progress from driver logs:

 

{
  "id" : "xxx",
  "runId" : "xxx",
  "name" : null,
  "timestamp" : "2024-05-22T14:32:41.183Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 6599,
    "commitOffsets" : 90,
    "getBatch" : 41,
    "latestOffset" : 129,
    "queryPlanning" : 27,
    "triggerExecution" : 7015,
    "walCommit" : 103
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[xxx]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 274,
      "index" : 4,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "dc87ece9-02eb-404f-b24f-dc61675a4a83",
      "reservoirVersion" : 275,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  }
}

 

No rows are read, even though some rows were added to source table. Value of endOffset.reservoirVersion seems to be odd. It is 275 what is equal to maximal version of source Delta Table from which query is reading. All other similar queries that we checked (other Delta Table as source, foreachBatch as sink), have this value set to one unit higher than latest Delta version, so the version that not yet exists.

Line setting the endOffset from the logs:

 

INFO DeltaSource: lastOffset for Trigger.AvailableNow has set to {"sourceVersion":1,"reservoirId":"dc87ece9-02eb-404f-b24f-dc61675a4a83","reservoirVersion":275,"index":-1,"isStartingVersion":false}

 

What we tried so far:

  1. Remove checkpoint location directory - initial load works fine, then all subsequent loads have similar issue as described
  2. Create different query, with new sink and checkpoint location that reads from the same source - issue with endOffset occurs.
5 REPLIES 5

brockb
Contributor III
Contributor III

Hi,

I see you are using `Trigger.AvailableNow`. Is this intended to be a continuous stream or an incremental batch trigger at an interval with Databricks Workflows?

From the docs (https://docs.databricks.com/en/structured-streaming/triggers.html#configure-structured-streaming-tri...

> Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.
> The available now trigger option consumes all available records as an incremental batch with the ability to configure batch size with options such as maxBytesPerTrigger (sizing options vary by data source).

If intending to run in continuous mode, can you please try with a different trigger interval?

Thanks.

skarpeck
New Contributor II

Hi. It runs as incremental batch trigger with Workflow.

radothede
New Contributor III

Hi,

have You tried changing the checkpoint location path?

I used that below syntax previously and it worked fine for me:

stream = (
    df.writeStream
    .format("delta")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_location)
    .foreachBatch(...)
    .start()
)

 

skarpeck
New Contributor II

Of course. I even tried completely removing checkpoint location itself.

radothede
New Contributor III

Whats the logic of merge function?

 

merge_silver_to_gold

 Whats the output of describe history against that destination delta table after running the streaming query?

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!