<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Handling Streaming Query Hangs &amp; Delta Upsert Failures in Multi-Table Jobs in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/handling-streaming-query-hangs-amp-delta-upsert-failures-in/m-p/117232#M45459</link>
    <description>&lt;P&gt;Hello Hung,&lt;/P&gt;
&lt;P&gt;Working with streaming tables is always a challenge. Let's remember we are working with unbounded data so it's important to consider a few points:&lt;/P&gt;
&lt;OL&gt;
&lt;LI&gt;If you are running this as a Job, you can define a job cluster for each task. Size the compute so it can handle all of these streams, paying particular attention to memory, since too little of it can lead to &lt;A href="https://docs.databricks.com/aws/en/optimizations/spark-ui-guide/long-spark-stage-page" target="_self"&gt;memory spills&lt;/A&gt; to disk. You can use the &lt;A href="https://spark.apache.org/docs/3.5.0/web-ui.html" target="_self"&gt;Spark UI&lt;/A&gt; to collect information about your operations.
&lt;OL&gt;
&lt;LI&gt;If you are working with &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/stateful-streaming" target="_self"&gt;heavy stateful streams&lt;/A&gt;, you can consider leveraging &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/rocksdb-state-store" target="_self"&gt;RocksDB for your state handling&lt;/A&gt;&lt;/LI&gt;
&lt;/OL&gt;
&lt;/LI&gt;
&lt;LI&gt;If you are running stream-stream joins, review the support details for each join type in the &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries" target="_self"&gt;Structured Streaming Guide&lt;/A&gt;&lt;/LI&gt;
&lt;LI&gt;Consider leveraging &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/checkpoints" target="_self"&gt;Checkpointing&lt;/A&gt; of your streams&lt;/LI&gt;
&lt;LI&gt;As you mentioned, there may be some challenges handling the session. You may want to start small, then increase the number of streams until you find which one is the culprit.&lt;/LI&gt;
&lt;LI&gt;Consider &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/delta-lake#limit-input-rate" target="_self"&gt;limiting input rates (files or bytes)&lt;/A&gt;; perhaps a considerable volume of incoming data is stalling the operation. Alternatively, you can consider DLT, where this is handled automatically.&lt;/LI&gt;
&lt;LI&gt;Lastly, consider your data layout (partitions) and the structure of your operations. Here is an overall &lt;A href="https://www.databricks.com/discover/pages/optimize-data-workloads-guide" target="_self"&gt;guide to optimize data workloads&lt;/A&gt;&lt;/LI&gt;
&lt;/OL&gt;
&lt;P&gt;I wish I could give you a straight answer, but hopefully these provide you with some options to keep investigating.&lt;/P&gt;
&lt;P&gt;Thank you&lt;/P&gt;</description>
    <pubDate>Thu, 01 May 2025 00:02:25 GMT</pubDate>
    <dc:creator>mmayorga</dc:creator>
    <dc:date>2025-05-01T00:02:25Z</dc:date>
    <item>
      <title>Handling Streaming Query Hangs &amp; Delta Upsert Failures in Multi-Table Jobs</title>
      <link>https://community.databricks.com/t5/data-engineering/handling-streaming-query-hangs-amp-delta-upsert-failures-in/m-p/115380#M45076</link>
      <description>&lt;P&gt;Hi Databricks Experts,&lt;/P&gt;&lt;P&gt;I'm encountering issues with my streaming jobs in Databricks and need some advice. I’ve implemented a custom streaming query listener to capture job status events and upsert them into a Delta table. However, the solution behaves differently when running on a single streaming table versus multiple tables.&lt;/P&gt;&lt;P&gt;Below is my complete code, along with explanatory notes:&lt;/P&gt;&lt;PRE&gt;from abc import ABC
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime
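from delta.tables import DeltaTable
from pyspark.sql.streaming import StreamingQueryListener
# BaseDto and JobInfoDto are project-specific types; their imports are assumed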

class MonitoringHelper(ABC):
    @staticmethod
    def persist_status_to_delta(config: BaseDto, job_info: JobInfoDto, spark, event, dest_location):
        # (Optionally) Sleep for 5 seconds between each job restart
        # time.sleep(5)
        
        # Define the schema for the Delta table
        schema = StructType([
            StructField("job_id", StringType(), True),
            StructField("job_name", StringType(), True),
            StructField("job_run_id", StringType(), True),
            StructField("run_id", StringType(), True),
            StructField("query_id", StringType(), True),
            StructField("event_timestamp", StringType(), True),
        ])

        # Create a dictionary with job and event details
        data = {
            "job_id": job_info.job_id,
            "job_name": job_info.job_name, 
            "job_run_id": job_info.job_run_id, 
            "query_id": event.id,
            "run_id": event.runId,
            "event_timestamp": f"{event.timestamp[0:23]}+00:00",
        }

        # Create a DataFrame based on the defined schema
        df = spark.createDataFrame([data], schema=schema)
        
        # Check if the destination Delta table exists and perform an upsert
        if DeltaTable.isDeltaTable(spark, dest_location):
            # Alternative check: if not config.app_conf.pipeline_config["first_create_monitoring_table"]:
            DeltaTable.forPath(spark, dest_location).alias("target").merge(
                df.alias("source"),
                "target.job_name = source.job_name AND target.job_id = source.job_id AND target.query_id = source.query_id"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


def add_custom_listening(spark, config: BaseDto, job_info: JobInfoDto = None):
    class CustomStreamingQueryListener(StreamingQueryListener):
        def onQueryStarted(self, event):
            pass

        def onQueryProgress(self, event):
            pass

        def onQueryIdle(self, event):
            # IMPORTANT: Handles idle event to write streaming job status periodically
            # Parse the datetime string from the event timestamp
            dt = datetime.strptime(event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
            # Write data to the Delta table every configured interval (e.g., every 10 minutes)
            if dt.minute % config.app_conf.silf_config["time_interval_write_stream_lated"] == 0:
                buckets = config.app_conf.silf_config["gcs_buckets"] 
                # Uncomment below to persist status
                # MonitoringHelper.persist_status_to_delta(config, job_info, spark, event, f"{buckets}/logs/silf_logs/streaming_lated/")

        def onQueryTerminated(self, event):
            pass

    # Attaching the custom listener to the current streaming query (code to attach the listener goes here)&lt;/PRE&gt;&lt;P&gt;&lt;STRONG&gt;Description of the Issue:&lt;/STRONG&gt;&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Scenario with a Single Streaming Table:&lt;/STRONG&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;When I run the job with just one streaming table, the upsert into the Delta table (e.g., at gs://buckets_name/logs/silf_logs/silver_logs) works as expected.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Scenario with Multiple Streaming Tables (≈10 tables):&lt;/STRONG&gt;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;When running with multiple streaming tables concurrently, I encounter the following error:&lt;/P&gt;&lt;PRE&gt;warnings.warn(f"Listener {str(listener)} threw an exception\n{e}")
/databricks/spark/python/pyspark/sql/connect/streaming/query.py:561: UserWarning: Listener &amp;lt;...CustomStreamingQueryListener object...&amp;gt; threw an exception
&amp;lt;_InactiveRpcError of RPC that terminated with:
    status = StatusCode.PERMISSION_DENIED
    details = "Local RPC without associated session."
    debug_error_string = "UNKNOWN:Error received from peer ..." &amp;gt;&lt;/PRE&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;&lt;STRONG&gt;Questions:&lt;/STRONG&gt;&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Handling Streaming Query Hangs:&lt;/STRONG&gt;&lt;BR /&gt;Given that my jobs can involve 20–30 streaming tables, what strategies or best practices does Databricks recommend to handle cases where streaming queries hang after running for about a day?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Fixing Delta Upsert Errors in onQueryIdle:&lt;/STRONG&gt;&lt;BR /&gt;How can I resolve the "Local RPC without associated session" (PERMISSION_DENIED) error when performing the Delta upsert within the onQueryIdle event?&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Could this be related to session management in a high-concurrency scenario?&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Are there configuration tweaks or adjustments to the listener pattern that might help?&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;I appreciate your insights and recommendations on addressing both issues. Any advice to improve the robustness and stability of these streaming jobs would be extremely valuable.&lt;/P&gt;&lt;P&gt;Thank you!&lt;/P&gt;</description>
      <pubDate>Mon, 14 Apr 2025 04:22:23 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/handling-streaming-query-hangs-amp-delta-upsert-failures-in/m-p/115380#M45076</guid>
      <dc:creator>minhhung0507</dc:creator>
      <dc:date>2025-04-14T04:22:23Z</dc:date>
    </item>
    <item>
      <title>Re: Handling Streaming Query Hangs &amp; Delta Upsert Failures in Multi-Table Jobs</title>
      <link>https://community.databricks.com/t5/data-engineering/handling-streaming-query-hangs-amp-delta-upsert-failures-in/m-p/117232#M45459</link>
      <description>&lt;P&gt;Hello Hung,&lt;/P&gt;
&lt;P&gt;Working with streaming tables is always a challenge. Let's remember we are working with unbounded data so it's important to consider a few points:&lt;/P&gt;
&lt;OL&gt;
&lt;LI&gt;If you are running this as a Job, you can define a job cluster for each task. Size the compute so it can handle all of these streams, paying particular attention to memory, since too little of it can lead to &lt;A href="https://docs.databricks.com/aws/en/optimizations/spark-ui-guide/long-spark-stage-page" target="_self"&gt;memory spills&lt;/A&gt; to disk. You can use the &lt;A href="https://spark.apache.org/docs/3.5.0/web-ui.html" target="_self"&gt;Spark UI&lt;/A&gt; to collect information about your operations.
&lt;OL&gt;
&lt;LI&gt;If you are working with &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/stateful-streaming" target="_self"&gt;heavy stateful streams&lt;/A&gt;, you can consider leveraging &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/rocksdb-state-store" target="_self"&gt;RocksDB for your state handling&lt;/A&gt;&lt;/LI&gt;
&lt;/OL&gt;
&lt;/LI&gt;
&lt;LI&gt;If you are running stream-stream joins, review the support details for each join type in the &lt;A href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries" target="_self"&gt;Structured Streaming Guide&lt;/A&gt;&lt;/LI&gt;
&lt;LI&gt;Consider leveraging &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/checkpoints" target="_self"&gt;Checkpointing&lt;/A&gt; of your streams&lt;/LI&gt;
&lt;LI&gt;As you mentioned, there may be some challenges handling the session. You may want to start small, then increase the number of streams until you find which one is the culprit.&lt;/LI&gt;
&lt;LI&gt;Consider &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/delta-lake#limit-input-rate" target="_self"&gt;limiting input rates (files or bytes)&lt;/A&gt;; perhaps a considerable volume of incoming data is stalling the operation. Alternatively, you can consider DLT, where this is handled automatically.&lt;/LI&gt;
&lt;LI&gt;Lastly, consider your data layout (partitions) and the structure of your operations. Here is an overall &lt;A href="https://www.databricks.com/discover/pages/optimize-data-workloads-guide" target="_self"&gt;guide to optimize data workloads&lt;/A&gt;&lt;/LI&gt;
&lt;/OL&gt;
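&lt;P&gt;To make a few of these points concrete, here is a minimal sketch, not a drop-in fix. It assumes a Delta streaming source and an existing SparkSession passed in as spark. The helper names (rate_limit_options, enable_rocksdb_state_store, read_rate_limited, first_failing_stream) and the is_healthy callback are hypothetical and only for illustration. maxFilesPerTrigger and maxBytesPerTrigger are the Delta source options behind the input rate limits, and the provider class is the one the Databricks RocksDB page documents. The limits you pick are workload-specific.&lt;/P&gt;
&lt;PRE&gt;
```python
# Provider class documented for Databricks; open-source Spark uses a different package.
ROCKSDB_PROVIDER = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"


def rate_limit_options(max_files=None, max_bytes=None):
    """Build Delta streaming source options that cap each micro-batch.

    max_files is a file count; max_bytes is a size value such as "1g".
    Only the limits that are provided end up in the returned dict.
    """
    options = {}
    if max_files is not None:
        options["maxFilesPerTrigger"] = str(max_files)
    if max_bytes is not None:
        options["maxBytesPerTrigger"] = str(max_bytes)
    return options


def enable_rocksdb_state_store(spark):
    """Switch stateful streams to RocksDB; call this before starting the queries."""
    spark.conf.set("spark.sql.streaming.stateStore.providerClass", ROCKSDB_PROVIDER)


def read_rate_limited(spark, path, options):
    """Open a Delta table as a stream with the rate limits applied."""
    return spark.readStream.format("delta").options(**options).load(path)


def first_failing_stream(names, is_healthy):
    """Start with one stream and add one at a time until something breaks.

    is_healthy is a hypothetical check supplied by the caller: it should run
    the given list of streams for a while and return True if they stayed healthy.
    Returns the stream whose addition first broke the run, or None.
    """
    active = []
    for name in names:
        active.append(name)
        if not is_healthy(list(active)):
            return name
    return None
```
&lt;/PRE&gt;
&lt;P&gt;For example, read_rate_limited(spark, table_path, rate_limit_options(max_files=100)) would cap each micro-batch at 100 files, where table_path is a placeholder for your own table location. Keeping the ramp-up helper separate from the Spark calls means you can plug in whatever health check fits your job, such as watching for the listener error or for a query that stops making progress.&lt;/P&gt;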
&lt;P&gt;I wish I could give you a straight answer, but hopefully these provide you with some options to keep investigating.&lt;/P&gt;
&lt;P&gt;Thank you&lt;/P&gt;</description>
      <pubDate>Thu, 01 May 2025 00:02:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/handling-streaming-query-hangs-amp-delta-upsert-failures-in/m-p/117232#M45459</guid>
      <dc:creator>mmayorga</dc:creator>
      <dc:date>2025-05-01T00:02:25Z</dc:date>
    </item>
  </channel>
</rss>

