Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-18-2025 04:49 AM
Hi @Khaja_Zaffer,
This link is no longer available, but I was able to retrieve the content from its latest version. Please find it below:
Event Log Replay (Scala)
Enter the path to the cluster event logs in the `event_log_path` notebook widget.
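Run the notebook to replay the Apache Spark UI events that are recorded in the logs. Run it on a cluster that has not executed any other Spark commands, so that the replayed events are the only ones shown in its Spark UI.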
Note: The path must be the directory that contains the `eventlog` files. If you store event logs on DBFS, the path will look similar to this example: dbfs:/cluster-logs/<cluster-name>/eventlog/<cluster-name-cluster-ip>/<log-id>/
%scala
// Recreate this notebook's single input widget, then read the event log directory from it.
val eventLogPathWidget = "event_log_path"
dbutils.widgets.removeAll()
dbutils.widgets.text(eventLogPathWidget, "", eventLogPathWidget)
val eventLogPath = dbutils.widgets.get(eventLogPathWidget)
package org.apache.spark.util
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
/**
 * Public forwarder for `JsonProtocol.sparkEventFromJson`.
 *
 * This object is declared inside package `org.apache.spark.util` (a notebook "package cell")
 * so that it can call `JsonProtocol`, which notebook code outside Spark's packages cannot
 * reach directly. NOTE(review): presumably `JsonProtocol` is package-private in the Spark
 * version in use, and newer Spark releases may take a JSON `String` rather than a json4s
 * `JValue` here. Confirm against the cluster's Spark version.
 */
object PublicJsonProtocol {

  /** Converts one parsed event-log JSON value into the matching Spark listener event. */
  def sparkEventFromJson(json: JValue): SparkListenerEvent =
    JsonProtocol.sparkEventFromJson(json)
}
import java.util.zip.GZIPInputStream
import scala.collection.JavaConversions._
import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
/**
 * Replays every Spark listener event recorded in a cluster's event logs onto this cluster's
 * listener bus. Use it to rebuild the Spark UI when the Spark History Server fails to load it.
 *
 * Run it on a cluster that has not executed any other Spark command, so that the UI shows
 * exactly the events the original cluster went through.
 *
 * Events whose class is unknown, or that carry unrecognized properties, are skipped and
 * reported. A JSON parse error is tolerated only on the very last line of the replayed input,
 * which may have been truncated while the log was still being written.
 *
 * @param pathToEventLogs directory that contains the `eventlog*` files, for example
 *                        `dbfs:/cluster-logs/<cluster>/eventlog/<cluster-ip>/<log-id>/`
 * @throws IllegalArgumentException if the path is blank or contains no event log files
 * @throws com.fasterxml.jackson.core.JsonParseException if any line other than the last
 *                        replayed line is not valid JSON
 */
def replaySparkEvents(pathToEventLogs: String): Unit = {
  import java.nio.charset.StandardCharsets
  import scala.collection.JavaConverters._

  require(pathToEventLogs != null && pathToEventLogs.trim.nonEmpty, "pathToEventLogs must not be blank")

  val eventLogFiles = dbutils.fs.ls(pathToEventLogs).filter(_.name.startsWith("eventlog")).map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")

  // The first listed file is moved to the end of the replay order. NOTE(review): this assumes
  // dbutils.fs.ls lists the un-rotated `eventlog` file (the newest events) first, ahead of the
  // rotated `eventlog-*` files. Confirm.
  val inOrder = eventLogFiles.tail :+ eventLogFiles.head

  // A single Hadoop configuration serves every file.
  val hadoopConf = spark.sessionState.newHadoopConf()

  /** Reads every line of one log file (gunzipped when it ends in `.gz`) and closes it before returning. */
  def readLogLines(file: String): Seq[String] = {
    println(s"File process:$file")
    val path = new Path(file)
    val raw = path.getFileSystem(hadoopConf).open(path)
    try {
      val in = if (file.endsWith(".gz")) new GZIPInputStream(raw) else raw
      // Closing `in` also releases the GZIP inflater, not just the underlying file stream.
      try IOUtils.readLines(in, StandardCharsets.UTF_8).asScala.toVector
      finally in.close()
    } finally {
      // Guards the case where the GZIPInputStream constructor throws; a repeated close is harmless.
      IOUtils.closeQuietly(raw)
    }
  }

  // Lazy across files: each file is read, and closed, only when the replay reaches it.
  val lines = inOrder.iterator.flatMap(readLogLines)
  val unrecognizedEvents = scala.collection.mutable.HashSet.empty[String]
  val unrecognizedProperties = scala.collection.mutable.HashSet.empty[String]
  val listenerBus = sc.listenerBus

  while (lines.hasNext) {
    val line = lines.next()
    try {
      listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(line)))
    } catch {
      case e: java.lang.ClassNotFoundException =>
        // Unknown event type: skip it. Each distinct message is announced once (add returns
        // true only on first insertion), but every dropped line is still echoed.
        if (unrecognizedEvents.add(e.getMessage)) {
          println(s"Drop unrecognized event: ${e.getMessage}")
        }
        println(s"Drop incompatible event log: $line")
      case e: UnrecognizedPropertyException =>
        // Unknown property: skip the event, announcing each distinct message only once.
        if (unrecognizedProperties.add(e.getMessage)) {
          println(s"Drop unrecognized property: ${e.getMessage}")
        }
        println(s"Drop incompatible event log: $line")
      case jpe: JsonParseException =>
        // Best effort: only the final line of the whole replay may be truncated, because the
        // last file in `inOrder` may still have been in the middle of being written.
        if (lines.hasNext) {
          throw jpe
        } else {
          println("Got JsonParseException from log file. The file might not have finished writing cleanly.")
        }
    }
  }
}
// Replay the events found under the directory entered in the `event_log_path` widget.
replaySparkEvents(pathToEventLogs = eventLogPath)
Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa