Hi,
I have a Structured Streaming process that works on a normal compute, but when I run it on Serverless the pipeline fails with the error shown in the image below.
CONTEXT: I have a Git repo with two folders, `src` and `notebooks`. Inside the `notebooks` directory we use Databricks notebooks as a sort of facade over the business logic code defined in the `src` folder. In this scenario, we have one notebook named 'Streaming Notebook' inside the `notebooks` directory, and two Python files (`streaming_main.py` and `bronze_streaming.py`) inside the `src` directory.
The 'Streaming Notebook' instantiates the class in `streaming_main.py`, which in turn executes the steps of the structured streaming pipeline, each defined in its own class. In this case, the logic for the bronze layer is defined in `bronze_streaming.py`: it opens a read stream from a CDC-enabled Delta table (defined in Unity Catalog) and uses the `.foreachBatch` function to do some processing on the DataFrame.
Below are code snippets for reproducing the issue across all the notebooks/files, provided that the folder structure described above is respected:
`Streaming Notebook`:
from src.streaming_main import Streaming
UNITY_CATALOG_NAME = "YOUR_UNITY_CATALOG_NAME"
TABLE_NAME = "YOUR_TABLE_NAME"
FULL_TABLE_NAME = UNITY_CATALOG_NAME + ".default." + TABLE_NAME
CHECKPOINT_PATH = "YOUR_CHECKPOINT_PATH"
# Cleanup if needed
spark.sql(f"DROP TABLE IF EXISTS {FULL_TABLE_NAME}")
dbutils.fs.rm(CHECKPOINT_PATH, recurse = True)
# Create and insert into mock table.
spark.sql(f"""CREATE TABLE IF NOT EXISTS {FULL_TABLE_NAME}
(
name string COMMENT 'Name',
surname string COMMENT 'Surname'
) TBLPROPERTIES (
delta.enableChangeDataFeed = true,
delta.autoOptimize.optimizeWrite = true,
delta.autoOptimize.autoCompact = true
) COMMENT 'Delta table related to serverless issue encountered when opening a read stream and attempting to process using foreachBatch function.'
""")
spark.sql(f"""
INSERT INTO {FULL_TABLE_NAME} (
name,
surname
) VALUES (
'Test',
'One'
),
(
'Test',
'Two'
),
(
'Test',
'Three'
);
""")
streaming = Streaming(
spark = spark,
bronze_table_name = FULL_TABLE_NAME,
bronze_checkpoint = CHECKPOINT_PATH
)
streaming.run_etl_pipeline()
`streaming_main.py`:
from pyspark.sql import SparkSession
from src.bronze_streaming import BronzeStreaming
class Streaming:
    def __init__(
        self,
        spark: SparkSession,
        bronze_table_name: str,
        bronze_checkpoint: str
    ) -> None:
        print("Instantiating `Streaming` class.")
        # Spark.
        self.spark = spark
        # Bronze Streaming Metadata.
        self.bronze_table_name: str = bronze_table_name
        self.bronze_checkpoint: str = bronze_checkpoint
        self._init_bronze_layer()
        # Define other layers as well but out of scope for replicating issue.
        print("Finished instantiating `Streaming` class.")

    def _init_bronze_layer(
        self
    ) -> None:
        print("Creating instance of bronze streaming.")
        self.bronze_streaming = BronzeStreaming(
            spark = self.spark,
            table_name = self.bronze_table_name,
            checkpoint = self.bronze_checkpoint
        )
        print("Finished creating instance of bronze streaming.")

    def run_etl_pipeline(self) -> None:
        print("Running ETL pipeline.")
        print("Running bronze layer processing.")
        self.bronze_streaming.run_streaming()
        print("Finished running bronze layer processing.")
        print("Finished running ETL pipeline.")
`bronze_streaming.py`:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
class BronzeStreaming:
    def __init__(
        self,
        spark: SparkSession,
        table_name: str,
        checkpoint: str
    ) -> None:
        print("Instantiating `BronzeStreaming` class.")
        # Spark.
        self.spark: SparkSession = spark
        # Bronze Streaming Metadata.
        self.table_name: str = table_name
        self.checkpoint: str = checkpoint
        print("Finished instantiating `BronzeStreaming` class.")

    def _streaming_microbatch(self, batch_df: DataFrame, batch_id: int) -> None:
        print(f"Count: {batch_df.count()}")
        # Keep the DataFrame itself; `.show()` returns None, so call it separately.
        processed_df = batch_df.withColumn(
            "processed",
            F.lit(True)
        )
        processed_df.show()

    def run_streaming(
        self
    ) -> None:
        print("Opening read stream and processing bronze logic via microbatch function.")
        query = (
            self.spark.readStream
            .option("readChangeData", "true")
            .option("startingVersion", 0)
            .table(self.table_name)
            .writeStream
            .foreachBatch(self._streaming_microbatch)
            .option("checkpointLocation", self.checkpoint)
            .trigger(availableNow=True)
            .start()
        )
        query.awaitTermination()
        print("Bronze streaming logic executed. Finished and moving on to the next stage!")
On a normal compute, all of the code works, but when I switch to Serverless compute, the pipeline fails when attempting to open the read stream from the Delta table, and I get the attached error. I'm not sure whether this is a bug or whether I'm doing something wrong. I've tried different DBR versions for the normal compute (from 13.3 LTS up to 15.4 LTS Beta) and all of them worked.
Does anyone have any suggestions on how this can be solved?