Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

What is the behaviour of startingVersion with Spark Structured Streaming?

Maatari
New Contributor III

Looking into the following

https://docs.databricks.com/en/structured-streaming/delta-lake.html#specify-initial-position

I am unclear about the exact difference (if any) between

"startingVersion: The Delta Lake version to start from. Databricks recommends omitting this option for most workloads. When not set, the stream starts from the latest available version including a complete snapshot of the table at that moment."

and 

"To return only the latest changes, specify latest."

The two scenarios are:

  • no startingVersion
  • startingVersion = latest

 

Are both scenarios equivalent? Or does the first one consume the latest snapshot of the table first and then whatever comes next, while the second one consumes only whatever comes after the latest snapshot? I am confused about the two.

Could anyone clarify this for me, please?

 

1 ACCEPTED SOLUTION


VZLA
Databricks Employee

Hi @dlorenzo, interesting take! I don’t agree with your statement, though. According to both the documentation and my own testing, startingVersion = "latest" explicitly skips all historical data and starts from the latest committed version at the time the query initializes.

If you believe otherwise, could you share the code you tested with and explain what you mean when you say it "still includes data inserted into the table before the streaming query had started"? It’d help us better understand your perspective and validate the behavior. Looking forward to your example!

@Maatari here’s some code you can try to explore the behavior of startingVersion = "latest" versus not specifying startingVersion. The outputs should make the differences clearer.

from pyspark.sql.functions import current_timestamp
import time

# Define Delta table path
delta_path = "/tmp/delta-table-test"

# Drop the Delta table if it exists
print("Dropping the Delta table if it exists...")
dbutils.fs.rm(delta_path, True)

# Step 1: Create a Delta table with initial data (add timestamp)
print("Creating a Delta table with initial data: Alice and Bob")
data = [("Alice", 1), ("Bob", 2)]
columns = ["name", "value"]
df = spark.createDataFrame(data, columns).withColumn("timestamp", current_timestamp())
df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(delta_path)

# Step 2: Display initial data in the Delta table
print("Initial data in the Delta table:")
spark.read.format("delta").load(delta_path).show(truncate=False)

# Step 3: Start streaming query with `startingVersion = "latest"`
print("Starting streaming query with `startingVersion = 'latest'`...")
streamingDF_latest = (
    spark.readStream
    .format("delta")
    .option("startingVersion", "latest")
    .load(delta_path)
)
stream_query_latest = streamingDF_latest.writeStream.format("memory").queryName("latest_query").start()

time.sleep(10)

# Step 4: Capture output of `startingVersion = "latest"` query after initialization
print("Output of `startingVersion = 'latest'` query after initialization (should be empty):")
spark.sql("SELECT * FROM latest_query").show(truncate=False)

# Step 5: Append new data with timestamp to the Delta table
print("Appending new data to the Delta table: Charlie and Diana")
new_data = [("Charlie", 3), ("Diana", 4)]
new_df = (
    spark.createDataFrame(new_data, columns)
    .withColumn("timestamp", current_timestamp())
)
new_df.write.format("delta").option("mergeSchema", "true").mode("append").save(delta_path)

time.sleep(10)

# Step 6: Capture output of `startingVersion = "latest"` query after new data
print("Output of `startingVersion = 'latest'` query after appending new data (should include Charlie and Diana):")
spark.sql("SELECT * FROM latest_query").show(truncate=False)

# Step 7: Start a new streaming query with no `startingVersion`
print("Starting a new streaming query with no `startingVersion`...")
streamingDF_all = spark.readStream.format("delta").load(delta_path)
stream_query_all = streamingDF_all.writeStream.format("memory").queryName("all_query").start()

time.sleep(10)

# Step 8: Capture output of `no startingVersion` query
print("Output of `no startingVersion` query (should include all data: Alice, Bob, Charlie, Diana):")
spark.sql("SELECT * FROM all_query").createOrReplaceTempView("all_query_sorted")

# Sort the output in a batch query after capturing
print("Sorted output of `no startingVersion` query by timestamp:")
spark.sql("SELECT * FROM all_query_sorted ORDER BY timestamp").show(truncate=False)

# Step 9: Start a third streaming query with `startingVersion = "latest"`
print("Starting a third streaming query with `startingVersion = 'latest'`...")
streamingDF_latest_3 = (
    spark.readStream
    .format("delta")
    .option("startingVersion", "latest")
    .load(delta_path)
)
stream_query_latest_3 = streamingDF_latest_3.writeStream.format("memory").queryName("latest_query_3").start()

time.sleep(10)

# Step 10: Capture output of the third `startingVersion = "latest"` query
print("Output of the third `startingVersion = 'latest'` query (should only include new data after the query starts):")
spark.sql("SELECT * FROM latest_query_3").show(truncate=False)

# Step 11: Stop all streaming queries to release cluster resources
for q in [stream_query_latest, stream_query_all, stream_query_latest_3]:
    q.stop()

This should help you see that when startingVersion isn’t specified, the stream processes the entire current snapshot and new changes. Meanwhile, with startingVersion = "latest", it skips the snapshot and only processes changes after the query starts. Hope this clarifies things!

Sample output in case you're unable to run this in your Databricks Notebook:

Dropping the Delta table if it exists...
Creating a Delta table with initial data: Alice and Bob
Initial data in the Delta table:
+-----+-----+-----------------------+
|name |value|timestamp              |
+-----+-----+-----------------------+
|Alice|1    |2025-01-13 13:01:23.205|
|Bob  |2    |2025-01-13 13:01:23.205|
+-----+-----+-----------------------+

Starting streaming query with `startingVersion = 'latest'`...
Output of `startingVersion = 'latest'` query after initialization (should be empty):
+----+-----+---------+
|name|value|timestamp|
+----+-----+---------+
+----+-----+---------+

Appending new data to the Delta table: Charlie and Diana
Output of `startingVersion = 'latest'` query after appending new data (should include Charlie and Diana):
+-------+-----+-----------------------+
|name   |value|timestamp              |
+-------+-----+-----------------------+
|Charlie|3    |2025-01-13 13:01:35.722|
|Diana  |4    |2025-01-13 13:01:35.722|
+-------+-----+-----------------------+

Starting a new streaming query with no `startingVersion`...
Output of `no startingVersion` query (should include all data: Alice, Bob, Charlie, Diana):
Sorted output of `no startingVersion` query by timestamp:
+-------+-----+-----------------------+
|name   |value|timestamp              |
+-------+-----+-----------------------+
|Alice  |1    |2025-01-13 13:01:23.205|
|Bob    |2    |2025-01-13 13:01:23.205|
|Diana  |4    |2025-01-13 13:01:35.722|
|Charlie|3    |2025-01-13 13:01:35.722|
+-------+-----+-----------------------+

Starting a third streaming query with `startingVersion = 'latest'`...
Output of the third `startingVersion = 'latest'` query (should only include new data after the query starts):
+----+-----+---------+
|name|value|timestamp|
+----+-----+---------+
+----+-----+---------+

 


4 REPLIES

VZLA
Databricks Employee
Databricks Employee

The key difference is that not specifying startingVersion will include the current snapshot of the table in the stream, while setting startingVersion to latest will only process new changes from that point onward.
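To make this concrete, the distinction can be sketched with a toy model of the Delta commit log in plain Python (no Spark required; the version numbering and function are illustrative, not an actual Delta API):

```python
def versions_to_read(committed_versions, starting_version=None):
    """Toy model of which Delta versions a *new* stream will process.

    committed_versions: version numbers already in the table's transaction log.
    Returns a tuple describing the stream's starting point:
      - starting_version omitted (None): a full snapshot of the table at the
        latest version, then every commit made after it
      - "latest": no historical data; only commits made after the stream starts
      - an explicit version number: replay changes from that version onward
    """
    latest = committed_versions[-1]
    if starting_version is None:
        # All history is folded into one snapshot at the current version
        return ("snapshot_at_version", latest)
    if starting_version == "latest":
        # Everything up to and including `latest` is skipped
        return ("first_version_read", latest + 1)
    return ("first_version_read", int(starting_version))

# Table with commits 0, 1, 2 already in the log:
print(versions_to_read([0, 1, 2]))            # snapshot of the table, then new commits
print(versions_to_read([0, 1, 2], "latest"))  # only commits 3, 4, ... are seen
```

So both streams end up "at the tip" of the table, but only the default (no `startingVersion`) actually emits the existing rows.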

dlorenzo
New Contributor II

This is not true, startingVersion latest still includes data inserted into the table before the streaming query had started.

dlorenzo
New Contributor II

Thank you very much for the detailed explanation, you are totally right! My issue was that I specified startVersion instead of startingVersion, so Spark silently ignored the option; a typical Spark config issue.
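Since Spark reader options are free-form strings, a misspelled option name fails silently rather than raising an error. A lightweight pre-flight guard like the sketch below can catch such typos before the stream starts (the option list is a hand-picked subset for illustration only, not the authoritative set):

```python
import difflib

# Hand-picked subset of Delta streaming reader options, for illustration only
KNOWN_OPTIONS = {
    "startingVersion", "startingTimestamp", "maxFilesPerTrigger",
    "maxBytesPerTrigger", "ignoreDeletes", "ignoreChanges",
}

def check_option_name(name):
    """Raise a helpful error for unknown option names instead of failing silently."""
    if name in KNOWN_OPTIONS:
        return name
    suggestion = difflib.get_close_matches(name, KNOWN_OPTIONS, n=1)
    hint = f"; did you mean {suggestion[0]!r}?" if suggestion else ""
    raise ValueError(f"Unknown reader option {name!r}{hint}")

check_option_name("startingVersion")  # passes
# check_option_name("startVersion")   # raises: did you mean 'startingVersion'?
```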
