01-21-2026 12:06 PM
Is the following type of union safe with Spark Structured Streaming?
Union multiple streaming dataframes, each from a different source.
Is there a better solution?
for example,
df1 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table1")
df2 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table2")
df3 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table3")
df1a = df1.select(....).transform(....)
df2a = df2.select(....).transform(....)
df3a = df3.select(....).transform(....)
df = df1a.unionByName(df2a).unionByName(df3a).dropDuplicates(.....)
df.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", my_checkpoint_path) \
    .trigger(availableNow=True) \
    .table(f"{silver_catalog}.{silver_schema}.my_silver_table")
3 weeks ago
Hi @cdn_yyz_yul,
You are running into expected Structured Streaming behavior. Spark tracks the number and identity of streaming sources in the checkpoint metadata. When you change the query plan by adding a new source (e.g., adding df3 to a union that previously only had df1 and df2), the checkpoint no longer matches the new query, and Spark raises the assertion error you saw:
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
This is documented behavior: changes to the number or type of input sources are not allowed against an existing checkpoint.
Here is a summary of your options and the trade-offs for each.
OPTION 1: SEPARATE STREAMING WRITERS (ONE PER SOURCE)
Each source gets its own streaming query writing to the same target Delta table. Delta Lake fully supports multiple concurrent append writers to the same table. The key rule is that each writer must have its own unique checkpoint location.
df1a.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", checkpoint_path_1) \
.trigger(availableNow=True) \
.table(f"{silver_catalog}.{silver_schema}.my_silver_table")
df2a.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", checkpoint_path_2) \
.trigger(availableNow=True) \
.table(f"{silver_catalog}.{silver_schema}.my_silver_table")
df3a.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", checkpoint_path_3) \
.trigger(availableNow=True) \
.table(f"{silver_catalog}.{silver_schema}.my_silver_table")
Pros:
- Adding a new source (e.g., df4) only requires adding a new writer with a new checkpoint. No existing checkpoints need to be deleted.
- Each source progresses independently, so a failure in one does not block the others.
- Simpler to monitor per-source throughput and latency.
Cons:
- Cannot do cross-source deduplication inline (you would need a separate downstream dedup step if needed).
- You need to manage multiple checkpoint paths.
The error you hit earlier ("DIFFERENT_DELTA_TABLE_READ_BY_STREAMING_SOURCE") was likely because you reused the same checkpoint location for a different streaming query. Each distinct streaming query must have its own checkpoint.
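One simple way to avoid accidental checkpoint reuse is to derive each writer's checkpoint path from its source table name, so no two streaming writers can ever share a checkpoint. A minimal sketch (the helper name, base path, and table names below are made-up placeholders):

```python
# Derive one unique checkpoint directory per source table so that no two
# streaming writers ever share a checkpoint location.
def checkpoint_path_for(base_path: str, source_table: str) -> str:
    # Use the fully qualified table name as a stable suffix,
    # replacing dots so the result is a single directory name.
    return f"{base_path}/{source_table.replace('.', '__')}"

sources = [
    "bronze.sales.table1",
    "bronze.sales.table2",
    "bronze.sales.table3",
]
paths = [checkpoint_path_for("/checkpoints/my_silver_table", s) for s in sources]

# Every source gets a distinct directory:
assert len(set(paths)) == len(paths)
```

Each writer would then pass its own derived path to `.option("checkpointLocation", ...)`, and adding a new source automatically yields a fresh checkpoint.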
OPTION 2: SINGLE UNION QUERY WITH CHECKPOINT RESET
This is what you described in your original post: union all source DataFrames into one query, then write once. When you add a new source to the union, you must delete the old checkpoint (and optionally the target table) and reprocess from scratch.
df = df1a.unionByName(df2a).unionByName(df3a) \
.dropDuplicatesWithinWatermark(["your_key_columns"])
df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", my_checkpoint_path) \
.trigger(availableNow=True) \
.table(f"{silver_catalog}.{silver_schema}.my_silver_table")
Pros:
- You can deduplicate across all sources in a single step.
- Single checkpoint to manage.
Cons:
- Every time you add or remove a source, you must delete the checkpoint and reprocess all data from scratch.
- You should also delete (or truncate) the target table to avoid duplicate rows from the reprocessing, unless you have an idempotent merge/upsert pattern downstream.
If you go this route, one improvement over plain dropDuplicates() is to use withWatermark() combined with dropDuplicatesWithinWatermark(). This bounds the state size, which is important for long-running streams:
df = df1a.unionByName(df2a).unionByName(df3a) \
.withWatermark("event_time", "10 hours") \
.dropDuplicatesWithinWatermark(["guid"])
See the docs for details:
https://docs.databricks.com/aws/en/structured-streaming/watermarks.html
WHICH OPTION TO CHOOSE
- If your sources change over time (new tables get added periodically) and you do not need cross-source deduplication, Option 1 (separate writers) is the more maintainable approach. Adding a new source is just adding a new writer with a new checkpoint, with zero disruption to existing streams.
- If cross-source deduplication is a hard requirement, Option 2 (single union query) is the way to go. Just plan for the checkpoint reset when the set of sources changes.
To answer your specific question about whether you missed any steps when resetting: deleting the checkpoint is the required step. Deleting the target table is only necessary if you want to avoid duplicate rows from reprocessing data that was already written. If your downstream consumers can tolerate temporary duplicates (or you use a MERGE pattern), you can keep the table and just delete the checkpoint.
Relevant documentation:
- Checkpoints and recovery from query changes: https://docs.databricks.com/aws/en/structured-streaming/checkpoints.html
- Watermarks and dropDuplicatesWithinWatermark: https://docs.databricks.com/aws/en/structured-streaming/watermarks.html
- Delta Lake concurrency control (multiple writers): https://docs.databricks.com/aws/en/delta/concurrency-control.html
* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.
If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.
3 weeks ago
Hey, I've dealt with this exact situation a few times and Option 2 is generally the way to go — union all your streaming sources up front and write once. The checkpoint reset is annoying but unavoidable since Spark tracks the number and identity of sources in the checkpoint metadata.
A few things that made this smoother for me in practice:
When you add a new source, yes you do need to delete the checkpoint. But you don't necessarily have to drop the silver table itself — you can just delete the checkpoint directory and let the stream reprocess. If your transforms are idempotent and you're using dropDuplicates or dropDuplicatesWithinWatermark, the reprocessed data from df1 and df2 will just get deduplicated against what's already in the table (assuming append mode with merge logic or dedup downstream).
That said, if you're using plain append with no dedup at the write level, then yeah, nuking both the table and checkpoint is the cleaner path to avoid duplicates.
On the dedup side — definitely worth switching to withWatermark + dropDuplicatesWithinWatermark if you haven't already. Plain dropDuplicates on a streaming DataFrame forces Spark to maintain state for every unique row it's ever seen, which gets expensive over time. The watermark-based version lets Spark bound that state.
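To see why the watermark variant bounds state, here is a toy pure-Python model (not Spark) of streaming dedup state with watermark-based eviction; the keys, event times, and delay are invented for illustration:

```python
# Toy model of streaming dedup state. Plain dropDuplicates keeps every key
# forever; the watermark variant can evict keys once the watermark passes them.
def dedup_with_watermark(events, delay):
    """events: list of (key, event_time). Returns (emitted rows, final state size)."""
    state = {}          # key -> event_time last seen
    watermark = 0
    emitted = []
    for key, t in events:
        watermark = max(watermark, t - delay)
        # Evict state entries older than the watermark (this bounds state size).
        state = {k: ts for k, ts in state.items() if ts >= watermark}
        if key not in state:
            emitted.append((key, t))
        state[key] = t
    return emitted, len(state)

events = [("a", 1), ("a", 2), ("b", 3), ("a", 50)]
emitted, state_size = dedup_with_watermark(events, delay=10)
# "a" at t=2 is a duplicate within the window and is dropped; "a" at t=50
# arrives after the watermark evicted the old entry, so it is emitted again.
```

The trade-off this illustrates: a wider watermark window catches more duplicates but holds more state; an unbounded dedup (no eviction) is equivalent to plain dropDuplicates.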
One more thing to consider: if you expect to keep adding new sources over time, you might want to look into Delta Live Tables (DLT). You can define each source as a separate streaming table and then union them in a downstream live table. DLT handles the checkpoint management for you, so adding a new source is just adding a new table definition — no manual checkpoint wrangling needed.
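For reference, a minimal sketch of that DLT pattern using multiple append flows into a single streaming table. This only runs inside a DLT (Lakeflow Declarative Pipelines) pipeline, where `spark` is provided by the runtime, and the table names here are placeholders:

```python
import dlt  # available only inside a DLT pipeline runtime

# One streaming target table; DLT manages its checkpoint state internally.
dlt.create_streaming_table("my_silver_table")

# One append flow per bronze source. Adding a new source later is just
# another @dlt.append_flow definition, with no manual checkpoint handling.
@dlt.append_flow(target="my_silver_table")
def from_table1():
    return spark.readStream.table("bronze.my_schema.table1")

@dlt.append_flow(target="my_silver_table")
def from_table2():
    return spark.readStream.table("bronze.my_schema.table2")
```

Each flow keeps its own internal streaming state, so dropping or adding one flow does not disturb the others.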
Hope that helps!
01-21-2026 12:26 PM
Hi @cdn_yyz_yul ,
Yes, this looks like a standard and safe approach - is there something in particular you were hoping to improve on? Or were you mainly looking for validation?
01-21-2026 12:36 PM
Thanks @stbjelcevic ,
I am looking for a solution ....
=== Let's say I already have:
df1 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table1")
df2 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table2")
df1a = df1.select(....).transform(....)
df2a = df2.select(....).transform(....)
df = df1a.unionByName(df2a)
df.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", my_checkpoint_path) \
    .trigger(availableNow=True) \
    .table(f"{silver_catalog}.{silver_schema}.my_silver_table")
=== Now, table3 is created by autoloader, and I read it as df3.
After some transformation, df3a has the same columns as the existing f"{silver_catalog}.{silver_schema}.my_silver_table".
df3 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table3")
df3a = df3.select(....).transform(....)
What would be the recommended solution to append the content of df3a to the existing f"{silver_catalog}.{silver_schema}.my_silver_table"?
01-21-2026 01:10 PM
Ok, I see - so with the updated scenario above, the easiest option would be to add a second streaming writer for df3a to the same existing f"{silver_catalog}.{silver_schema}.my_silver_table". I would only stick with this easiest option if the table is append-only and you are not trying to handle duplicates here.
If this could introduce duplicates, then the first approach you posted is more recommended, where you handle the duplicates by doing a union of all 3 dataframes. One thing I didn't consider in my first response is that you can use "withWatermark" + "dropDuplicatesWithinWatermark" instead of just dropDuplicates.
01-22-2026 07:09 AM
Thanks again,
Option1: >> the easiest option would be to add a second streaming writer for df3a to the same existing f"{silver_catalog}.{silver_schema}.my_silver_table".
If I simply read the new bronze table3, then append only to the existing table, i.e.,
--------
df3 = spark.readStream.table(f"{bronze_catalog}.{bronze_schema}.table3")
df3a = df3.select(....).transform(....)
df3a.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", my_checkpoint_path) \
    .trigger(availableNow=True) \
    .table(f"{silver_catalog}.{silver_schema}.my_silver_table")
--------
Spark will give errors like:
com.databricks.sql.transaction.tahoe.DeltaIllegalStateException: [DIFFERENT_DELTA_TABLE_READ_BY_STREAMING_SOURCE] The streaming query was reading from an unexpected Delta table (id = 'de2e002d-e6a7-452a-acd6-6843796fe00c').
Option2: >> doing a union with all dataframes from the different sources, then write once. i.e., the example for df1+df2 in my previous post.
I remember that I must recreate my_silver_table from scratch whenever I union in a new df, i.e., delete the table created before adding the new df (of the new source) along with its corresponding checkpoint. Otherwise, Spark complains that the streaming source is different.
Using adding df3a as an example: the existing my_silver_table was created when the source had df1 & df2. After introducing df3, Spark will not allow writing to my_silver_table unless I remove the checkpoint. For consistency, remove the table itself too. Did I miss any important steps?
01-22-2026 11:58 AM
Option 2: if the existing table and checkpoint are not removed, the error is similar to:
java.lang.AssertionError: assertion failed: There are [1] sources in the checkpoint offsets and now there are [2] sources requested by the query. Cannot continue.
3 weeks ago
Thanks
I am currently using Option 2, and I am thinking of moving to Option 1 since we do need to add new sources as time goes on. It will be fine to move the deduplication from the current single step (after the union) to a later step, e.g., when the downstream consumers read the multi-source delta table.
Another solution I was wondering about, and was testing, would be to convert to a declarative streaming table with multiple flows. My observation is that "DLT with multiple flows to a single streaming table" behaves the same as the Option 1 we have been discussing.
Do you have more input and suggestions?
I will mark the thread with "Accept as Solution".
3 weeks ago
thanks @SteveOstrowski
2 weeks ago
@Kirankumarbs When you say "If your transforms are idempotent ...", do you mean using MERGE INTO on the delta table?
Thanks.
3 weeks ago
Thanks @Kirankumarbs, Thanks @SteveOstrowski
You have provided very useful information.
2 weeks ago
What I mean by idempotent here is that if the same rows from df1 and df2 get reprocessed after a checkpoint reset, you won't end up with duplicates in the silver table. There are a few ways to handle that:
If you're using foreachBatch with a merge (MERGE INTO), then yeah, that's idempotent by design since the merge logic decides whether to insert or update based on your key columns. Reprocessing the same data just results in matched updates rather than new rows.
If you're doing a plain append with no dedup logic at all, then reprocessing after a checkpoint reset will give you duplicates. In that case you'd want to drop the table too and start clean, or add dedup logic downstream.
So it depends on your write strategy. Merge is the most naturally idempotent, but dropDuplicatesWithinWatermark in append mode also works if you have a reliable event time column and your watermark window is wide enough.
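The difference can be modeled outside Spark. Below is a pure-Python toy (not Delta code) showing why a keyed merge is idempotent under reprocessing while a blind append is not; the keys and values are made up:

```python
# Toy model: a "table" as a dict keyed by the merge key vs. a plain list.
def merge_into(table: dict, batch):
    # MERGE INTO semantics: matched keys are updated, unmatched keys inserted.
    for key, value in batch:
        table[key] = value
    return table

def append(table: list, batch):
    # Plain append: every reprocessed row lands in the table again.
    table.extend(batch)
    return table

batch = [("k1", "v1"), ("k2", "v2")]

merged = merge_into({}, batch)
merged = merge_into(merged, batch)    # reprocess same batch after checkpoint reset
# merged still holds 2 rows: no duplicates

appended = append([], batch)
appended = append(appended, batch)    # reprocess same batch
# appended now holds 4 rows: duplicated
```

This is the sense in which foreachBatch + MERGE INTO is idempotent: replaying already-written rows only produces matched updates. In-stream dedup state lives in the checkpoint, so once the checkpoint is deleted it cannot protect against rows written by the previous run.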
2 weeks ago
In the case of a checkpoint reset (deleting the checkpoint but keeping the delta table), my understanding is that (foreachBatch with a MERGE INTO) is the only solution to avoid duplicates in the delta table.
for example,
-------
df = df1a.unionByName(df2a).unionByName(df3a) \
.dropDuplicatesWithinWatermark(["your_key_columns"])
df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", my_checkpoint_path) \
.trigger(availableNow=True) \
.table(f"{silver_catalog}.{silver_schema}.my_silver_table")
----
Assume the event time is reliable and the watermark window is wide.
Assume the df1a and df2a data is already in the target delta table;
I want to add df3a.
So, I remove the checkpoint before running the above script.
Now,
- the df contains unique rows from df1a+df2a+df3a.
- df.writeStream appends the content of df to the target delta table, which already has the unique rows from df1a+df2a.
In summary, the target delta table will contain the rows from (df1a+df2a) twice: they were written before and after the checkpoint reset.
This is why I asked if "idempotent" means (foreachBatch with a MERGE INTO).
Do you agree?
Thanks again.