Handling Streaming Query Hangs & Delta Upsert Failures in Multi-Table Jobs
Hi Databricks Experts,
I'm encountering issues with my streaming jobs in Databricks and need some advice. I’ve implemented a custom streaming query listener to capture job status events and upsert them into a Delta table. However, the solution behaves differently when running on a single streaming table versus multiple tables.
Below is my complete code, along with explanatory notes:
from abc import ABC
from datetime import datetime

from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.types import StructType, StructField, StringType

# BaseDto and JobInfoDto are our own config/job-metadata classes, defined elsewhere.


class MonitoringHelper(ABC):
    @staticmethod
    def persist_status_to_delta(config: BaseDto, job_info: JobInfoDto, spark, event, dest_location):
        # (Optionally) sleep for 5 seconds between each job restart
        # time.sleep(5)

        # Define the schema for the Delta table
        schema = StructType([
            StructField("job_id", StringType(), True),
            StructField("job_name", StringType(), True),
            StructField("job_run_id", StringType(), True),
            StructField("run_id", StringType(), True),
            StructField("query_id", StringType(), True),
            StructField("event_timestamp", StringType(), True),
        ])

        # Collect the job and event details
        data = {
            "job_id": job_info.job_id,
            "job_name": job_info.job_name,
            "job_run_id": job_info.job_run_id,
            "query_id": event.id,
            "run_id": event.runId,
            "event_timestamp": f"{event.timestamp[0:23]}+00:00",
        }

        # Create a single-row DataFrame with the defined schema
        df = spark.createDataFrame([data], schema=schema)

        # If the destination Delta table exists, upsert the status row
        if DeltaTable.isDeltaTable(spark, dest_location):
            # Alternative check: if not config.app_conf.pipeline_config["first_create_monitoring_table"]:
            DeltaTable.forPath(spark, dest_location).alias("target").merge(
                df.alias("source"),
                "target.job_name = source.job_name "
                "AND target.job_id = source.job_id "
                "AND target.query_id = source.query_id",
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


def add_custom_listening(spark, config: BaseDto, job_info: JobInfoDto = None):
    class CustomStreamingQueryListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            pass

        def onQueryIdle(self, event):
            # IMPORTANT: handles the idle event to write streaming job status periodically.
            # Parse the datetime string from the event timestamp.
            dt = datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")

            # Write to the Delta table at the configured interval (e.g., every 10 minutes)
            if dt.minute % config.app_conf.silf_config["time_interval_write_stream_lated"] == 0:
                buckets = config.app_conf.silf_config["gcs_buckets"]
                # Uncomment below to persist status
                # MonitoringHelper.persist_status_to_delta(
                #     config, job_info, spark, event,
                #     f"{buckets}/logs/silf_logs/streaming_lated/",
                # )

        def onQueryTerminated(self, event):
            pass

    # Attach the custom listener to the Spark session
    spark.streams.addListener(CustomStreamingQueryListener())
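For completeness, this is roughly how I wire it up on the driver (tables_config and start_stream stand in for my actual per-table startup code):

# Driver-side wiring (sketch): register the listener once, then start the
# streams. `tables_config` and `start_stream` are placeholders for my real
# per-table startup logic.
add_custom_listening(spark, config, job_info)
for table_conf in tables_config:      # ~10 tables in the failing scenario
    start_stream(spark, table_conf)   # builds .writeStream ... .start() for one table
spark.streams.awaitAnyTermination()   # block the driver while queries run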
Description of the Issue:
Scenario with a Single Streaming Table:
When I run the job with just one streaming table, the upsert into the Delta table (e.g., at gs://buckets_name/logs/silf_logs/silver_logs) works as expected.
Scenario with Multiple Streaming Tables (≈10 tables):
When running with multiple streaming tables concurrently, I encounter the following error:
warnings.warn(f"Listener {str(listener)} threw an exception\n{e}") /databricks/spark/python/pyspark/sql/connect/streaming/query.py:561: UserWarning: Listener <...CustomStreamingQueryListener object...> threw an exception <_InactiveRpcError of RPC that terminated with: status = StatusCode.PERMISSION_DENIED details = "Local RPC without associated session." debug_error_string = "UNKNOWN:Error received from peer ..." >
Questions:
Handling Streaming Query Hangs:
Given that my jobs can involve 20–30 streaming tables, what strategies or best practices does Databricks recommend for handling cases where streaming queries hang after running for about a day? For context, a rough driver-side watchdog I've been considering is sketched below.
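This is only a sketch of my own idea, not an official Databricks pattern; restart_query is a hypothetical helper that knows how to rebuild a given stream:

import time
from datetime import datetime, timezone

def watchdog(spark, restart_query, max_stale_seconds=3600, poll_seconds=300):
    # Poll each active query's lastProgress and restart any query that has
    # not reported progress within max_stale_seconds.
    # NOTE: an intentionally idle but healthy query would also trip this check.
    while True:
        for q in list(spark.streams.active):
            progress = q.lastProgress  # None until the first batch completes
            if progress is None:
                continue
            # Progress timestamps look like "2024-01-01T00:00:00.000Z"
            last = datetime.strptime(progress["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
            last = last.replace(tzinfo=timezone.utc)
            stale_s = (datetime.now(timezone.utc) - last).total_seconds()
            if stale_s > max_stale_seconds:
                print(f"Query {q.name or q.id} stale for {stale_s:.0f}s; restarting")
                q.stop()
                restart_query(q.name)  # hypothetical: rebuilds and restarts this stream
        time.sleep(poll_seconds)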
Fixing Delta Upsert Errors in onQueryIdle:
How can I resolve the "Local RPC without associated session" (PERMISSION_DENIED) error when performing the Delta upsert within the onQueryIdle event? Could this be related to session management in a high-concurrency scenario? One unverified idea I've sketched below is to resolve the session inside the callback instead of capturing it from the enclosing scope.
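A minimal sketch, assuming getActiveSession() is populated on the listener's thread (which I have not confirmed under Spark Connect); config, job_info, and dest_location would still come from the enclosing scope as in my code above:

from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

class SessionAwareListener(StreamingQueryListener):
    def onQueryStarted(self, event): pass
    def onQueryProgress(self, event): pass
    def onQueryTerminated(self, event): pass

    def onQueryIdle(self, event):
        # Unverified: look up the session bound to this thread instead of
        # using a handle captured when the listener was created.
        session = SparkSession.getActiveSession()
        if session is None:
            return  # no session on this thread; skip rather than crash
        MonitoringHelper.persist_status_to_delta(
            config, job_info, session, event, dest_location
        )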
Are there configuration tweaks or adjustments to the listener pattern that might help? One adjustment I've been toying with, sketched below, is to keep all Spark calls out of the listener threads entirely.
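Here the callback only enqueues plain values, and a single writer thread on the driver, which owns a valid session, performs the merge; again just a sketch under my assumptions:

import queue
import threading
from types import SimpleNamespace
from pyspark.sql.streaming import StreamingQueryListener

event_queue: queue.Queue = queue.Queue()

class EnqueueOnlyListener(StreamingQueryListener):
    def onQueryStarted(self, event): pass
    def onQueryProgress(self, event): pass
    def onQueryTerminated(self, event): pass

    def onQueryIdle(self, event):
        # No Spark calls on the listener thread: just capture plain values.
        event_queue.put((str(event.id), str(event.runId), event.timestamp))

def writer_loop(spark, config, job_info, dest_location):
    # Runs on a driver thread that holds a valid session.
    while True:
        qid, run_id, ts = event_queue.get()  # blocks until an event arrives
        event_like = SimpleNamespace(id=qid, runId=run_id, timestamp=ts)
        MonitoringHelper.persist_status_to_delta(
            config, job_info, spark, event_like, dest_location
        )

# threading.Thread(target=writer_loop,
#                  args=(spark, config, job_info, dest_location),
#                  daemon=True).start()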
I appreciate your insights and recommendations on addressing both issues. Any advice to improve the robustness and stability of these streaming jobs would be extremely valuable.
Thank you!
Hung Nguyen
Labels:
- Delta Lake
- Spark
- Workflows

