Hi Databricks Experts,
I'm encountering issues with my streaming jobs in Databricks and need some advice. I’ve implemented a custom streaming query listener to capture job status events and upsert them into a Delta table. However, the solution behaves differently when running on a single streaming table versus multiple tables.
Below is my complete code, along with explanatory notes:
from abc import ABC
from datetime import datetime
from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.types import StructType, StructField, StringType
# Assume BaseDto and JobInfoDto are imported from our own DTO modules
class MonitoringHelper(ABC):
    @staticmethod
    def persist_status_to_delta(config: BaseDto, job_info: JobInfoDto, spark, event, dest_location):
        # (Optionally) Sleep for 5 seconds between each job restart
        # time.sleep(5)
        
        # Define the schema for the Delta table
        schema = StructType([
            StructField("job_id", StringType(), True),
            StructField("job_name", StringType(), True),
            StructField("job_run_id", StringType(), True),
            StructField("run_id", StringType(), True),
            StructField("query_id", StringType(), True),
            StructField("event_timestamp", StringType(), True),
        ])
        # Create a dictionary with job and event details
        data = {
            "job_id": job_info.job_id,
            "job_name": job_info.job_name, 
            "job_run_id": job_info.job_run_id, 
            "query_id": event.id,
            "run_id": event.runId,
            "event_timestamp": f"{event.timestamp[0:23]}+00:00",
        }
        # Create a DataFrame based on the defined schema
        df = spark.createDataFrame([data], schema=schema)
        
        # Check whether the destination Delta table exists and perform an upsert.
        # NOTE: there is no else branch, so when the table does not exist yet
        # the status row is silently dropped.
        if DeltaTable.isDeltaTable(spark, dest_location):
            # Alternative check: if not config.app_conf.pipeline_config["first_create_monitoring_table"]:
            DeltaTable.forPath(spark, dest_location).alias("target").merge(
                df.alias("source"),
                "target.job_name = source.job_name AND target.job_id = source.job_id AND target.query_id = source.query_id"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
def add_custom_listening(spark, config: BaseDto, job_info: JobInfoDto = None):
    class CustomStreamingQueryListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            pass
        def onQueryProgress(self, event):
            pass
        def onQueryIdle(self, event):
            # IMPORTANT: Handles idle event to write streaming job status periodically
            # Parse the datetime string from the event timestamp
            dt = datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
            # Write data to the Delta table every configured interval (e.g., every 10 minutes)
            if dt.minute % config.app_conf.silf_config["time_interval_write_stream_lated"] == 0:
                buckets = config.app_conf.silf_config["gcs_buckets"] 
                # Uncomment below to persist status
                # MonitoringHelper.persist_status_to_delta(config, job_info, spark, event, f"{buckets}/logs/silf_logs/streaming_lated/")
        def onQueryTerminated(self, event):
            pass
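A side note on the interval check in `onQueryIdle` above: idle events can arrive every few seconds, so `dt.minute % interval == 0` may evaluate true many times within the same matching minute and trigger repeated merges. A minimal sketch of a dedup guard (the helper name and the mutable-state convention are my own, not part of the original code):

```python
from datetime import datetime

def should_write(ts: str, interval_minutes: int, last_bucket: list) -> bool:
    """Return True at most once per matching interval bucket.

    ts: event timestamp such as "2024-01-01T10:20:03.123Z" (format assumed
    from the strptime call in the listener above).
    last_bucket: one-element list used as mutable state across calls.
    """
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
    if dt.minute % interval_minutes != 0:
        return False
    bucket = (dt.year, dt.month, dt.day, dt.hour, dt.minute)
    if last_bucket and last_bucket[0] == bucket:
        return False  # already wrote for this minute bucket
    last_bucket[:] = [bucket]
    return True
```

`onQueryIdle` would then call `should_write(event.timestamp, interval, state)` and only persist when it returns True.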
    # Attach the custom listener to the active SparkSession
    spark.streams.addListener(CustomStreamingQueryListener())

Description of the Issue:
Scenario with a Single Streaming Table:
With a single streaming table, the listener and the Delta upsert work as expected.
Scenario with Multiple Streaming Tables (≈10 tables):
When running with multiple streaming tables concurrently, I encounter the following error:
/databricks/spark/python/pyspark/sql/connect/streaming/query.py:561: UserWarning: Listener <...CustomStreamingQueryListener object...> threw an exception
<_InactiveRpcError of RPC that terminated with:
    status = StatusCode.PERMISSION_DENIED
    details = "Local RPC without associated session."
    debug_error_string = "UNKNOWN:Error received from peer ..." >
  warnings.warn(f"Listener {str(listener)} threw an exception\n{e}")
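The PERMISSION_DENIED message suggests the listener callback is executing on Spark Connect without an attached session, so session-bound calls such as `spark.createDataFrame` fail inside `onQueryIdle`. One workaround (a sketch under that assumption; `persist_fn` is a stand-in for a wrapper around `MonitoringHelper.persist_status_to_delta`) is to keep the callback free of Spark calls: enqueue a plain-Python payload and drain the queue from a thread started in driver code that owns the session:

```python
import queue
import threading

# Shared queue: listener callbacks only put plain dicts here (no Spark calls).
event_queue: "queue.Queue[dict]" = queue.Queue()

def enqueue_idle_event(query_id, run_id, timestamp):
    # Safe to call from onQueryIdle: pure Python, no SparkSession needed.
    event_queue.put({
        "query_id": str(query_id),
        "run_id": str(run_id),
        "event_timestamp": timestamp,
    })

def drain_events(persist_fn, stop_flag: threading.Event):
    # Runs on a thread created where the SparkSession is valid;
    # persist_fn does the DataFrame creation and Delta merge.
    while not stop_flag.is_set():
        try:
            payload = event_queue.get(timeout=1.0)
        except queue.Empty:
            continue
        persist_fn(payload)
```

Start the drain thread once per job (`threading.Thread(target=drain_events, args=(my_persist_fn, stop_flag), daemon=True).start()`), and have `onQueryIdle` call only `enqueue_idle_event(event.id, event.runId, event.timestamp)`.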
Questions:
Handling Streaming Query Hangs:
Given that my jobs can involve 20–30 streaming tables, what strategies or best practices does Databricks recommend to handle cases where streaming queries hang after running for about a day?
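One pattern for detecting hung queries (a sketch of a driver-side watchdog, not an official Databricks recommendation) is to periodically compare each query's last progress timestamp against an idle threshold and restart the ones that have stopped progressing. The decision logic can be kept pure and separate from the Spark wiring:

```python
from datetime import datetime, timedelta

def find_stale_queries(last_progress: dict, now: datetime, max_idle: timedelta) -> list:
    """Return ids of queries whose last progress is older than max_idle.

    last_progress maps query id -> datetime of its last progress event,
    e.g. parsed from query.lastProgress["timestamp"].
    """
    return [qid for qid, ts in last_progress.items() if now - ts > max_idle]
```

In the job loop you would build `last_progress` from `spark.streams.active` (parsing each query's `lastProgress["timestamp"]`), then `stop()` and restart the returned ids.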
Fixing Delta Upsert Errors in onQueryIdle:
How can I resolve the "Local RPC without associated session" (PERMISSION_DENIED) error when performing the Delta upsert within the onQueryIdle event?
I appreciate your insights and recommendations on addressing both issues. Any advice to improve the robustness and stability of these streaming jobs would be extremely valuable.
Thank you!

Regards,
Hung Nguyen