Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Structured streaming not working with Serverless compute

mv-rs
New Contributor

Hi,

I have a structured streaming process that works on a regular (classic) cluster, but when I attempt to run it on Serverless compute, the pipeline fails with the error shown in the image below.

CONTEXT: I have a Git repo with two folders, `src` and `notebooks`. Inside the `notebooks` directory we use Databricks notebooks as a facade over the business logic defined in the `src` folder. In this scenario, we have one notebook named 'Streaming Notebook' inside the `notebooks` directory, and two Python files (`streaming_main.py` and `bronze_streaming.py`) inside the `src` directory.

The 'Streaming Notebook' instantiates the class inside `streaming_main.py`, which in turn executes the various steps of the structured streaming pipeline, each defined in its own class. In this case, the bronze-layer logic lives in `bronze_streaming.py`: it opens a read stream from a CDC-enabled Delta table (defined in Unity Catalog) and, via the `.foreachBatch` function, does some processing on each micro-batch DataFrame.

Below are code snippets to replicate all the notebooks/files, provided the folder structure defined above is respected:

`Streaming Notebook`:

 

from src.streaming_main import Streaming

UNITY_CATALOG_NAME = "YOUR_UNITY_CATALOG_NAME"
TABLE_NAME = "YOUR_TABLE_NAME"
FULL_TABLE_NAME = UNITY_CATALOG_NAME + ".default." + TABLE_NAME
CHECKPOINT_PATH = "YOUR_CHECKPOINT_PATH"


# Cleanup if needed
spark.sql(f"DROP TABLE IF EXISTS {FULL_TABLE_NAME}")

dbutils.fs.rm(CHECKPOINT_PATH, recurse = True)

# Create and insert into mock table.
spark.sql(f"""CREATE TABLE IF NOT EXISTS {FULL_TABLE_NAME}
    (
        name string COMMENT 'Name',
        surname string COMMENT 'Surname'
    ) TBLPROPERTIES (
        delta.enableChangeDataFeed = true,
        delta.autoOptimize.optimizeWrite = true,
        delta.autoOptimize.autoCompact = true
    ) COMMENT 'Delta table related to serverless issue encountered when opening a read stream and attempting to process using foreachBatch function.'
""")

spark.sql(f"""
    INSERT INTO {FULL_TABLE_NAME} (
        name,
        surname
    ) VALUES (
        'Test',
        'One'
    ),
    (
        'Test',
        'Two'
    ),
    (
        'Test',
        'Three'
    );
""")

streaming = Streaming(
    spark = spark,
    bronze_table_name = FULL_TABLE_NAME,
    bronze_checkpoint = CHECKPOINT_PATH
)

streaming.run_etl_pipeline()

 

`streaming_main.py`:

 

from pyspark.sql import SparkSession
from src.bronze_streaming import BronzeStreaming


class Streaming:
    def __init__(
        self,
        spark: SparkSession,
        bronze_table_name: str,
        bronze_checkpoint: str
    ) -> None:
        print("Instantiating `Streaming` class.")
        
        # Spark.
        self.spark = spark

        # Bronze Streaming Metadata.
        self.bronze_table_name: str = bronze_table_name
        self.bronze_checkpoint: str = bronze_checkpoint

        self._init_bronze_layer()

        # Define other layers as well but out of scope for replicating issue.

        print("Finished instantiating `Streaming` class.")


    def _init_bronze_layer(
        self
    ) -> None:
        print("Creating instance of bronze streaming.")

        self.bronze_streaming = BronzeStreaming(
            spark = self.spark,
            table_name = self.bronze_table_name,
            checkpoint = self.bronze_checkpoint
        )

        print("Finished creating instance of bronze streaming.")

    def run_etl_pipeline(self) -> None:
        print("Running ETL pipeline.")

        print("Running bronze layer processing.")
        self.bronze_streaming.run_streaming()
        print("Finished running bronze layer processing.")

        print("Finished running ETL pipeline.")

 

`bronze_streaming.py`:

 

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F


class BronzeStreaming:
    def __init__(
        self,
        spark: SparkSession,
        table_name: str,
        checkpoint: str
    ) -> None:
        print("Instantiating `BronzeStreaming` class.")

        # Spark.
        self.spark: SparkSession = spark

        # Bronze Streaming Metadata.
        self.table_name: str = table_name
        self.checkpoint: str = checkpoint

        print("Finished instantiating `BronzeStreaming` class.")

    def _streaming_microbatch(self, batch_df: DataFrame, batch_id: int) -> None:
        print(f"Count: {batch_df.count()}")

        # Add a marker column to the micro-batch, then display it
        # (.show() returns None, so it should not be assigned to processed_df).
        processed_df = batch_df.withColumn(
            "processed",
            F.lit(True)
        )
        processed_df.show()


    def run_streaming(
        self
    ) -> None:
        print("Opening read stream and processing bronze logic via microbatch function.")

        query = (
            self.spark.readStream
            # Read the table as a change data feed, starting from the first version.
            .option("readChangeData", "true")
            .option("startingVersion", 0)
            .table(self.table_name)
            .writeStream
            # Process each micro-batch with the bronze logic defined above.
            .foreachBatch(self._streaming_microbatch)
            .option("checkpointLocation", self.checkpoint)
            # Consume all currently available data, then stop.
            .trigger(availableNow=True)
            .start()
        )

        query.awaitTermination()
        
        print("Bronze streaming logic executed. Finished and moving on to the next stage!")

 

On classic compute all of this code works, but when I switch to Serverless compute, the pipeline fails when opening the read stream from the Delta table and I get the attached error. I'm not sure if this is a bug or if I'm doing something wrong. I've tried different DBR versions for classic compute (from 13.3 LTS up to 15.4 LTS Beta) and all of them worked.

Does anyone have any suggestions on how this can be solved?

1 REPLY

mark_ott
Databricks Employee

The core answer: many users encounter failures in structured streaming pipelines when switching from classic compute to Serverless, especially when opening read streams on Unity Catalog Delta tables with Change Data Feed (CDF) enabled and processing them with .foreachBatch. This is typically not a code issue, but relates to current limitations and requirements of Databricks Serverless compute and its Unity Catalog streaming support.

Why Serverless Fails with This Pattern

Databricks Serverless compute has features and restrictions that differ from classic clusters. Common issues include:

  • Streaming is not fully supported on Serverless compute: As of DBR 13.x–15.x, many structured streaming operations do not work with Serverless. This includes readStream from Unity Catalog tables, and especially Change Data Feed (CDF) reads, which often cause failures on Serverless.

  • Python streaming jobs depend on the underlying compute: Serverless was initially aimed at SQL workloads, and Spark (Python) streaming jobs may fail or lack full feature parity.

  • Delta table streaming and CDF with foreachBatch: foreachBatch streaming logic requires certain capabilities in cluster management (e.g., persistent storage, job orchestration) that classic clusters provide but Serverless does not. The isolation sketch after this list can help confirm which part of the pattern fails.

  • Permissions and Unity Catalog: Serverless compute uses different managed identities, and lack of direct user control over runtime can lead to authentication or permission problems when accessing Unity Catalog tables for streaming.
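
To narrow down which limitation is actually being hit, it can help to split the failing pattern into two minimal tests on the same Serverless environment: one that only opens the streaming CDF read (writing to a memory sink instead of foreachBatch), and one that only uses foreachBatch over a plain, non-CDF streaming read of the table. This is a diagnostic sketch only, reusing FULL_TABLE_NAME and CHECKPOINT_PATH from the question; the sub-path appended to the checkpoint location is a placeholder.

# Test 1: streaming CDF read only, written to a memory sink (no foreachBatch).
(
    spark.readStream
    .option("readChangeData", "true")
    .option("startingVersion", 0)
    .table(FULL_TABLE_NAME)
    .writeStream
    .format("memory")
    .queryName("cdf_read_test")
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)

# Test 2: foreachBatch only, over a plain (non-CDF) streaming read of the same table.
(
    spark.readStream
    .table(FULL_TABLE_NAME)
    .writeStream
    .foreachBatch(lambda df, batch_id: print(f"Batch {batch_id}: {df.count()} rows"))
    .option("checkpointLocation", CHECKPOINT_PATH + "/foreach_batch_test")  # placeholder sub-path
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)

If only one of the two tests fails, that points at the specific feature gap; if both fail, the restriction is broader than CDF or foreachBatch alone.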

Official Limitations and Known Issues

Here's a table summarizing streaming capabilities across environments:

| Feature | Classic Cluster | Serverless Compute |
| --- | --- | --- |
| readStream from Unity Catalog Delta (CDF) | Supported | Not Supported |
| foreachBatch | Supported | Not Supported |
| Python streaming pipelines | Supported | Not Supported |
| Delta Lake streaming w/ checkpointing | Supported | Partially / Not Supported |

Troubleshooting and Solution Strategies

  • Use Jobs or Classic Clusters: For Python-based streaming, Databricks recommends running jobs on classic clusters rather than Serverless (see the job-creation sketch after this list).

  • Check official documentation: Review Databricks docs for latest Serverless and Unity Catalog streaming support. Look for "supported operations" tables for your DBR version.

  • Alternative: SQL Streaming Warehouses: If you must use Serverless, ensure your logic fits within SQL streaming pipelines, and use supported notebook/SQL features only (no Python, no foreachBatch).

  • Permissions: If moving to a supported pipeline, ensure the Serverless warehouse has read/write permissions to Unity Catalog tables and checkpoint path.
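
Following the first point above, here is a rough sketch of creating a job that runs the 'Streaming Notebook' on a classic job cluster instead of Serverless, using the Databricks SDK for Python. The notebook path, node type, and DBR version are placeholders, and field names may vary slightly between SDK versions, so treat this as an outline rather than a drop-in script.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute, jobs

w = WorkspaceClient()

# Placeholder values -- adjust to your workspace and repo layout.
NOTEBOOK_PATH = "/Repos/your_user/your_repo/notebooks/Streaming Notebook"

created = w.jobs.create(
    name="bronze-streaming-classic",
    tasks=[
        jobs.Task(
            task_key="bronze_streaming",
            notebook_task=jobs.NotebookTask(notebook_path=NOTEBOOK_PATH),
            # Classic job cluster instead of serverless compute.
            new_cluster=compute.ClusterSpec(
                spark_version="15.4.x-scala2.12",
                node_type_id="Standard_DS3_v2",  # placeholder node type (Azure example)
                num_workers=1,
            ),
        )
    ],
)
print(f"Created job {created.job_id}")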

Known Workarounds

  • For Python/Spark streaming with CDF/foreachBatch, stick with regular compute.

  • Consider batch ETL patterns or use SQL streaming if you must run Serverless (a batch-style CDF sketch follows below).
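
As an illustration of the batch ETL pattern, the sketch below replaces the streaming CDF read with Delta's batch Change Data Feed read against the same mock table, applying the same logic as _streaming_microbatch. The last_processed_version variable is a simplification (in practice it would be persisted between runs, for example in a control table), and whether this runs on your Serverless environment still depends on the limitations listed above.

from pyspark.sql import functions as F

# Assumption: FULL_TABLE_NAME is the same mock table created in the notebook above.
last_processed_version = 0  # simplification: persist this value between runs in practice

changes_df = (
    spark.read
    .option("readChangeFeed", "true")   # batch read of the change data feed
    .option("startingVersion", last_processed_version)
    .table(FULL_TABLE_NAME)
)

# Same logic as the streaming micro-batch, applied to the full change set.
processed_df = changes_df.withColumn("processed", F.lit(True))
print(f"Count: {processed_df.count()}")
processed_df.show()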

References

  • Databricks Unity Catalog streaming limitations, especially with Serverless compute.

  • Discussions of similar failures on Databricks forums and StackOverflow, identifying feature gaps for Serverless.


In summary: The error arises because Databricks Serverless compute does not currently support Python-based streaming jobs on Unity Catalog tables with CDF and foreachBatch. To resolve, switch back to regular clusters for streaming, or adjust your workflow to use supported SQL streaming pipelines on Serverless if possible.