
Monitoring Structured Streaming and Log4j properties

cz0
New Contributor

Hi guys, I would like to monitor a streaming job for metrics like delay, processing time, and more. I found this documentation, but I only get messages during the start and termination phases, not while records are being processed. The job is a fairly simple stream that reads CSV files and writes the output to the console.

The listener class is the following:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val listener = new StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Starting job!")
  }

  // Fires once per completed micro-batch, not per record.
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"TestStreaming: ${event.progress.durationMs}")
  }

  // Spark 3.5+: fires when a trigger completes without new data.
  override def onQueryIdle(event: QueryIdleEvent): Unit = {}

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Job terminated!")
  }
}

// The listener only receives events once registered:
spark.streams.addListener(listener)

Secondly, I would like to customise Log4j via a properties file, but loading the file into Spark's config folder /databricks/spark/conf with init-script.sh doesn't work.

Thank you for your support.

2 REPLIES

BigRoux
Databricks Employee

Hey @cz0, here are some suggestions to help you along:

To monitor streaming job metrics such as delay and processing time, the `StreamingQueryListener` is the right approach. However, it is important to understand the documented limitations and behaviors of this listener.

Monitoring Streaming Metrics

Listener Implementation
Your implementation of the `StreamingQueryListener` looks correct: you have defined methods for when the query starts, progresses, and terminates. However, the issue lies in how the `onQueryProgress` method behaves.

Event Triggering
The "onQueryProgress" event is only triggered at the end of each micro-batch execution. This means:

- It will not trigger continuously for each record processed.
- If the query is waiting for new data (idle), the "onQueryIdle" event is triggered.
- If data is processed, the listener outputs metrics for the batch as a whole after completion. These include metrics for duration, input rate, and processing rate.

So you will not see metrics for individual records, but aggregated results per batch. Within your listener, you can access metrics through `event.progress`, such as:

- `event.progress.numInputRows`
- `event.progress.inputRowsPerSecond`
- `event.progress.durationMs` (time spent in each phase of the micro-batch)
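
For example, a minimal `onQueryProgress` body that surfaces these batch-level numbers (all fields below are standard members of `StreamingQueryProgress`):

```scala
override def onQueryProgress(event: QueryProgressEvent): Unit = {
  val p = event.progress
  // Prints one line per completed micro-batch, not per record.
  println(s"batch=${p.batchId} rows=${p.numInputRows} " +
    s"rowsPerSec=${p.inputRowsPerSecond} durations=${p.durationMs}")
}
```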

Enhancing Metrics Tracking
If you need more granular metrics:

- Use the `observe` method on your DataFrame before writing to the sink.
- This lets you track counts and specific error-related metrics directly during processing.

Example snippet:

```scala
import org.apache.spark.sql.functions.{col, count, lit}

// "error" is assumed to be a column that is non-null only for
// malformed rows, so count(col("error")) counts those rows.
val observed_ds = ds.observe(
  "metric",
  count(lit(1)).as("cnt"),
  count(col("error")).as("malformed")
)

observed_ds.writeStream
  .format("console")
  .start()
```

This approach allows real-time tracking of processed rows and errors at a finer level of detail.
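
The observed values then arrive through the same listener: `observedMetrics` is a standard field on `StreamingQueryProgress`, keyed by the name you passed to `observe` ("metric" above):

```scala
override def onQueryProgress(event: QueryProgressEvent): Unit = {
  // observedMetrics is a java.util.Map[String, Row].
  val row = event.progress.observedMetrics.get("metric")
  if (row != null) {
    println(s"cnt=${row.getAs[Long]("cnt")}, malformed=${row.getAs[Long]("malformed")}")
  }
}
```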

Customizing Log4j
For customizing Log4j using a properties file via `init-script.sh`:

- Note that Databricks Runtime 11.0 and above use Log4j 2, so the configuration must be a `log4j2.properties` file; a Log4j 1.x-style `log4j.properties` is ignored.
- Ensure that `init-script.sh` places the properties file in a directory Spark actually reads its logging configuration from.
- Double-check that Spark is loading the configuration file (look for related errors in the logs).
- Verify that the specific Log4j configuration references you are using are valid.
- Ensure your script runs properly at cluster startup so the configuration is not skipped.
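
As a rough sketch of such an init script, assuming Databricks Runtime 11+ (Log4j 2) and the conventional `dbconf` locations for the driver and executor logging configs (verify these paths on your runtime version before relying on this):

```bash
#!/bin/bash
# Append custom Log4j 2 settings to the driver and executor configs.
# The paths below are assumptions based on the usual Databricks layout.
for conf in /databricks/spark/dbconf/log4j/driver/log4j2.properties \
            /databricks/spark/dbconf/log4j/executor/log4j2.properties; do
  cat >> "$conf" <<'EOF'
logger.mystream.name = com.example.mystream
logger.mystream.level = DEBUG
EOF
done
```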

Hope this helps.

Cheers, Louis

saurabh18cs
Honored Contributor

Hi @cz0 

The StreamingQueryListener in Spark is designed to give you metrics at the micro-batch level (not per individual record), which is typical for Spark Structured Streaming:

  • onQueryStarted: called when the streaming job starts.
  • onQueryProgress: called after each micro-batch is processed. Here you get metrics like `durationMs`, `inputRowsPerSecond`, `processedRowsPerSecond`, and more.
  • onQueryTerminated: called when the job ends.

For more granular monitoring, you can add custom logic in your processing code (e.g., use a `foreachBatch` or `foreach` sink to log or collect metrics per record, but this is not typical and can impact performance); see the sketch below.
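
A minimal `foreachBatch` sketch along those lines (assuming a streaming DataFrame `ds`):

```scala
import org.apache.spark.sql.DataFrame

ds.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Runs once per micro-batch; add whatever counting or logging
    // you need here, at batch-level granularity.
    println(s"batch $batchId contains ${batchDF.count()} rows")
  }
  .start()
```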
