cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

spline agent in Databricks use

ZacayDaushin
New Contributor

Spline agent
I use the Spline agent to capture the lineage of Databricks notebooks.

For that, I attached the following code to the notebook,
but I get the attached error.

%scala
import scala.util.parsing.json.JSON
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.agent.AgentConfig
import za.co.absa.spline.harvester.postprocessing.AbstractPostProcessingFilter
import za.co.absa.spline.harvester.postprocessing.PostProcessingFilter
import org.apache.commons.configuration.Configuration
import za.co.absa.spline.harvester.conf.StandardSplineConfigurationStack
import za.co.absa.spline.harvester.HarvestingContext
import za.co.absa.spline.producer.model.ExecutionPlan
import za.co.absa.spline.producer.model.ExecutionEvent
import za.co.absa.spline.producer.model.ReadOperation
import za.co.absa.spline.producer.model.WriteOperation
import za.co.absa.spline.producer.model.DataOperation
import za.co.absa.spline.harvester.ExtraMetadataImplicits._
import za.co.absa.spline.harvester.SparkLineageInitializer._

// Collect metadata about the current Databricks notebook so it can be attached to the lineage.
val notebookInformationJson = dbutils.notebook.getContext.toJson

// Some values of the parsed context ("tags", "extraContext") are nested objects, so the outer
// map is typed as Map[String, Any]. Fail with a descriptive message when the context is not a
// JSON object instead of casting a fallback value to a Map.
val outerMap: Map[String, Any] = JSON.parseFull(notebookInformationJson) match {
  case Some(parsed: Map[_, _]) => parsed.asInstanceOf[Map[String, Any]]
  case _ =>
    throw new IllegalStateException("Could not parse the notebook context as a JSON object")
}

// NOTE(review): the values of these nested maps are assumed to be strings -- confirm.
val tagMap = outerMap("tags").asInstanceOf[Map[String, String]]
val extraContextMap = outerMap("extraContext").asInstanceOf[Map[String, String]]

// Path segments of the notebook; the last one is used as the notebook name below.
val notebookPath = extraContextMap("notebook_path").split("/")

// NOTE(review): the workspace URL is hard-coded; the commented-out line reads it from the tags.
//val workspaceUrl=tagMap("browserHostName")
var workspaceUrl = "https://adb-7614304971745696.16.azuredatabricks.net"

// Despite its name, this holds the notebook path reported by the notebook context.
val workspaceName = dbutils.notebook().getContext().notebookPath.get
val notebookURL = s"${tagMap("browserHostName")}/?o=${tagMap("orgId")}"
val user = tagMap("user")

// Last path segment; falls back to an empty name when the split yields no segments.
val name = notebookPath.lastOption.getOrElse("")

// Everything that gets attached to the execution plan under the "notebookInfo" key.
val notebookInfo = Map(
  "notebookURL" -> notebookURL,
  "user" -> user,
  "workspaceName" -> workspaceName,
  "workspaceUrl" -> workspaceUrl,
  "name" -> name,
  // Paths of the entries listed under /FileStore/tables.
  "mounts" -> dbutils.fs.ls("/FileStore/tables").map(_.path),
  "timestamp" -> System.currentTimeMillis
)
val notebookInfoJson = scala.util.parsing.json.JSONObject(notebookInfo)


/**
 * Spline post-processing filter that attaches extra metadata to harvested lineage entities.
 *
 * Execution plans receive the notebook details held in `notebookInfoJson` under the
 * "notebookInfo" key. Execution events and read/write/data operations receive a
 * placeholder "foo" -> "bar" entry.
 */
class CustomFilter extends PostProcessingFilter {

  /**
   * Auxiliary constructor that accepts a configuration and ignores it.
   *
   * NOTE(review): presumably needed so the Spline agent can instantiate the filter from
   * configuration -- confirm against the agent version in use.
   */
  def this(conf: Configuration) = this()

  /** Adds the placeholder entry to the execution event's extra metadata. */
  override def processExecutionEvent(event: ExecutionEvent, ctx: HarvestingContext): ExecutionEvent =
    event.withAddedExtra(Map("foo" -> "bar"))

  /** Adds the notebook details to the execution plan's extra metadata. */
  override def processExecutionPlan(plan: ExecutionPlan, ctx: HarvestingContext): ExecutionPlan =
    plan.withAddedExtra(Map("notebookInfo" -> notebookInfoJson))

  /** Adds the placeholder entry to the read operation's extra metadata. */
  override def processReadOperation(op: ReadOperation, ctx: HarvestingContext): ReadOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  /** Adds the placeholder entry to the write operation's extra metadata. */
  override def processWriteOperation(op: WriteOperation, ctx: HarvestingContext): WriteOperation =
    op.withAddedExtra(Map("foo" -> "bar"))

  /** Adds the placeholder entry to the data operation's extra metadata. */
  override def processDataOperation(op: DataOperation, ctx: HarvestingContext): DataOperation =
    op.withAddedExtra(Map("foo" -> "bar"))
}

// Create the post-processing filter that will be registered with the Spline agent.
val myInstance = new CustomFilter()

// Build an agent configuration that registers the filter, then enable lineage tracking
// on the current Spark session with it.
val lineageAgentConfig = AgentConfig.builder().postProcessingFilter(myInstance).build()
spark.enableLineageTracking(lineageAgentConfig)

 

1 REPLY 1

-werners-
Esteemed Contributor III

Could be me, but I do not see an error message?

Welcome to Databricks Community: Let's learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.