Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Handling Streaming Query Hangs & Delta Upsert Failures in Multi-Table Jobs

minhhung0507
Contributor III

Hi Databricks Experts,

I'm encountering issues with my streaming jobs in Databricks and need some advice. I’ve implemented a custom streaming query listener to capture job status events and upsert them into a Delta table. However, the solution behaves differently when running on a single streaming table versus multiple tables.

Below is my complete code, along with explanatory notes:

import time
from abc import ABC
from datetime import datetime

from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.types import StructType, StructField, StringType
# BaseDto and JobInfoDto are our project-specific DTO classes, imported from internal modules

class MonitoringHelper(ABC):
    @staticmethod
    def persist_status_to_delta(config: BaseDto, job_info: JobInfoDto, spark, event, dest_location):
        # (Optionally) Sleep for 5 seconds between each job restart
        # time.sleep(5)
        
        # Define the schema for the Delta table
        schema = StructType([
            StructField("job_id", StringType(), True),
            StructField("job_name", StringType(), True),
            StructField("job_run_id", StringType(), True),
            StructField("run_id", StringType(), True),
            StructField("query_id", StringType(), True),
            StructField("event_timestamp", StringType(), True),
        ])

        # Create a dictionary with job and event details
        data = {
            "job_id": job_info.job_id,
            "job_name": job_info.job_name, 
            "job_run_id": job_info.job_run_id, 
            "query_id": event.id,
            "run_id": event.runId,
            "event_timestamp": f"{event.timestamp[0:23]}+00:00",
        }

        # Create a DataFrame based on the defined schema
        df = spark.createDataFrame([data], schema=schema)
        
        # Check if the destination Delta table exists and perform an upsert
        if DeltaTable.isDeltaTable(spark, dest_location):
            # Alternative check: if not config.app_conf.pipeline_config["first_create_monitoring_table"]:
            DeltaTable.forPath(spark, dest_location).alias("target").merge(
                df.alias("source"),
                "target.job_name = source.job_name AND target.job_id = source.job_id AND target.query_id = source.query_id"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
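        else:
            # Fallback sketch (an assumption, not part of the original logic):
            # create the monitoring table on the first run so later merges have a target
            df.write.format("delta").save(dest_location)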


def add_custom_listening(spark, config: BaseDto, job_info: JobInfoDto = None):
    class CustomStreamingQueryListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            pass

        def onQueryIdle(self, event):
            # IMPORTANT: Handles idle event to write streaming job status periodically
            # Parse the datetime string from the event timestamp
            dt = datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
            # Write data to the Delta table every configured interval (e.g., every 10 minutes)
            if dt.minute % config.app_conf.silf_config["time_interval_write_stream_lated"] == 0:
                buckets = config.app_conf.silf_config["gcs_buckets"] 
                # Uncomment below to persist status
                # MonitoringHelper.persist_status_to_delta(config, job_info, spark, event, f"{buckets}/logs/silf_logs/streaming_lated/")

        def onQueryTerminated(self, event):
            pass

    # Attach the custom listener to the Spark session's streaming query manager
    spark.streams.addListener(CustomStreamingQueryListener())
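
For reference, this is roughly how the helper is registered before the streaming queries start (a minimal usage sketch; how config and job_info are constructed is assumed to live in the surrounding job code):

    # Register the listener once per Spark session, before starting any streams
    add_custom_listening(spark, config, job_info)
    # ... then start the streaming queries for each table as usual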

Description of the Issue:

  1. Scenario with a Single Streaming Table:

    • When I run the job with just one streaming table, the upsert into the Delta table (e.g., at gs://buckets_name/logs/silf_logs/silver_logs) works as expected.

  2. Scenario with Multiple Streaming Tables (≈10 tables):

    • When running with multiple streaming tables concurrently, I encounter the following error:

      warnings.warn(f"Listener {str(listener)} threw an exception\n{e}")
      /databricks/spark/python/pyspark/sql/connect/streaming/query.py:561: UserWarning: Listener <...CustomStreamingQueryListener object...> threw an exception
      <_InactiveRpcError of RPC that terminated with:
          status = StatusCode.PERMISSION_DENIED
          details = "Local RPC without associated session."
          debug_error_string = "UNKNOWN:Error received from peer ..." >

Questions:

  1. Handling Streaming Query Hangs:
    Given that my jobs can involve 20–30 streaming tables, what strategies or best practices does Databricks recommend to handle cases where streaming queries hang after running for about a day?

  2. Fixing Delta Upsert Errors in onQueryIdle:
    How can I resolve the "Local RPC without associated session" (PERMISSION_DENIED) error when performing the Delta upsert within the onQueryIdle event?

    • Could this be related to session management in a high-concurrency scenario?

    • Are there configuration tweaks or adjustments to the listener pattern that might help?

I appreciate your insights and recommendations on addressing both issues. Any advice to improve the robustness and stability of these streaming jobs would be extremely valuable.

Thank you!

Regards,
Hung Nguyen