Wednesday
I need to load the Spark UI events of a long-running job.
I found this Databricks knowledge base article on the topic:
https://kb.databricks.com/clusters/replay-cluster-spark-events
However, the referenced notebook is no longer accessible, resulting in:
Thursday
Hello @der
Good day!!
I see the whole issue here. As per the docs, the default value of spark.ui.retainedJobs is 1000.
ref:
Can you increase these values?
For very large logs (e.g., 5600+ jobs), scale up further (e.g., to 100000 for spark.ui.retainedJobs/retainedTasks) and use a cluster with more driver memory (e.g., via spark.driver.memory 16g).
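For reference, a minimal sketch of what the cluster's Spark config could look like; the values below are illustrative only and should be sized to your job and available driver memory:
spark.ui.retainedJobs 100000
spark.ui.retainedStages 100000
spark.ui.retainedTasks 100000
spark.driver.memory 16g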
I am waiting for your response.
Thursday
Yes, I set the following values:
And the cluster should also have enough memory.
Logging is activated:
Thursday
Okay, we can wait then. Let me know if there are any further issues.
Thursday
The solution works as long as the cluster is running:
However, it does not replay all jobs if the cluster is terminated (for example, a job cluster from a pipeline that finished hours ago).
So how can we replay the full Spark UI of a finished job cluster?
Thursday
Of course, you can point it towards a storage account:
Enable Spark event log delivery on your workspace/cluster (to DBFS, S3, ADLS, etc.). Otherwise you simply don't have the logs to replay.
Check that spark.eventLog.enabled is set to true.
Set spark.eventLog.dir to your persistent storage path.
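A minimal sketch of the relevant Spark config, assuming the event logs should land on DBFS; the destination path is a hypothetical example and must point at storage that survives cluster termination:
spark.eventLog.enabled true
spark.eventLog.dir dbfs:/cluster-logs/eventlog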
Thursday
Hi @der ,
Here's the content of the Event Log Replay notebook. You can use it to achieve your goals.
%scala
dbutils.widgets.removeAll()
dbutils.widgets.text("event_log_path", "", "event_log_path")
val eventLogPath = dbutils.widgets.get("event_log_path")
package org.apache.spark.util
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
/**
 * Visibility hack.
 */
object PublicJsonProtocol {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(json)
  }
}
import java.util.zip.GZIPInputStream
import scala.collection.JavaConversions._
import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
/**
 * Function to replay all the events that happened on a cluster in case the Spark History Server fails to
 * load the Spark UI. Run this command on an instance that hasn't run any Spark command to completely replay
 * all the events the cluster went through.
 */
def replaySparkEvents(pathToEventLogs: String): Unit = {
  val eventLogFiles = dbutils.fs.ls(pathToEventLogs).filter(_.name.startsWith("eventlog")).map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")

  val inOrder = eventLogFiles.tail ++ Seq(eventLogFiles.head)
  val lineIterator = inOrder.iterator.map { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
    val fileStream = fs.open(path)
    val stream = if (file.endsWith(".gz")) {
      new GZIPInputStream(fileStream)
    } else {
      fileStream
    }
    println(s"Processing file: $file")
    val lines = IOUtils.readLines(stream)
    (stream, lines)
  }

  val lines = lineIterator.flatMap(_._2)
  val streams = lineIterator.map(_._1)

  val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
  val unrecognizedProperties = new scala.collection.mutable.HashSet[String]

  val listenerBus = sc.listenerBus
  var currentLine: String = null
  try {
    while (lines.hasNext) {
      try {
        val entry = lines.next()
        currentLine = entry
        listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: java.lang.ClassNotFoundException =>
          // Ignore unknown events, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unknown event.
          if (!unrecognizedEvents.contains(e.getMessage)) {
            println(s"Drop unrecognized event: ${e.getMessage}")
            unrecognizedEvents.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case e: UnrecognizedPropertyException =>
          // Ignore unrecognized properties, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unrecognized property.
          if (!unrecognizedProperties.contains(e.getMessage)) {
            println(s"Drop unrecognized property: ${e.getMessage}")
            unrecognizedProperties.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case jpe: JsonParseException =>
          // We can only ignore an exception from the last line of the file, which might be truncated.
          // The last entry may not be the very last line in the event log, but we treat it
          // as such in a best effort to replay the given input.
          if (lines.hasNext) {
            throw jpe
          } else {
            println("Got JsonParseException from log file. The file might not have finished writing cleanly.")
          }
      }
    }
  } finally {
    streams.foreach(IOUtils.closeQuietly)
  }
}
replaySparkEvents(eventLogPath)
Thursday
Hello @szymon_dybczak
Do we have any documentation around this?
Thursday - last edited Thursday
Hi @Khaja_Zaffer ,
Yes, we do -> Replay Apache Spark events in a cluster - Databricks
It's quite simple. You just need to provide a valid path to your cluster logs and run the notebook content that I provided above. It should then replay the events.
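For example, a minimal usage sketch after running the cells above; the path is hypothetical and depends on your log delivery destination, cluster ID, and Spark context ID:
%scala
// Hypothetical path: point this at the directory that contains the "eventlog*" files
// delivered for the terminated cluster, then replay them onto the current cluster's listener bus.
replaySparkEvents("dbfs:/cluster-logs/<cluster-id>/eventlog/<cluster-id>_<driver-ip>/<spark-context-id>/")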
Thursday
Hello @szymon_dybczak
I think he has some event log files.
But sorry for not asking properly.
I meant the Scala code: does it have any limitations, or can we use it for issues like this? I was actually looking for this code.
Thursday
Thank you for providing the content of the notebook. I hadn't stored it, and it is no longer accessible via the Databricks link (homepage).
I think I can do it with this.
Thursday
Hi,
No problem @der. If the provided answer was helpful, please consider marking it as the solution to the thread.
Thursday
Your notebook worked!
I only needed to change this cell:
package org.apache.spark.util
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods
/**
 * Visibility hack.
 */
object PublicJsonProtocol {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(JsonMethods.compact(json))
  }
}
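I assume this is because newer Spark versions expect JsonProtocol.sparkEventFromJson to receive a JSON string rather than a json4s JValue, so the parsed value has to be rendered back to a compact string with JsonMethods.compact first.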
Spark UI output of a job compute cluster (executed yesterday):
You made my day!
Thursday
That's great!
Congratulations @der @szymon_dybczak
Thursday
So cool that it worked. Good job, @der and @Khaja_Zaffer!