Hey @cz0, here are some suggestions to help you along:
To monitor streaming job metrics such as delay and processing time, using the `StreamingQueryListener` is the right approach. However, it is important to understand the documented limitations and behavior of this listener.
Monitoring Streaming Metrics
Listener Implementation
Your implementation of the `StreamingQueryListener` looks correct: you have defined methods for when the query starts, progresses, and terminates. The issue lies in how the `onQueryProgress` method behaves.
Event Triggering
The "onQueryProgress" event is only triggered at the end of each micro-batch execution. This means:
- It will not trigger continuously for each record processed.
- If the query is waiting for new data (idle), the "onQueryIdle" event is triggered.
- If data is processed, the listener outputs metrics for the batch as a whole after completion. These include metrics for duration, input rate, and processing rate.
So you will not see metrics for individual records, but aggregated results per batch. Within your listener, you can access these through `event.progress` (see the minimal sketch after this list), such as:
- `event.progress.numInputRows`
- `event.progress.inputRowsPerSecond`
- `event.progress.durationMs` (processing duration per phase, e.g. addBatch, triggerExecution)
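If it helps, here is a minimal sketch of such a listener (class name and log format are just illustrative; `spark` is your active SparkSession):
```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Minimal listener that logs aggregated metrics once per completed micro-batch.
class BatchMetricsListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Query started: ${event.id}")

  // Fires once per completed micro-batch, not once per record.
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    println(s"batch=${p.batchId} rows=${p.numInputRows} " +
      s"rowsPerSec=${p.inputRowsPerSecond} durations=${p.durationMs}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Query terminated: ${event.id}")
}

spark.streams.addListener(new BatchMetricsListener())
```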
Enhancing Metrics Tracking
If you need more granular metrics:
- Use the `observe` method on your DataFrame before writing to the sink.
- This lets you track row counts and error-related metrics as each batch is processed.
Example snippet:
```scala
import org.apache.spark.sql.functions.{col, count, lit}

// Attach named, observable metrics; they are computed once per micro-batch.
val observed_ds = ds.observe(
  "metric",
  count(lit(1)).as("cnt"),            // total rows in the batch
  count(col("error")).as("malformed") // rows with a non-null "error" column
)

observed_ds.writeStream
  .format("console")
  .start()
```
This approach gives you per-batch counts of processed rows and errors, at a finer level of detail than the built-in progress metrics.
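The observed metrics then surface in the same listener, keyed by the name you passed to `observe`. A minimal sketch of reading them (assuming the listener shape shown earlier):
```scala
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

// Inside your StreamingQueryListener:
override def onQueryProgress(event: QueryProgressEvent): Unit = {
  // "metric" is the name passed to observe() above; the value is a Row
  // with one field per aggregate ("cnt", "malformed").
  Option(event.progress.observedMetrics.get("metric")).foreach { row =>
    println(s"batch=${event.progress.batchId} " +
      s"cnt=${row.getAs[Long]("cnt")} malformed=${row.getAs[Long]("malformed")}")
  }
}
```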
Customizing Log4J
For customizing Log4J using a properties file via `init-script.sh`:
- Ensure the init script places the properties file in a directory Spark actually reads its Log4j configuration from.
- Note that Databricks Runtime 11.0+ uses Log4j 2, which expects `log4j2.properties` syntax; an old-style Log4j 1.x file will be ignored.
- Double-check that Spark is loading the configuration file (look for related errors in the driver logs).
- Verify that the appenders, loggers, and levels you reference in the file are valid.
- Confirm the script is configured as a cluster-scoped init script so it runs on every node at startup and the configuration is not skipped.
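As a rough sketch only (the paths below are assumptions based on a typical Databricks cluster layout; verify them on your own runtime before relying on this):
```bash
#!/bin/bash
# Hypothetical init script: overwrite the driver/executor Log4j config
# with a custom file staged on DBFS. All paths here are assumptions.
CUSTOM_CONF="/dbfs/FileStore/conf/log4j2.properties"

for target in /databricks/spark/dbconf/log4j/driver \
              /databricks/spark/dbconf/log4j/executor; do
  if [ -d "$target" ]; then
    cp "$CUSTOM_CONF" "$target/log4j2.properties"
  fi
done
```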
Hope this helps.
Cheers, Louis