Hi Databricks Experts,
I'm encountering issues with my streaming jobs in Databricks and need some advice. I’ve implemented a custom streaming query listener to capture job status events and upsert them into a Delta table. However, the solution behaves differently when running on a single streaming table versus multiple tables.
Below is my complete code, along with explanatory notes:
from abc import ABC
from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.streaming import StreamingQueryListener
from delta.tables import DeltaTable

# BaseDto and JobInfoDto are our project-specific config / job-info DTO classes (imports omitted)
class MonitoringHelper(ABC):

    @staticmethod
    def persist_status_to_delta(config: BaseDto, job_info: JobInfoDto, spark, event, dest_location):
        # (Optionally) sleep for 5 seconds between each job restart
        # time.sleep(5)

        # Define the schema for the Delta table
        schema = StructType([
            StructField("job_id", StringType(), True),
            StructField("job_name", StringType(), True),
            StructField("job_run_id", StringType(), True),
            StructField("run_id", StringType(), True),
            StructField("query_id", StringType(), True),
            StructField("event_timestamp", StringType(), True),
        ])

        # Collect job and event details into a single row
        data = {
            "job_id": job_info.job_id,
            "job_name": job_info.job_name,
            "job_run_id": job_info.job_run_id,
            "query_id": event.id,
            "run_id": event.runId,
            "event_timestamp": f"{event.timestamp[0:23]}+00:00",
        }

        # Create a one-row DataFrame with the defined schema
        df = spark.createDataFrame([data], schema=schema)

        # If the destination Delta table exists, upsert the status row
        if DeltaTable.isDeltaTable(spark, dest_location):
            # Alternative check: if not config.app_conf.pipeline_config["first_create_monitoring_table"]:
            DeltaTable.forPath(spark, dest_location).alias("target").merge(
                df.alias("source"),
                "target.job_name = source.job_name AND target.job_id = source.job_id AND target.query_id = source.query_id"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
def add_custom_listening(spark, config: BaseDto, job_info: JobInfoDto = None):

    class CustomStreamingQueryListener(StreamingQueryListener):

        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            pass

        def onQueryIdle(self, event):
            # IMPORTANT: handles the idle event to write the streaming job status periodically
            # Parse the datetime string from the event timestamp
            dt = datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")

            # Write to the Delta table every configured interval (e.g. every 10 minutes)
            if dt.minute % config.app_conf.silf_config["time_interval_write_stream_lated"] == 0:
                buckets = config.app_conf.silf_config["gcs_buckets"]
                # Uncomment below to persist status
                # MonitoringHelper.persist_status_to_delta(config, job_info, spark, event, f"{buckets}/logs/silf_logs/streaming_lated/")

        def onQueryTerminated(self, event):
            pass
    # Attach the custom listener to the current Spark session
    spark.streams.addListener(CustomStreamingQueryListener())
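The listener is registered once on the driver, before the streaming tables are started. Roughly, the wiring looks like the sketch below (load_config, load_job_info, the table names, and the checkpoint path are placeholders for our project-specific setup):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
config = load_config()        # placeholder: builds our BaseDto
job_info = load_job_info()    # placeholder: builds our JobInfoDto

# Register the listener once per session, before any query starts
add_custom_listening(spark, config, job_info)

# Start one streaming query per table (roughly 10 in the failing scenario)
for table_name in ["table_a", "table_b"]:  # placeholder table names
    (
        spark.readStream.table(table_name)
        .writeStream
        .option("checkpointLocation", f"/tmp/checkpoints/{table_name}")  # placeholder path
        .toTable(f"{table_name}_silver")
    )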
Description of the Issue:
Scenario with a Single Streaming Table:
When only one streaming table is running, the listener fires as expected and the status upsert into the Delta table completes without errors.
Scenario with Multiple Streaming Tables (≈10 tables):
When running with multiple streaming tables concurrently, I encounter the following error:
/databricks/spark/python/pyspark/sql/connect/streaming/query.py:561: UserWarning: Listener <...CustomStreamingQueryListener object...> threw an exception
<_InactiveRpcError of RPC that terminated with:
    status = StatusCode.PERMISSION_DENIED
    details = "Local RPC without associated session."
    debug_error_string = "UNKNOWN:Error received from peer ..." >
  warnings.warn(f"Listener {str(listener)} threw an exception\n{e}")
Questions:
Handling Streaming Query Hangs:
Given that my jobs can involve 20–30 streaming tables, what strategies or best practices does Databricks recommend to handle cases where streaming queries hang after running for about a day?
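To make this more concrete, the kind of approach I have been considering is a driver-side watchdog that stops any query whose lastProgress has gone stale, so the job's retry policy can restart it. A rough, unvalidated sketch (the 30-minute threshold is arbitrary):

import time
from datetime import datetime, timezone

STALL_THRESHOLD_SECONDS = 30 * 60  # arbitrary: treat 30 minutes without progress as hung

def watchdog_loop(spark, check_interval_seconds=300):
    while True:
        now = datetime.now(timezone.utc)
        for query in spark.streams.active:
            progress = query.lastProgress  # most recent progress as a dict, or None
            if progress is None:
                continue
            last_ts = datetime.strptime(
                progress["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ"
            ).replace(tzinfo=timezone.utc)
            if (now - last_ts).total_seconds() > STALL_THRESHOLD_SECONDS:
                # Stop the stalled query; the job's retry policy (or an outer
                # restart loop) is expected to bring it back up
                query.stop()
        time.sleep(check_interval_seconds)

Would something like this be a reasonable pattern, or is there a recommended built-in mechanism?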
Fixing Delta Upsert Errors in onQueryIdle:
How can I resolve the "Local RPC without associated session" (PERMISSION_DENIED) error when performing the Delta upsert within the onQueryIdle event?
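For reference, persist_status_to_delta currently uses the spark session captured from the enclosing add_custom_listening scope. I am not sure whether the callback should instead resolve its own session; an untested idea (this would replace onQueryIdle inside add_custom_listening, so config, job_info, and MonitoringHelper are still taken from the enclosing scope):

from pyspark.sql import SparkSession

def onQueryIdle(self, event):
    # Untested idea: resolve the session inside the callback instead of
    # capturing `spark` from the enclosing scope
    session = SparkSession.getActiveSession()
    if session is not None:
        buckets = config.app_conf.silf_config["gcs_buckets"]
        MonitoringHelper.persist_status_to_delta(
            config, job_info, session, event,
            f"{buckets}/logs/silf_logs/streaming_lated/"
        )

Is this the right direction, or does the listener need to be set up differently when running against Spark Connect / shared clusters?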
I appreciate your insights and recommendations on addressing both issues. Any advice to improve the robustness and stability of these streaming jobs would be extremely valuable.
Thank you!
Regards,
Hung Nguyen