
Structured Streaming for Iceberg tables

SaugatMukherjee
New Contributor III

According to https://iceberg.apache.org/docs/latest/spark-structured-streaming/ , we can stream from Iceberg tables. I have ensured that my source table is Iceberg version 3, but no matter what I do, I get an error saying Iceberg does not support streaming reads. Looking at the V3 limitations here https://docs.databricks.com/gcp/en/iceberg/iceberg-v3?language=Managed+Iceberg+table#limitations , streaming reads are not mentioned anywhere as a limitation. For version 2, one of the limitations listed here https://docs.databricks.com/gcp/en/iceberg/ states that "Iceberg doesn't support change data feed. As a result, incremental processing isn't supported when reading managed Iceberg tables as a source for:"

But my destination is another Iceberg table (I also tried Delta).

Error: data source iceberg does not support streamed reading

# create source table
spark.sql("""CREATE OR REPLACE TABLE engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream (c1 INT)
USING iceberg
TBLPROPERTIES ('format-version' = 3)""")

spark.sql("INSERT INTO engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream VALUES (2)")

# create destination table
spark.sql("""CREATE OR REPLACE TABLE engagement.`sandbox-client-feedback`.dummy_iceberg_destination (c1 INT)
USING iceberg""")

from pyspark.sql.functions import current_timestamp, current_date
from databricks.connect import DatabricksSession
import logging
import datetime


def stream_iceberg_to_iceberg(
    spark,
    source_table,
    target_table,
    checkpoint_location,
    trigger_interval_seconds=60,
    stream_start_timestamp=None
):
    """
    Stream data from source Iceberg table to target Iceberg table
    
    Args:
        spark: SparkSession instance
        source_table: Source Iceberg table name (e.g., 'catalog.database.source_table')
        target_table: Target Iceberg table name (e.g., 'catalog.database.target_table')
        checkpoint_location: Path for checkpoint files
        trigger_interval_seconds: Processing trigger interval in seconds (default: 60)
        stream_start_timestamp: Optional timestamp in milliseconds to start streaming from
    """
    
    # Configure streaming read from source Iceberg table
    read_stream_builder = (spark.readStream 
        .format("iceberg")
    )
    
    # Optional: Start from a specific timestamp
    if stream_start_timestamp:
        read_stream_builder = (read_stream_builder 
            .option("stream-from-timestamp", str(stream_start_timestamp))
        )
    
    # Optional: Configure streaming behavior
    read_stream_builder = (read_stream_builder 
        .option("streaming-skip-overwrite-snapshots", "true") 
        .option("streaming-skip-delete-snapshots", "true")
    )
    
    # Read the stream
    source_stream = read_stream_builder.load(source_table)
    
    # Optional: Apply transformations
    # Example: Add processing timestamp and filter
    transformed_stream = (source_stream 
        .withColumn("processing_time", current_timestamp()) 
        .withColumn("processing_date", current_date())
    )
    # Add your custom transformations here
    # .filter(col("some_column").isNotNull())
    # .select("col1", "col2", "col3")
    
    # Write stream to target Iceberg table
    query = (transformed_stream.writeStream 
        .format("iceberg") 
        .outputMode("append") 
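        # Note: availableNow processes all currently available data and stops,
        # so trigger_interval_seconds is not applied here; use
        # .trigger(processingTime=f"{trigger_interval_seconds} seconds")
        # instead for a continuously running query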
        .trigger(availableNow=True) 
        .option("checkpointLocation", checkpoint_location) 
        .option("fanout-enabled", "true") 
        .toTable(target_table)
    )
    
    return query



def main():
    """
    Main execution function
    """
    # Configuration
    SOURCE_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream"
    TARGET_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_destination"
    CHECKPOINT_LOCATION = "dbfs:/dbfs/tmp/sum_streaming_iceberg_test_stream"
    
    start_time = int(datetime.datetime(2026, 1, 1).timestamp() * 1000)
    START_TIMESTAMP = start_time  # Set to None to start from latest snapshot
    
    # Create Spark session via Databricks Connect
    spark = DatabricksSession.builder.getOrCreate()
    # sparkContext is not available over Spark Connect, so configure
    # Python logging directly instead of calling setLogLevel
    logging.basicConfig(level=logging.INFO)
    
    logging.info(f"Starting streaming from {SOURCE_TABLE} to {TARGET_TABLE}")
    logging.info(f"Checkpoint location: {CHECKPOINT_LOCATION}")
    
    try:
        # Start streaming query
        query = stream_iceberg_to_iceberg(
            spark=spark,
            source_table=SOURCE_TABLE,
            target_table=TARGET_TABLE,
            checkpoint_location=CHECKPOINT_LOCATION,
            stream_start_timestamp=START_TIMESTAMP
        )
        
        logging.info(f"Streaming query started: {query.name}")
        logging.info("Press Ctrl+C to stop the streaming query")
        
        query.awaitTermination()
        
    except Exception as e:
        print(f"Error in streaming job: {str(e)}")


if __name__ == "__main__":
    main()


 

 

Louis_Frolio
Databricks Employee

Greetings @SaugatMukherjee, I did some research and this is what I found.

You're running into a real (and documented) Databricks limitation here: managed Iceberg tables cannot be used as a streaming source today. That's true even though upstream Apache Iceberg documents a Spark Structured Streaming read API.

Let's unpack what's going on and why your code behaves the way it does.

What the docs actually say

Upstream Apache Iceberg documentation shows support for Spark Structured Streaming reads using:

spark.readStream.format("iceberg")

This includes options like stream-from-timestamp, along with guardrails such as skipping overwrite or delete snapshots. In open-source Spark + Iceberg, that story is real.

However, Databricks' own Managed Iceberg table limitations documentation is explicit about the gap:

Iceberg does not support Change Data Feed (CDF). As a result, incremental processing is not supported when reading managed Iceberg tables as a source for materialized views and streaming tables.

That single sentence explains the behavior you're seeing.

Even though:

  • Managed Iceberg is Public Preview in DBR 16.4+

  • Iceberg v3 features are Beta in DBR 17.3+

...the absence of CDF means Databricks does not allow managed Iceberg tables to participate in incremental or streaming reads as a source.

 

Why you see "data source iceberg does not support streamed reading"

On Databricks, the Iceberg connector shipped with the runtime does not expose a streaming source implementation.

Databricks' streaming and incremental features lean heavily on Change Data Feed as the underlying mechanism. Because OSS Iceberg doesn't have CDF today, Databricks intentionally disables streaming reads from managed Iceberg tables.

Operationally, that shows up exactly as the error you hit:

"data source iceberg does not support streamed reading"

Even though the options you set (stream-from-timestamp, streaming-skip-overwrite-snapshots, etc.) are valid in upstream Iceberg, Databricks will reject the read regardless. This is a platform constraint, not a configuration issue.

What works vs. what doesnโ€™t on Databricks today

Here's the clean line in the sand:

  • Streaming reads from managed Iceberg tables as a source are not supported.

  • Incremental features like streaming tables, materialized views, and similar services cannot read managed Iceberg input today.

  • Upstream Iceberg examples showing streaming reads and writes do not apply to Databricks-managed Iceberg yet.

The sink side (writing to Iceberg) is not your problem. The read side is.

Practical options and workarounds

If you need true streaming or CDC-style incremental processing today, the supported path is:

  • Keep your source in Delta

  • Enable Change Data Feed

  • Read via Structured Streaming with readChangeFeed = true

That pattern is battle-tested and fully supported on Databricks.
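
As a minimal sketch of that pattern, assuming a hypothetical Delta source main.demo.delta_source and placeholder destination and checkpoint names:

# Enable CDF on the Delta source (a one-time table property change)
spark.sql("""
    ALTER TABLE main.demo.delta_source
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Stream the change feed; each row carries _change_type,
# _commit_version, and _commit_timestamp metadata columns
changes = (
    spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table("main.demo.delta_source")
)

(changes.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/tmp/cdf_demo_checkpoint")
    .trigger(availableNow=True)
    .toTable("main.demo.delta_destination"))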

If your source must remain Iceberg, the viable alternative is a scheduled micro-batch pattern (sketched after this list):

  • Read the Iceberg table in batch:

    spark.read.table("catalog.schema.table")

  • Transform and append to your destination (Iceberg or Delta) using batch writes

  • Persist a watermark (timestamp or snapshot ID) in a control table

  • Filter "new" data on each run

This gives you near-real-time behavior via frequent Jobs runs, but it is still batch, not streaming. There is no Databricks-managed Iceberg streaming source to cite today.
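
Here is a minimal sketch of that watermark pattern. The control table, its last_event_ts column, and the source's event_ts column are hypothetical (the toy table above only has c1); substitute whatever monotonically increasing column or snapshot ID your data carries:

from pyspark.sql import functions as F

SOURCE_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream"
TARGET_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_destination"
CONTROL_TABLE = "engagement.`sandbox-client-feedback`.stream_watermark"  # hypothetical

# Last processed high-water mark; None on the very first run
last_ts = spark.sql(f"SELECT max(last_event_ts) AS ts FROM {CONTROL_TABLE}").first()["ts"]

# Batch-read the Iceberg source and keep only rows newer than the watermark
new_rows = spark.read.table(SOURCE_TABLE)
if last_ts is not None:
    new_rows = new_rows.where(F.col("event_ts") > F.lit(last_ts))

if not new_rows.isEmpty():
    new_rows.write.format("iceberg").mode("append").saveAsTable(TARGET_TABLE)
    # Persist the new watermark for the next scheduled run
    new_ts = new_rows.agg(F.max("event_ts")).first()[0]
    spark.sql(f"INSERT INTO {CONTROL_TABLE} VALUES (TIMESTAMP '{new_ts}')")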

If your downstream targets include materialized views, streaming tables, Lakehouse Monitoring, or other incremental services, the practical guidance is to land or replicate into Delta so those services can leverage CDF and row tracking.

Notes on your code specifically

The failure is happening right here:

spark.readStream.format("iceberg").load(source_table)

On Databricks, that call will error for managed Iceberg sources every time. Swapping options won't change the outcome.

Your choices are:

  • Replace this with a batch read and schedule it, or

  • Change the source to Delta with CDF if you need a true streaming pipeline

Again, your sink choice (Iceberg vs Delta) is not the issue; the source read is.

Minimal batch example: Iceberg โ†’ Iceberg on Databricks

Here's the simplest supported pattern:

from pyspark.sql import functions as F

SOURCE_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_source_stream"
TARGET_TABLE = "engagement.`sandbox-client-feedback`.dummy_iceberg_destination"

df = (
    spark.read.table(SOURCE_TABLE)
         .withColumn("processing_time", F.current_timestamp())
         .withColumn("processing_date", F.current_date())
)

(df.write
   .format("iceberg")
   .mode("append")
   .saveAsTable(TARGET_TABLE))

Schedule this with Jobs every N minutes, and persist a "last processed" watermark (as in the sketch above) to avoid reprocessing. It's batch, but it's the closest operational equivalent available today.

Bottom line

If you need streaming today, Delta + CDF is the supported answer on Databricks.

If you need to stay on Iceberg, the path forward is scheduled batch with explicit state management, until Databricks enables an Iceberg-compatible incremental source.

Hope this provides some guidance.

Regards, Louis.