<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Monitoring structured streaming and Log4J properties in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/132966#M49690</link>
    <description></description>
    <pubDate>Wed, 24 Sep 2025 17:12:28 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-09-24T17:12:28Z</dc:date>
    <item>
      <title>Monitoring structured streaming and Log4J properties</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/124074#M47127</link>
      <description>&lt;P&gt;Hi guys, I would like to monitor a streaming job on metrics like delay, processing time and more. I found this &lt;A href="https://docs.databricks.com/aws/en/structured-streaming/stream-monitoring#defining-observable-metrics-in-structured-streaming" target="_self"&gt;documentation&lt;/A&gt;, but I only get messages when the query starts and terminates, not while records are being processed. The job is a simple streaming query that reads CSV files and writes the output to the console.&lt;/P&gt;&lt;P&gt;The listener class is the following:&lt;/P&gt;&lt;LI-CODE lang="java"&gt;import org.apache.spark.sql.streaming.{StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener._

val listener = new StreamingQueryListener {

  def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Starting job!")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"TestStreaming: ${event.progress.durationMs}")
  }

  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Job terminated!")
  }
}&lt;/LI-CODE&gt;&lt;P&gt;Secondly, I would like to customise Log4J with a properties file, but copying the file into Spark's config folder /databricks/spark/conf via init-script.sh doesn’t work.&lt;/P&gt;&lt;P&gt;Thank you for the support&lt;/P&gt;</description>
      <pubDate>Sat, 05 Jul 2025 07:41:20 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/124074#M47127</guid>
      <dc:creator>cz0</dc:creator>
      <dc:date>2025-07-05T07:41:20Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring structured streaming and Log4J properties</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/132966#M49690</link>
      <description>&lt;P&gt;Hey&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/173825"&gt;@cz0&lt;/a&gt;, here are some suggestions to help you along:&lt;/P&gt;
&lt;P&gt;To effectively monitor your streaming job metrics such as delay and processing time, using the &lt;STRONG&gt;StreamingQueryListener&lt;/STRONG&gt;&amp;nbsp;is the right approach. However, it is important to understand the limitations and behaviors of this listener as documented.&lt;/P&gt;
&lt;P&gt;&lt;FONT size="5"&gt;Monitoring Streaming Metrics&lt;/FONT&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;&lt;FONT size="4"&gt;Listener Implementation&lt;/FONT&gt;&lt;/STRONG&gt;&lt;BR /&gt;Your implementation of the &lt;STRONG&gt;StreamingQueryListener&lt;/STRONG&gt;&amp;nbsp;looks correct. You have defined methods for when the query starts, progresses, and terminates. However, the issue lies in how the "onQueryProgress" method behaves.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;&lt;FONT size="4"&gt;Event Triggering&lt;/FONT&gt;&lt;/STRONG&gt;&lt;BR /&gt;The "onQueryProgress" event is only triggered at the end of each micro-batch execution. This means:&lt;/P&gt;
&lt;P&gt;- It will not trigger continuously for each record processed.&lt;BR /&gt;- If the query is waiting for new data (idle), the "onQueryIdle" event is triggered.&lt;BR /&gt;- If data is processed, the listener outputs metrics for the batch as a whole after completion. These include metrics for duration, input rate, and processing rate.&lt;/P&gt;
&lt;P&gt;So, you will not see metrics for individual record processing, but instead aggregated results per batch. Within your listener, you can access metrics through `event.progress`, such as:&lt;/P&gt;
&lt;P&gt;- "event.progress.numInputRows"&lt;BR /&gt;- "event.progress.inputRowsPerSecond"&lt;BR /&gt;- Processing duration per stage&lt;/P&gt;
&lt;P&gt;&lt;FONT size="5"&gt;Enhancing Metrics Tracking&lt;/FONT&gt;&lt;BR /&gt;If you need more granular metrics:&lt;/P&gt;
&lt;P&gt;- Use the &lt;STRONG&gt;observe&lt;/STRONG&gt; method in your DataFrame before writing to the sink.&lt;BR /&gt;- This lets you track counts and specific error-related metrics directly during processing.&lt;/P&gt;
&lt;P&gt;Example snippet:&lt;/P&gt;
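&lt;P&gt;```scala&lt;BR /&gt;import org.apache.spark.sql.functions.{col, count, lit}&lt;BR /&gt;&lt;BR /&gt;// Attach named metrics to the stream before it is written to the sink&lt;BR /&gt;val observed_ds = ds.observe(&lt;BR /&gt;"metric",&lt;BR /&gt;count(lit(1)).as("cnt"), // every row&lt;BR /&gt;count(col("error")).as("malformed") // rows whose "error" column is not null&lt;BR /&gt;)&lt;/P&gt;
&lt;P&gt;observed_ds.writeStream&lt;BR /&gt;.format("console")&lt;BR /&gt;.start()&lt;BR /&gt;```&lt;/P&gt;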
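&lt;P&gt;The observed values are still reported once per micro-batch rather than per record: they arrive in "event.progress.observedMetrics", keyed by the name passed to observe ("metric" in the snippet), so you can read the row and error counts in "onQueryProgress".&lt;/P&gt;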
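&lt;P&gt;A rough sketch of picking up the observed metrics in a listener is shown below. It assumes the metric name "metric" and the columns "cnt" and "malformed" from the snippet above, and that "spark" is the active SparkSession; "malformedRatio" and "observedMetricsListener" are hypothetical names used only here.&lt;/P&gt;

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical helper: share of rows flagged as malformed in one micro-batch.
def malformedRatio(cnt: Long, malformed: Long): Double =
  if (cnt == 0L) 0.0 else malformed.toDouble / cnt

val observedMetricsListener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map keyed by the name given to observe();
    // get() returns null when the batch reported nothing under that name.
    Option(event.progress.observedMetrics.get("metric")).foreach { row =>
      val cnt = row.getAs[Long]("cnt")
      val malformed = row.getAs[Long]("malformed")
      println(s"rows=$cnt malformed=$malformed ratio=${malformedRatio(cnt, malformed)}")
    }
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Assumption: `spark` is the active SparkSession; register before starting the query.
spark.streams.addListener(observedMetricsListener)
```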
&lt;P&gt;&lt;FONT size="5"&gt;Customizing Log4J&lt;/FONT&gt;&lt;BR /&gt;For customizing Log4J using a properties file via "init-script.sh":&lt;/P&gt;
&lt;P&gt;- Ensure that "init-script.sh" places the properties file in a directory recognized by Spark.&lt;BR /&gt;- Double-check that Spark is loading the configuration file (look for related errors in the logs).&lt;BR /&gt;- Verify that the specific Log4J configuration references you are using are valid. Spark 3.3 and later use Log4j 2, so the file must use Log4j 2 properties syntax rather than Log4j 1.x syntax.&lt;BR /&gt;- Ensure your script runs at startup properly so configurations are not skipped.&lt;/P&gt;
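&lt;P&gt;One way to check which configuration was actually loaded is to ask Log4j from a notebook cell. This is only a sketch: it assumes Log4j 2 core is the active logging backend (Spark 3.3 or later), it inspects the driver JVM only, and "describeLocation" is a hypothetical helper introduced for the example.&lt;/P&gt;

```scala
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LoggerContext

// Hypothetical helper: the reported location can be null, so give it a readable default.
def describeLocation(location: String): String =
  Option(location).getOrElse("(no location reported)")

// Log4j 2 context of the driver JVM (assumes Log4j 2 core is the active backend).
val context = LogManager.getContext(false).asInstanceOf[LoggerContext]
val config = context.getConfiguration

println(s"Log4j configuration name: ${config.getName}")
println(s"Loaded from: ${describeLocation(config.getConfigurationSource.getLocation)}")
```

&lt;P&gt;If the printed location is not the file your init script copied, the script's target path or file name is the first thing to review.&lt;/P&gt;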
&lt;P&gt;Hope this helps.&lt;/P&gt;
&lt;P&gt;Cheers, Louis&lt;/P&gt;</description>
      <pubDate>Wed, 24 Sep 2025 17:12:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/132966#M49690</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-09-24T17:12:28Z</dc:date>
    </item>
    <item>
      <title>Re: Monitoring structured streaming and Log4J properties</title>
      <link>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/133039#M49706</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/173825"&gt;@cz0&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;The &lt;STRONG&gt;StreamingQueryListener&lt;/STRONG&gt; in Spark is designed to give you metrics at the &lt;STRONG&gt;micro-batch level&lt;/STRONG&gt; (not per individual record), which is typical for Spark Structured Streaming:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;onQueryStarted: Called when the streaming job starts.&lt;/LI&gt;&lt;LI&gt;onQueryProgress: Called &lt;STRONG&gt;after each micro-batch&lt;/STRONG&gt; is processed. Here, you get metrics like durationMs, inputRowsPerSecond, processedRowsPerSecond, and more.&lt;/LI&gt;&lt;LI&gt;onQueryTerminated: Called when the job ends.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;For more granular monitoring, you can add custom logic in your processing code, e.g. use a foreachBatch or foreach sink to log or collect metrics per record. This is not typical and can impact performance.&lt;/P&gt;</description>
      <pubDate>Thu, 25 Sep 2025 15:07:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/monitoring-structured-streaming-and-log4j-properties/m-p/133039#M49706</guid>
      <dc:creator>saurabh18cs</dc:creator>
      <dc:date>2025-09-25T15:07:04Z</dc:date>
    </item>
  </channel>
</rss>

