09-17-2025 11:34 PM
I need to load the Spark UI events of a long-running job.
I found this Databricks knowledge base article on the topic:
https://kb.databricks.com/clusters/replay-cluster-spark-events
However, the referenced notebook is no longer accessible, resulting in:
09-18-2025 01:28 AM
Hi @der ,
Here's the content of the Event Log Replay notebook. You can use it to achieve your goals 🙂
%scala
dbutils.widgets.removeAll()
dbutils.widgets.text("event_log_path", "", "event_log_path")
 
val eventLogPath = dbutils.widgets.get("event_log_path")
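
// Note: the package declaration below goes in its own notebook cell (a Databricks "package cell");
// it can't share a cell with the widget code above.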
package org.apache.spark.util
 
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
 
/**
 * Visibility hack.
 */
object PublicJsonProtocol  {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(json)
  }
}
import java.util.zip.GZIPInputStream
import scala.collection.JavaConversions._
import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
 
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
 
 
/**
 * Function to replay all the events that happened on a cluster in case the Spark History Server fails to
 * load the Spark UI. Run this command on an instance that hasn't run any Spark command to completely replay
 * all the events a cluster went through.
 */
def replaySparkEvents(pathToEventLogs: String): Unit = {
  val eventLogFiles = dbutils.fs.ls(pathToEventLogs).filter(_.name.startsWith("eventlog")).map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")
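  // The first entry in the listing is typically the currently-active eventlog file;
  // moving it to the end replays the rolled logs first, i.e. in roughly chronological order.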
  val inOrder = eventLogFiles.tail ++ Seq(eventLogFiles.head)
  
  val lineIterator = inOrder.iterator.map { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
    val fileStream = fs.open(path)
    val stream = if (file.endsWith(".gz")) {
      new GZIPInputStream(fileStream)
    } else {
      fileStream
    }
    print("File process:"+file)
    val lines = IOUtils.readLines(stream)
    (stream, lines)
  }
  
  val lines = lineIterator.flatMap(_._2)
  val streams = lineIterator.map(_._1)
  
  val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
  val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
 
  val listenerBus = sc.listenerBus
  var currentLine: String = null
 
  try {
    while (lines.hasNext) {
      try {
        val entry = lines.next()
        currentLine = entry
        listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: java.lang.ClassNotFoundException =>
          // Ignore unknown events, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unknown event.
          if (!unrecognizedEvents.contains(e.getMessage)) {
            println(s"Drop unrecognized event: ${e.getMessage}")
            unrecognizedEvents.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case e: UnrecognizedPropertyException =>
          // Ignore unrecognized properties, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unrecognized property.
          if (!unrecognizedProperties.contains(e.getMessage)) {
            println(s"Drop unrecognized property: ${e.getMessage}")
            unrecognizedProperties.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case jpe: JsonParseException =>
          // We can only ignore exception from last line of the file that might be truncated
          // the last entry may not be the very last line in the event log, but we treat it
          // as such in a best effort to replay the given input
          if (lines.hasNext) {
            throw jpe
          } else {
            println(s"Got JsonParseException from log file. The file might not have finished writing cleanly.")
          }
      }
    }
  } finally {
    streams.foreach(IOUtils.closeQuietly)
  }
}
replaySparkEvents(eventLogPath)
09-18-2025 12:07 AM
Hello @der 
Good day!
I think I see the issue here. Per the documentation, the default for spark.ui.retainedJobs is 1000.
ref:
Can you increase these values?
For very large logs (e.g., 5600+ jobs), scale up further (e.g., to 100000 for spark.ui.retainedJobs/retainedTasks) and use a cluster with more driver memory (e.g., spark.driver.memory 16g), roughly as sketched below.
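For example, the cluster's Spark config could look something like this (illustrative values, not tuned recommendations):
spark.ui.retainedJobs 100000
spark.ui.retainedStages 100000
spark.ui.retainedTasks 100000
spark.sql.ui.retainedExecutions 1000
spark.driver.memory 16g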
I am waiting for your response. 
09-18-2025 12:49 AM
Yes, I set the following values:
And the cluster should also have enough memory.
Logging is activated:
09-18-2025 01:07 AM
Okay, we can wait then. Let me know if there are any further issues.
09-18-2025 01:45 AM
The solution works as long as the cluster is running:
However, it does not replay all jobs if the cluster is terminated (for example, a job cluster from a pipeline that finished after running for hours).
So how can we replay the full Spark UI from a terminated job cluster?
09-18-2025 01:54 AM
Of course, you can point it to a storage account:
Enable Spark event log delivery on your workspace/cluster (to DBFS, S3, ADLS, etc.); otherwise you simply don't have the logs to replay.
Check that spark.eventLog.enabled is true.
Set spark.eventLog.dir to your persistent storage path.
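A minimal sketch of the relevant cluster Spark config (the destination path is illustrative; use your own DBFS/S3/ADLS location):
spark.eventLog.enabled true
spark.eventLog.dir dbfs:/cluster-logs/spark-events
On Databricks, configuring cluster log delivery (the Logging destination in the cluster's advanced options) is another way to keep the eventlog files after the cluster terminates.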
09-18-2025 01:36 AM
Hello @szymon_dybczak 
Do we have any documentation around this?
09-18-2025 01:41 AM - edited 09-18-2025 01:42 AM
Hi @Khaja_Zaffer ,
Yes, we do -> Replay Apache Spark events in a cluster - Databricks
It's quite simple. You just need to provide a valid path to your cluster logs and run the notebook content that I provided above. It should then replay the events.
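For example, assuming cluster log delivery to dbfs:/cluster-logs (a purely illustrative path; the actual subdirectory layout depends on your log delivery setup), the event_log_path widget would point at the directory containing the eventlog files, roughly:
dbfs:/cluster-logs/<cluster-id>/eventlog/<run-specific-subdirectory>/
The last line of the notebook then calls replaySparkEvents on that path.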
09-18-2025 01:46 AM
Hello @szymon_dybczak
I think he has some eventlog files.
But sorry for not asking properly.
I meant the Scala code: does it have any limitations, or can we use it for such issues? I was actually looking for this code.
09-18-2025 02:33 AM
Thank you for providing the content of the notebook. I hadn't saved it, and it is no longer accessible via the Databricks link (homepage).
I think I can do it with this.
09-18-2025 02:39 AM
Hi,
No problem @der . If the provided answer was helpful, please consider marking it as the solution to the thread.
09-18-2025 03:05 AM
Your notebook worked!
I only needed to change this cell:
package org.apache.spark.util
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods
/**
 * Visibility hack.
 */
object PublicJsonProtocol  {
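  // Presumably needed because newer runtimes' JsonProtocol.sparkEventFromJson expects a JSON
  // string rather than a json4s JValue, hence rendering the parsed JSON back to a compact string.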
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(JsonMethods.compact(json))
  }
}
Spark UI output of a job compute cluster (executed yesterday):
you made my day 🤗
09-18-2025 03:08 AM
That's great!
congratulations @der @szymon_dybczak
09-18-2025 03:12 AM
So cool that it worked. Good job guys @der , @Khaja_Zaffer !