Spark UI Replay Notebook

der
Valued Contributor

I need to load the Spark UI events of a long-running job.

I found this Databricks knowledge base article on the topic:
https://kb.databricks.com/clusters/replay-cluster-spark-events

However, the referenced notebook is no longer accessible.

  1. Does anyone still have a copy of that notebook?
  2. Alternatively, is there a better way to increase the default replay limit in Databricks for a completed job?

der_0-1758177009088.png

resulting in:

der_1-1758177084191.png

 

 

Khaja_Zaffer
Esteemed Contributor

Hello @der 

Good day!!

I see the whole issue here. According to the docs, the default value of spark.ui.retainedJobs is 1000.

ref: 

Khaja_Zaffer_0-1758179161839.png

 

 

Can you increase these limits?

For very large logs (e.g., 5600+ jobs), scale them up further (e.g., to 100000 for spark.ui.retainedTasks and spark.ui.retainedJobs) and use a cluster with more driver memory (e.g., via spark.driver.memory 16g).
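
To confirm which retention limits a cluster actually picked up, the configured values can be printed from a notebook. The following is only a sketch: the helper name `describeSettings` and the key list `uiKeys` are made up for illustration, and the list simply collects the `spark.ui.retained*` settings discussed in this thread.

```scala
// Sketch only: the key list is an assumption based on the settings named in this thread.
val uiKeys: Seq[String] = Seq(
  "spark.ui.retainedJobs",
  "spark.ui.retainedStages",
  "spark.ui.retainedTasks"
)

// `lookup` resolves a configuration key to its configured value, if any.
// describeSettings is a hypothetical helper, not part of Spark or Databricks.
def describeSettings(keys: Seq[String], lookup: String => Option[String]): Seq[String] =
  keys.map { key =>
    val shown = lookup(key).getOrElse("<not set, Spark default applies>")
    s"$key = $shown"
  }
```

In a Databricks notebook, where `sc` is predefined, a call such as `describeSettings(uiKeys, sc.getConf.getOption).foreach(println)` should list the values the driver was started with.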

I am waiting for your response. 

der
Valued Contributor

Hi @Khaja_Zaffer 

Yes, I set the following values:

der_2-1758181541962.png

 

The cluster should also have enough memory:

der_1-1758181451689.png

Logging is enabled:

der_3-1758181729042.png

 

 

 

Khaja_Zaffer
Esteemed Contributor

Okay, we can wait then. Let me know if there are any further issues.

der
Valued Contributor

The solution works as long as the cluster is running:

der_0-1758184685442.png

However, it does not replay all jobs if the cluster has been terminated (for example, a pipeline's job cluster that finished after hours of running):

der_1-1758184941528.png

So how can we replay the full Spark UI from a finished job cluster?

Khaja_Zaffer
Esteemed Contributor

Of course, you can point it to a storage account:

Khaja_Zaffer_0-1758185385429.png

 

Enable Spark event log delivery on your workspace/cluster (to DBFS, S3, ADLS, etc.). Otherwise you simply don’t have the logs to replay.

Check that spark.eventLog.enabled is set to true.

Set spark.eventLog.dir to your persistent storage path.
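
The two checks above can be expressed as a small helper that reports what is missing. This is a sketch under assumptions: `eventLogProblems` is a hypothetical name, and it only inspects the two keys mentioned here, not whether the directory is actually persistent or writable.

```scala
// Sketch only: returns a human-readable list of problems; an empty result
// means both settings named above look fine. eventLogProblems is a
// hypothetical helper, not part of Spark or Databricks.
def eventLogProblems(lookup: String => Option[String]): Seq[String] = {
  val enabled = lookup("spark.eventLog.enabled").exists(_.trim.equalsIgnoreCase("true"))
  val dir = lookup("spark.eventLog.dir").map(_.trim).filter(_.nonEmpty)
  Seq(
    if (enabled) None else Some("spark.eventLog.enabled is not true"),
    if (dir.isDefined) None else Some("spark.eventLog.dir is not set")
  ).flatten
}
```

In a notebook, `eventLogProblems(sc.getConf.getOption).foreach(println)` would print nothing when both settings are in place.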

 

 

szymon_dybczak
Esteemed Contributor III

Hi @der ,

Here's the content of Event Log Replay notebook. You can use it to achieve your goals 🙂

szymon_dybczak_0-1758184107442.png

 

 

%scala
dbutils.widgets.removeAll()
dbutils.widgets.text("event_log_path", "", "event_log_path")
 
val eventLogPath = dbutils.widgets.get("event_log_path")

package org.apache.spark.util
 
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
 
/**
 * Visibility hack.
 */
object PublicJsonProtocol  {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(json)
  }
}


import java.util.zip.GZIPInputStream
import scala.collection.JavaConversions._
import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
 
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
 
 
/**
 * Function to replay all the events that happened on a cluster in the case the Spark History Server fails to
 * load the Spark UI. Run this command on an instance that hasn't run any Spark command to completely replay
 * all the events a cluster went through.
 */
def replaySparkEvents(pathToEventLogs: String): Unit = {
  val eventLogFiles = dbutils.fs.ls(pathToEventLogs).filter(_.name.startsWith("eventlog")).map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")
  val inOrder = eventLogFiles.tail ++ Seq(eventLogFiles.head)
  
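  // Record each stream as it is opened so that the finally block can close it.
  // (Deriving the streams from lineIterator a second time would yield nothing,
  // because the iterator is already exhausted once all lines have been read.)
  val streams = new scala.collection.mutable.ArrayBuffer[java.io.InputStream]

  val lineIterator = inOrder.iterator.map { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
    val fileStream = fs.open(path)
    val stream = if (file.endsWith(".gz")) {
      new GZIPInputStream(fileStream)
    } else {
      fileStream
    }
    streams += stream
    println("File process: " + file)
    val lines = IOUtils.readLines(stream)
    (stream, lines)
  }
  
  val lines = lineIterator.flatMap(_._2)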
  
  val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
  val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
 
  val listenerBus = sc.listenerBus
  var currentLine: String = null
 
  try {
    while (lines.hasNext) {
      try {
        val entry = lines.next()
        currentLine = entry
        listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: java.lang.ClassNotFoundException =>
          // Ignore unknown events, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unknown event.
          if (!unrecognizedEvents.contains(e.getMessage)) {
            println(s"Drop unrecognized event: ${e.getMessage}")
            unrecognizedEvents.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case e: UnrecognizedPropertyException =>
          // Ignore unrecognized properties, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unrecognized property.
          if (!unrecognizedProperties.contains(e.getMessage)) {
            println(s"Drop unrecognized property: ${e.getMessage}")
            unrecognizedProperties.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case jpe: JsonParseException =>
          // We can only ignore exception from last line of the file that might be truncated
          // the last entry may not be the very last line in the event log, but we treat it
          // as such in a best effort to replay the given input
          if (lines.hasNext) {
            throw jpe
          } else {
            println(s"Got JsonParseException from log file. The file might not have finished writing cleanly.")
          }
      }
    }
  } finally {
    streams.foreach(IOUtils.closeQuietly)
  }
}

replaySparkEvents(eventLogPath)
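
A note on the line `eventLogFiles.tail ++ Seq(eventLogFiles.head)`: it moves the first listed file to the end of the replay order. Presumably this relies on the directory listing returning the still-active `eventlog` file ahead of the rolled-over files. The sketch below makes that ordering explicit instead of depending on listing order. It assumes the active file is named exactly `eventlog` and that the rolled-over file names sort chronologically; `replayOrder` is a hypothetical helper, and it works on file names rather than the full paths used in the notebook.

```scala
// Sketch only: put rolled-over logs first (sorted by name) and the active
// "eventlog" file last, so events are replayed oldest to newest.
def replayOrder(fileNames: Seq[String]): Seq[String] = {
  val (active, rolled) = fileNames.partition(_ == "eventlog")
  rolled.sorted ++ active
}
```

For a hypothetical listing of `eventlog`, `eventlog-2025-09-17--10-00.gz`, and `eventlog-2025-09-17--11-00.gz`, the two `.gz` files would be replayed in that order, followed by `eventlog`.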

 

 


Hello @szymon_dybczak 

Do we have any documentation around this?

szymon_dybczak
Esteemed Contributor III

Hi @Khaja_Zaffer ,

Yes, we do -> Replay Apache Spark events in a cluster - Databricks

It's quite simple. You just need to provide a valid path to your cluster logs and run the notebook content that I provided above. It should then replay the events.

szymon_dybczak_0-1758184889651.png

 

Hello @szymon_dybczak 

I think he already has some event log files.

Sorry for not asking properly.
I meant the Scala code: does it have any limitations, or can we use it for issues like this? I was actually looking for this code.

 

der
Valued Contributor

Hi @szymon_dybczak 

Thank you for providing the content of the notebook. I hadn't saved a copy, and it is no longer accessible through the Databricks link (homepage).

I think I can do it with this.

szymon_dybczak
Esteemed Contributor III

Hi,

No problem, @der. If the provided answer was helpful, please consider marking it as the solution to the thread.

der
Valued Contributor

Your notebook worked!

I only needed to change this cell:

 

package org.apache.spark.util

import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods

/**
 * Visibility hack.
 */
object PublicJsonProtocol  {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(JsonMethods.compact(json))
  }
}

 

Spark UI output of a job compute (executed yesterday):

der_0-1758189521569.png

you made my day 🤗

Khaja_Zaffer
Esteemed Contributor

That's great!

Congratulations @der @szymon_dybczak 

szymon_dybczak
Esteemed Contributor III

So cool that it worked. Good job guys @der , @Khaja_Zaffer !