Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark UI Replay Notebook

der
Contributor

I need to load the Spark UI events of a long-running job.

I found this Databricks knowledge base article on the topic:
https://kb.databricks.com/clusters/replay-cluster-spark-events

However, the referenced notebook is no longer accessible.

  1. Does anyone still have a copy of that notebook?
  2. Alternatively, is there a better way to increase the default replay limit in Databricks for a completed job?

[screenshot: der_0-1758177009088.png]

resulting in:

[screenshot: der_1-1758177084191.png]
1 ACCEPTED SOLUTION

szymon_dybczak
Esteemed Contributor III

Hi @der,

Here's the content of the Event Log Replay notebook. You can use it to achieve your goals 🙂

[screenshot: szymon_dybczak_0-1758184107442.png]

 

 

%scala
dbutils.widgets.removeAll()
dbutils.widgets.text("event_log_path", "", "event_log_path")
 
val eventLogPath = dbutils.widgets.get("event_log_path")

package org.apache.spark.util
 
import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
 
/**
 * Visibility hack.
 */
object PublicJsonProtocol  {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(json)
  }
}


import java.util.zip.GZIPInputStream
import scala.collection.JavaConversions._
import org.json4s.jackson.JsonMethods._
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
 
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.spark.util.PublicJsonProtocol
 
 
/**
 * Function to replay all the events that happened on a cluster in case the Spark History Server fails to
 * load the Spark UI. Run this command on a cluster that hasn't run any Spark command yet to completely replay
 * all the events the cluster went through.
 */
def replaySparkEvents(pathToEventLogs: String): Unit = {
  val eventLogFiles = dbutils.fs.ls(pathToEventLogs).filter(_.name.startsWith("eventlog")).map(_.path)
  require(eventLogFiles.nonEmpty, "No event logs found at this path")
  val inOrder = eventLogFiles.tail ++ Seq(eventLogFiles.head)
  
  val lineIterator = inOrder.iterator.map { file =>
    val path = new Path(file)
    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
    val fileStream = fs.open(path)
    val stream = if (file.endsWith(".gz")) {
      new GZIPInputStream(fileStream)
    } else {
      fileStream
    }
    println(s"Processing file: $file")
    val lines = IOUtils.readLines(stream)
    (stream, lines)
  }
  
  val lines = lineIterator.flatMap(_._2)
  val streams = lineIterator.map(_._1)
  
  val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
  val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
 
  val listenerBus = sc.listenerBus
  var currentLine: String = null
 
  try {
    while (lines.hasNext) {
      try {
        val entry = lines.next()
        currentLine = entry
        listenerBus.post(PublicJsonProtocol.sparkEventFromJson(parse(currentLine)))
      } catch {
        case e: java.lang.ClassNotFoundException =>
          // Ignore unknown events, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unknown event.
          if (!unrecognizedEvents.contains(e.getMessage)) {
            println(s"Drop unrecognized event: ${e.getMessage}")
            unrecognizedEvents.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case e: UnrecognizedPropertyException =>
          // Ignore unrecognized properties, parse through the event log file.
          // To avoid spamming, warnings are only displayed once for each unrecognized property.
          if (!unrecognizedProperties.contains(e.getMessage)) {
            println(s"Drop unrecognized property: ${e.getMessage}")
            unrecognizedProperties.add(e.getMessage)
          }
          println(s"Drop incompatible event log: $currentLine")
        case jpe: JsonParseException =>
          // We can only ignore exception from last line of the file that might be truncated
          // the last entry may not be the very last line in the event log, but we treat it
          // as such in a best effort to replay the given input
          if (lines.hasNext) {
            throw jpe
          } else {
            println(s"Got JsonParseException from log file. The file might not have finished writing cleanly.")
          }
      }
    }
  } finally {
    streams.foreach(IOUtils.closeQuietly)
  }
}

replaySparkEvents(eventLogPath)
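A rough sketch of how the path might be supplied (the value below is only a placeholder; the exact folder layout depends on where your cluster log delivery writes the eventlog* files):

// Hypothetical example: point the widget at the directory that contains the rotated
// "eventlog*" files, then run the cells above on a fresh cluster that hasn't executed
// any Spark commands yet.
dbutils.widgets.text("event_log_path", "dbfs:/cluster-logs/<cluster-id>/eventlog/<subfolder>/", "event_log_path")
replaySparkEvents(dbutils.widgets.get("event_log_path"))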

 

 


14 REPLIES

Khaja_Zaffer
Contributor

Hello @der 

Good day!!

I see the whole issue here. As per the docs, the default value for spark.ui.retainedJobs is 1000.

ref: [screenshot: Khaja_Zaffer_0-1758179161839.png]

Can you increase them? 

For very large logs (e.g., 5600+ jobs), scale up further (e.g., to 100000 for spark.ui.retainedJobs/retainedTasks) and use a cluster with more driver memory (e.g., spark.driver.memory 16g).
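As a sketch, these are the kinds of values you could put in the cluster's Spark config (Advanced options > Spark) before starting the cluster; the numbers are only illustrative:

spark.ui.retainedJobs 100000
spark.ui.retainedStages 100000
spark.ui.retainedTasks 1000000
spark.sql.ui.retainedExecutions 10000

These settings are read when the Spark application starts, so they need to be set on the cluster up front rather than at runtime; on Databricks the driver memory is normally determined by the driver node type you choose.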

I am waiting for your response. 

Hi @Khaja_Zaffer 

Yes, I set the following values:

[screenshot: der_2-1758181541962.png]

 

And the cluster should also have enough memory:

[screenshot: der_1-1758181451689.png]

Logging is activated:

[screenshot: der_3-1758181729042.png]

 

 

 

Okay, we can wait then. Let me know if there are any further issues.

The solution works as long as the cluster is running:

[screenshot: der_0-1758184685442.png]

However, it does not replay all jobs if the cluster is terminated (for example, a job cluster from a pipeline that finished hours ago).

[screenshot: der_1-1758184941528.png]

So how can we replay the full Spark UI from a finished job cluster?

Of course, you can point it towards a storage account:

[screenshot: Khaja_Zaffer_0-1758185385429.png]

 

Enable Spark event log delivery on your workspace/cluster (to DBFS, S3, ADLS, etc.). Otherwise, you simply don't have the logs to replay.

Check that spark.eventLog.enabled is set to true.

Set spark.eventLog.dir to your persistent storage path.
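For example (a sketch only; the directory below is a placeholder, use a persistent path you own):

spark.eventLog.enabled true
spark.eventLog.dir dbfs:/your-persistent-path/spark-events

Alternatively, the cluster's Logging destination (cluster log delivery) typically writes the event logs under <destination>/<cluster-id>/eventlog/, and that is the directory you would then feed to the replay notebook.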

 

 


Hello @szymon_dybczak 

Do we have any documentation around this?

szymon_dybczak
Esteemed Contributor III

Hi @Khaja_Zaffer ,

Yes, we do -> Replay Apache Spark events in a cluster - Databricks

It's quite simple. You just need to provide a valid path to your cluster logs and run the notebook content I provided above. It should then replay the events.

[screenshot: szymon_dybczak_0-1758184889651.png]

 

Hello @szymon_dybczak 

I think he has some event log files.

But sorry for not asking properly.
I meant the Scala code: does it have any limitations, or can we use it for issues like this? I was actually looking for this code.

 

Hi @szymon_dybczak 

Thank you for providing the content of the notebook. I hadn't stored it, and it is no longer accessible via the Databricks link (homepage).

I think with this I can do it.

szymon_dybczak
Esteemed Contributor III

Hi,

No problem, @der. If the provided answer was helpful, please consider marking it as the solution to the thread.

Your notebook worked!

I only needed to change this cell:

 

package org.apache.spark.util

import org.apache.spark.scheduler.SparkListenerEvent
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods

/**
 * Visibility hack.
 */
object PublicJsonProtocol  {
  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    JsonProtocol.sparkEventFromJson(JsonMethods.compact(json))
  }
}
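(For context: newer Spark versions changed JsonProtocol.sparkEventFromJson to accept a JSON string instead of a json4s JValue, which is presumably why the JValue has to be compacted to a string on this runtime.)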

 

Spark UI output of a job compute cluster (executed yesterday):

[screenshot: der_0-1758189521569.png]

you made my day 🤗

That's great!

Congratulations @der @szymon_dybczak!

szymon_dybczak
Esteemed Contributor III

So cool that it worked. Good job, @der and @Khaja_Zaffer!