I would recommend the following approaches:
| Method | Real-Time? | Complexity | Typical Use Case |
|--------|------------|------------|-------------------|
| SparkListener / QueryListener | Yes | Moderate | Live job/stage/batch metrics |
| Custom Metrics Source | Yes (live) | More advanced | Fine-grained, app-specific metrics |
| Metrics Sinks | Yes | Easy/Moderate | External dashboards/monitoring |
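For the SparkListener row, here is a minimal sketch of a listener that reacts to stage completions. The class name MyStageListener and the println reporting are illustrative; in practice you would feed these values into a metrics registry, as shown in the fuller example below:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Minimal sketch: report records read whenever a stage completes.
class MyStageListener extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    println(s"Stage ${info.stageId} completed, " +
      s"records read: ${info.taskMetrics.inputMetrics.recordsRead}")
  }
}

// Register on the SparkContext:
// spark.sparkContext.addSparkListener(new MyStageListener())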
Example of a custom Source that can then be exported through an external sink such as Prometheus:
package org.apache.spark.metrics.source
import com.codahale.metrics.{MetricRegistry, SettableGauge}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.streaming.StreamingQueryListener
// Declared in org.apache.spark.metrics.source so it can extend the package-private Source trait.
object MyCustomSource extends Source {
  override def sourceName: String = "MyCustomSource"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Settable gauge backing the custom metric "a".
  val MY_METRIC_A: SettableGauge[Long] =
    metricRegistry.gauge(MetricRegistry.name("a"))

  class MyListener extends StreamingQueryListener {
    // StreamingQueryListener is abstract; all three callbacks must be overridden.
    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      // Push the latest batchId into the gauge after each micro-batch.
      MyCustomSource.MY_METRIC_A.setValue(event.progress.batchId)
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
  }

  def apply(): MyListener = {
    // Register the source with Spark's metrics system, then return the listener.
    SparkEnv.get.metricsSystem.registerSource(MyCustomSource)
    new MyListener()
  }
}
// Register in your Spark app:
spark.streams.addListener(MyCustomSource())
This exposes custom metrics (here, batchId) through Spark's metrics system, ready for integration with Prometheus, Grafana, or any other configured sink.
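To have those metrics scraped externally, you still need to enable a sink. A minimal sketch assuming Spark 3.0+, which ships a built-in PrometheusServlet sink; the metrics configuration can be passed as spark.metrics.conf.* properties (the app name here is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("metrics-demo") // illustrative name
  // Expose all registered sources on the driver UI for Prometheus to scrape:
  .config("spark.metrics.conf.*.sink.prometheusServlet.class",
    "org.apache.spark.metrics.sink.PrometheusServlet")
  .config("spark.metrics.conf.*.sink.prometheusServlet.path",
    "/metrics/prometheus")
  .getOrCreate()

The same two keys can alternatively live in conf/metrics.properties; Grafana then reads from whichever Prometheus instance scrapes that endpoint.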