01-13-2025 05:13 AM
Hi @dlorenzo, interesting take! I don’t agree with your statement, though. According to both the documentation and my own testing, startingVersion = "latest" explicitly skips all historical data and starts from the latest committed version at the time the query initializes.
If you believe otherwise, could you share the code you tested with and explain what you mean when you say it "still includes data inserted into the table before the streaming query had started"? It’d help us better understand your perspective and validate the behavior. Looking forward to your example!
@Maatari here’s some code you can try to explore the behavior of startingVersion = "latest" versus not specifying startingVersion. The outputs should make the differences clearer.
from pyspark.sql.functions import current_timestamp
import time
# Define Delta table path
delta_path = "/tmp/delta-table-test"
# Drop the Delta table if it exists
print("Dropping the Delta table if it exists...")
dbutils.fs.rm(delta_path, True)
# Step 1: Create a Delta table with initial data (add timestamp)
print("Creating a Delta table with initial data: Alice and Bob")
data = [("Alice", 1), ("Bob", 2)]
columns = ["name", "value"]
df = spark.createDataFrame(data, columns).withColumn("timestamp", current_timestamp())
df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(delta_path)
# Step 2: Display initial data in the Delta table
print("Initial data in the Delta table:")
spark.read.format("delta").load(delta_path).show(truncate=False)
# Step 3: Start streaming query with `startingVersion = "latest"`
print("Starting streaming query with `startingVersion = 'latest'`...")
streamingDF_latest = (
    spark.readStream
    .format("delta")
    .option("startingVersion", "latest")
    .load(delta_path)
)
stream_query_latest = streamingDF_latest.writeStream.format("memory").queryName("latest_query").start()
time.sleep(10)
# Step 4: Capture output of `startingVersion = "latest"` query after initialization
print("Output of `startingVersion = 'latest'` query after initialization (should be empty):")
spark.sql("SELECT * FROM latest_query").show(truncate=False)
# Step 5: Append new data with timestamp to the Delta table
print("Appending new data to the Delta table: Charlie and Diana")
new_data = [("Charlie", 3), ("Diana", 4)]
new_df = (
    spark.createDataFrame(new_data, columns)
    .withColumn("timestamp", current_timestamp())
)
new_df.write.format("delta").option("mergeSchema", "true").mode("append").save(delta_path)
time.sleep(10)
# Step 6: Capture output of `startingVersion = "latest"` query after new data
print("Output of `startingVersion = 'latest'` query after appending new data (should include Charlie and Diana):")
spark.sql("SELECT * FROM latest_query").show(truncate=False)
# Step 7: Start a new streaming query with no `startingVersion`
print("Starting a new streaming query with no `startingVersion`...")
streamingDF_all = spark.readStream.format("delta").load(delta_path)
stream_query_all = streamingDF_all.writeStream.format("memory").queryName("all_query").start()
time.sleep(10)
# Step 8: Capture output of `no startingVersion` query
print("Output of `no startingVersion` query (should include all data: Alice, Bob, Charlie, Diana):")
spark.sql("SELECT * FROM all_query").createOrReplaceTempView("all_query_sorted")
# Sort the output in a batch query after capturing
print("Sorted output of `no startingVersion` query by timestamp:")
spark.sql("SELECT * FROM all_query_sorted ORDER BY timestamp").show(truncate=False)
# Step 9: Start a third streaming query with `startingVersion = "latest"`
print("Starting a third streaming query with `startingVersion = 'latest'`...")
streamingDF_latest_3 = (
    spark.readStream
    .format("delta")
    .option("startingVersion", "latest")
    .load(delta_path)
)
stream_query_latest_3 = streamingDF_latest_3.writeStream.format("memory").queryName("latest_query_3").start()
time.sleep(10)
# Step 10: Capture output of the third `startingVersion = "latest"` query
print("Output of the third `startingVersion = 'latest'` query (should only include new data after the query starts):")
spark.sql("SELECT * FROM latest_query_3").show(truncate=False)
This should help you see that when startingVersion isn’t specified, the stream first processes the entire current snapshot of the table and then any new changes. With startingVersion = "latest", it skips the snapshot and only processes changes committed after the query starts. Hope this clarifies things!
Sample output in case you're unable to run this in your Databricks Notebook:
Dropping the Delta table if it exists...
Creating a Delta table with initial data: Alice and Bob
Initial data in the Delta table:
+-----+-----+-----------------------+
|name |value|timestamp              |
+-----+-----+-----------------------+
|Alice|1    |2025-01-13 13:01:23.205|
|Bob  |2    |2025-01-13 13:01:23.205|
+-----+-----+-----------------------+
Starting streaming query with `startingVersion = 'latest'`...
Output of `startingVersion = 'latest'` query after initialization (should be empty):
+----+-----+---------+
|name|value|timestamp|
+----+-----+---------+
+----+-----+---------+
Appending new data to the Delta table: Charlie and Diana
Output of `startingVersion = 'latest'` query after appending new data (should include Charlie and Diana):
+-------+-----+-----------------------+
|name   |value|timestamp              |
+-------+-----+-----------------------+
|Charlie|3    |2025-01-13 13:01:35.722|
|Diana  |4    |2025-01-13 13:01:35.722|
+-------+-----+-----------------------+
Starting a new streaming query with no `startingVersion`...
Output of `no startingVersion` query (should include all data: Alice, Bob, Charlie, Diana):
Sorted output of `no startingVersion` query by timestamp:
+-------+-----+-----------------------+
|name   |value|timestamp              |
+-------+-----+-----------------------+
|Alice  |1    |2025-01-13 13:01:23.205|
|Bob    |2    |2025-01-13 13:01:23.205|
|Diana  |4    |2025-01-13 13:01:35.722|
|Charlie|3    |2025-01-13 13:01:35.722|
+-------+-----+-----------------------+
Starting a third streaming query with `startingVersion = 'latest'`...
Output of the third `startingVersion = 'latest'` query (should only include new data after the query starts):
+----+-----+---------+
|name|value|timestamp|
+----+-----+---------+
+----+-----+---------+
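If you don't have a cluster handy at all, here is a rough plain-Python sketch of the same idea. To be clear, this is a toy model made up for illustration: ToyDeltaTable, ToyStream and poll are hypothetical names, not Delta or Spark APIs, and it assumes an append-only table. It only mimics the behavior seen in the output above and says nothing about how Delta actually implements it.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, int]


class ToyDeltaTable:
    """Toy stand-in for an append-only Delta table: a list of commits."""

    def __init__(self) -> None:
        self.commits: List[List[Row]] = []

    def append(self, rows: List[Row]) -> None:
        # Each append creates a new table version (0, 1, 2, ...).
        self.commits.append(list(rows))


class ToyStream:
    """Toy stand-in for a streaming read (hypothetical, not the real reader)."""

    def __init__(self, table: ToyDeltaTable,
                 starting_version: Optional[str] = None) -> None:
        self.table = table
        self.sink: List[Row] = []
        if starting_version == "latest":
            # Skip every version already committed when the stream initializes.
            self.next_version = len(table.commits)
        elif starting_version is None:
            # No option set: begin with everything currently in the table.
            self.next_version = 0
        else:
            raise ValueError("this toy model only handles None or 'latest'")

    def poll(self) -> List[Row]:
        """Process commits not seen yet and return all rows received so far."""
        for commit in self.table.commits[self.next_version:]:
            self.sink.extend(commit)
        self.next_version = len(self.table.commits)
        return list(self.sink)


table = ToyDeltaTable()
table.append([("Alice", 1), ("Bob", 2)])

latest_query = ToyStream(table, starting_version="latest")
print(latest_query.poll())    # nothing yet: Alice and Bob are skipped

table.append([("Charlie", 3), ("Diana", 4)])
print(latest_query.poll())    # only Charlie and Diana

all_query = ToyStream(table)  # no startingVersion
print(all_query.poll())       # Alice, Bob, Charlie, Diana

latest_query_3 = ToyStream(table, starting_version="latest")
print(latest_query_3.poll())  # empty again until something new is appended
```

The four print calls line up with the four tables in the sample output: empty, Charlie and Diana, all four rows, then empty again.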