Tuesday
According to this https://iceberg.apache.org/docs/latest/spark-structured-streaming/ , we can stream from Iceberg tables. I have ensured that my source table is Iceberg version 3, but no matter what I do, I get an error saying the Iceberg data source does not support streaming reads. Looking at the V3 limitations here https://docs.databricks.com/gcp/en/iceberg/iceberg-v3?language=Managed+Iceberg+table#limitations , streaming is not mentioned anywhere as a limitation. For version 2, one of the limitations listed here https://docs.databricks.com/gcp/en/iceberg/ states: "Iceberg doesn't support change data feed. As a result, incremental processing isn't supported when reading managed Iceberg tables as a source for:"
But my destination is another Iceberg table (I also tried Delta).
Error: data source iceberg does not support streamed reading
# create source table
spark.sql("""CREATE OR REPLACE TABLE engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream (c1 INT)
USING iceberg
TBLPROPERTIES ('format-version' = 3)""")

spark.sql("INSERT INTO engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream VALUES (2)")

# create destination table
spark.sql("""CREATE OR REPLACE TABLE engagement.`sandbox-client-feedback`.dummy_iceberg_destination (c1 INT)
USING iceberg""")
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import sys
from databricks.connect import DatabricksSession
import logging
import datetime


def stream_iceberg_to_iceberg(
    spark,
    source_table,
    target_table,
    checkpoint_location,
    trigger_interval_seconds=60,
    stream_start_timestamp=None
):
    """
    Stream data from source Iceberg table to target Iceberg table

    Args:
        spark: SparkSession instance
        source_table: Source Iceberg table name (e.g., 'catalog.database.source_table')
        target_table: Target Iceberg table name (e.g., 'catalog.database.target_table')
        checkpoint_location: Path for checkpoint files
        trigger_interval_seconds: Processing trigger interval in seconds (default: 60)
        stream_start_timestamp: Optional timestamp in milliseconds to start streaming from
    """
    # Configure streaming read from source Iceberg table
    read_stream_builder = (spark.readStream
        .format("iceberg")
    )

    # Optional: Start from a specific timestamp
    if stream_start_timestamp:
        read_stream_builder = (read_stream_builder
            .option("stream-from-timestamp", str(stream_start_timestamp))
        )

    # Optional: Configure streaming behavior
    read_stream_builder = (read_stream_builder
        .option("streaming-skip-overwrite-snapshots", "true")
        .option("streaming-skip-delete-snapshots", "true")
    )

    # Read the stream
    source_stream = read_stream_builder.load(source_table)

    # Optional: Apply transformations
    # Example: Add processing timestamp and filter
    transformed_stream = (source_stream
        .withColumn("processing_time", current_timestamp())
        .withColumn("processing_date", current_date())
    )
    # Add your custom transformations here
    # .filter(col("some_column").isNotNull())
    # .select("col1", "col2", "col3")

    # Write stream to target Iceberg table
    query = (transformed_stream.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(availableNow=True)
        .option("checkpointLocation", checkpoint_location)
        .option("fanout-enabled", "true")
        .toTable(target_table)
    )

    return query


def main():
    """
    Main execution function
    """
    # Configuration
    SOURCE_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream"
    TARGET_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_destination"
    CHECKPOINT_LOCATION = "dbfs:/dbfs/tmp/sum_streaming_iceberg_test_stream"
    start_time = int(datetime.datetime(2026, 1, 1).timestamp() * 1000)
    START_TIMESTAMP = start_time  # Set to None to start from latest snapshot

    # Create Spark session
    spark = DatabricksSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("INFO")

    logging.info(f"Starting streaming from {SOURCE_TABLE} to {TARGET_TABLE}")
    logging.info(f"Checkpoint location: {CHECKPOINT_LOCATION}")

    try:
        # Start streaming query
        query = stream_iceberg_to_iceberg(
            spark=spark,
            source_table=SOURCE_TABLE,
            target_table=TARGET_TABLE,
            checkpoint_location=CHECKPOINT_LOCATION,
            stream_start_timestamp=START_TIMESTAMP
        )

        logging.info(f"Streaming query started: {query.name}")
        logging.info("Press Ctrl+C to stop the streaming query")

        query.awaitTermination()

    except Exception as e:
        print(f"Error in streaming job: {str(e)}")


if __name__ == "__main__":
    main()
yesterday
Greetings @SaugatMukherjee, I did some research and this is what I found.
You're running into a real (and documented) Databricks limitation here: managed Iceberg tables cannot be used as a streaming source today. That's true even though upstream Apache Iceberg documents a Spark Structured Streaming read API.
Let's unpack what's going on and why your code behaves the way it does.
Let's dig in…
Upstream Apache Iceberg documentation shows support for Spark Structured Streaming reads using:
spark.readStream.format("iceberg")
This includes options like stream-from-timestamp, along with guardrails such as skipping overwrite or delete snapshots. In open-source Spark + Iceberg, that story is real.
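For reference, the upstream pattern looks roughly like this in open-source Spark + Iceberg. Treat it as a minimal sketch of what the Iceberg docs describe; the catalog, table names, timestamp, and checkpoint path are placeholders made up for illustration, not anything from your workspace:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

oss_stream = (
    spark.readStream
    .format("iceberg")
    # start from the first snapshot committed at/after this epoch-millisecond timestamp
    .option("stream-from-timestamp", "1735689600000")
    # skip snapshots produced by overwrites/deletes instead of failing the stream
    .option("streaming-skip-overwrite-snapshots", "true")
    .option("streaming-skip-delete-snapshots", "true")
    .load("my_catalog.db.events")  # OSS examples pass the table identifier to load()
)

query = (
    oss_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/events_copy")
    .toTable("my_catalog.db.events_copy")
)
query.awaitTermination()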
However, Databricks' own Managed Iceberg table limitations documentation is explicit about the gap:
Iceberg does not support Change Data Feed (CDF). As a result, incremental processing is not supported when reading managed Iceberg tables as a source for materialized views and streaming tables.
That single sentence explains the behavior youโre seeing.
Even though:
Managed Iceberg is Public Preview in DBR 16.4+
Iceberg v3 features are Beta in DBR 17.3+
…the absence of CDF means Databricks does not allow managed Iceberg tables to participate in incremental or streaming reads as a source.
On Databricks, the Iceberg connector shipped with the runtime does not expose a streaming source implementation.
Databricks' streaming and incremental features lean heavily on Change Data Feed as the underlying mechanism. Because OSS Iceberg doesn't have CDF today, Databricks intentionally disables streaming reads from managed Iceberg tables.
Operationally, that shows up exactly as the error you hit:
"data source iceberg does not support streamed reading"
Even though the options you set (stream-from-timestamp, streaming-skip-overwrite-snapshots, etc.) are valid in upstream Iceberg, Databricks will reject the read regardless. This is a platform constraint, not a configuration issue.
Here's the clean line in the sand:
Streaming reads from managed Iceberg tables as a source are not supported.
Incremental features like streaming tables, materialized views, and similar services cannot read managed Iceberg input today.
Upstream Iceberg examples showing streaming reads and writes do not apply to Databricks-managed Iceberg yet.
The sink side (writing to Iceberg) is not your problem. The read side is.
If you need true streaming or CDC-style incremental processing today, the supported path is:
Keep your source in Delta
Enable Change Data Feed
Read via Structured Streaming with readChangeFeed = true
That pattern is battle-tested and fully supported on Databricks.
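A minimal sketch of that pattern, assuming a Delta source with CDF enabled; the catalog, table names, and checkpoint path below are placeholders for illustration:
# Enable Change Data Feed on the (hypothetical) Delta source table
spark.sql("""
    ALTER TABLE main.demo.delta_source
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Stream the change feed; rows arrive with _change_type, _commit_version, _commit_timestamp
cdf_stream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("main.demo.delta_source")
    .filter("_change_type IN ('insert', 'update_postimage')")  # keep only new/updated rows
)

query = (
    cdf_stream.writeStream
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/tmp/checkpoints/delta_cdf_demo")
    .toTable("main.demo.delta_target")
)
query.awaitTermination()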
If your source must remain Iceberg, the viable alternative is a scheduled batch micro-batch pattern:
Read the Iceberg table in batch:
spark.read.table("catalog.schema.table")
Transform and append to your destination (Iceberg or Delta) using batch writes
Persist a watermark (timestamp or snapshot ID) in a control table
Filter "new" data on each run
This gives you near-real-time behavior via frequent Jobs runs, but it is still batch, not streaming. There is no Databricks-managed Iceberg streaming source to cite today.
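A minimal sketch of that loop, assuming a one-column control table holding the last processed event timestamp and a source table that carries an event timestamp column; every table and column name here (ingest_watermarks, event_ts, and so on) is a hypothetical placeholder:
from pyspark.sql import functions as F

CONTROL_TABLE = "main.demo.ingest_watermarks"  # hypothetical: single TIMESTAMP column last_processed_ts
SOURCE_TABLE = "main.demo.iceberg_source"      # hypothetical Iceberg source
TARGET_TABLE = "main.demo.iceberg_target"      # hypothetical destination

# 1. Read the last processed watermark (fall back to epoch start on the first run)
last_ts = spark.sql(f"SELECT max(last_processed_ts) AS ts FROM {CONTROL_TABLE}").collect()[0]["ts"]
last_ts = last_ts or "1970-01-01 00:00:00"

# 2. Batch-read the source and keep only rows newer than the watermark
new_rows = (
    spark.read.table(SOURCE_TABLE)
    .filter(F.col("event_ts") > F.lit(last_ts))
)

# 3. Record the highest timestamp in this batch, then append the rows to the target
new_max = new_rows.agg(F.max("event_ts").alias("ts")).collect()[0]["ts"]
new_rows.write.format("iceberg").mode("append").saveAsTable(TARGET_TABLE)

# 4. Advance the watermark so the next scheduled run skips what was just processed
if new_max is not None:
    spark.sql(f"INSERT INTO {CONTROL_TABLE} VALUES (TIMESTAMP '{new_max}')")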
If your downstream targets include materialized views, streaming tables, Lakehouse Monitoring, or other incremental services, the practical guidance is to land or replicate into Delta so those services can leverage CDF and row tracking.
The failure is happening right here:
spark.readStream.format("iceberg").load(source_table)
On Databricks, that call will error for managed Iceberg sources every time. Swapping options won't change the outcome.
Your choices are:
Replace this with a batch read and schedule it, or
Change the source to Delta with CDF if you need a true streaming pipeline
Again, your sink choice (Iceberg vs Delta) is not the issue; the source read is.
Here's the simplest supported pattern:
from pyspark.sql import functions as F

SOURCE_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream"
TARGET_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_destination"

df = (
    spark.read.table(SOURCE_TABLE)
    .withColumn("processing_time", F.current_timestamp())
    .withColumn("processing_date", F.current_date())
)

(df.write
    .format("iceberg")
    .mode("append")
    .saveAsTable(TARGET_TABLE))
Schedule this with Jobs every N minutes, and persist a "last processed" watermark to avoid reprocessing. It's batch, but it's the closest operational equivalent available today.
If you need streaming today, Delta + CDF is the supported answer on Databricks.
If you need to stay on Iceberg, the path forward is scheduled batch with explicit state management, until Databricks enables an Iceberg-compatible incremental source.
Hope this provides some guidance.
Regards, Louis.
8 hours ago
Hi,
Iceberg streaming is possible in Databricks. One does not need to change to Delta Lake. In my previous attempt, I used "load" while reading the source Iceberg table. One should instead use "table"; "load" apparently takes a path rather than a table name.
This is the correct code to stream from an Iceberg table.
from pyspark.sql.functions import current_timestamp

source_stream = (spark.readStream
    .format("iceberg")
    .table("engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream")
    .withColumn("_ProcessedTime", current_timestamp())
)

query = (source_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", "/tmp/checkpoint/testicebergdestination")
    .toTable("engagement.`sandbox-client-feedback`.dummy_iceberg_destination")
)

query.awaitTermination()