Data Engineering
How do you correctly access the spark context in DLT pipelines?

KateK
New Contributor II

I have some code that uses RDDs, with the sc.parallelize() and rdd.toDF() methods, to get a dataframe back out. The code works in a regular notebook (and if I run the notebook as a job) but fails if I do the same thing in a DLT pipeline. The error message varies depending on how I try to access the Spark context, but I usually get "ValueError: RDD is empty".

I've tried a few different ways to access the Spark context in a DLT pipeline, but none of them seem to work. I'm also new to Spark, so maybe I'm just not googling the right terms -- it would be great if someone could point me in the right direction!

I'm attaching two dbc notebooks. The first one creates some fake data, reads it in, and does the RDD operation I want to do. The second notebook is a DLT pipeline; it reads in the fake data and tries a few different ways to access the Spark context and do the RDD operation. The error messages I get with each method are noted.
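For reference, here is a minimal sketch of the kind of pattern I mean (the table and column names are made up for illustration, not taken from the attached notebooks):

import dlt
from pyspark.sql import Row

rows = [Row(id=1, label="a"), Row(id=2, label="b")]

# This works in an interactive notebook or a notebook job:
df = spark.sparkContext.parallelize(rows).toDF()

# The same pattern inside a DLT table function is where the errors appear:
@dlt.table
def fake_data_table():
    return spark.sparkContext.parallelize(rows).toDF()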

3 REPLIES

alexott
Valued Contributor II

DLT calls your functions several times: with an empty dataframe when building the execution graph, and then with actual data when executing the pipeline.

But really, I believe you can do what you want using the DataFrame APIs; in the worst case, resort to Pandas UDFs. But you'd need to provide a description of what you want to achieve. The reason is that an RDD implementation will be very slow, since each row is evaluated in the Python interpreter and Catalyst won't be able to perform any optimization.
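As a rough illustration of the two alternatives mentioned above (the column names are invented): plain DataFrame operations stay inside Catalyst, and a pandas UDF keeps any unavoidable per-row Python logic vectorized instead of going through the RDD API.

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# Pure DataFrame API -- Catalyst can optimize this end to end.
df = spark.range(10).withColumn("doubled", F.col("id") * 2)

# If custom Python logic is unavoidable, a pandas UDF processes whole
# batches (pandas Series) at a time rather than one row per interpreter call.
@pandas_udf("long")
def double(values: pd.Series) -> pd.Series:
    return values * 2

df2 = spark.range(10).withColumn("doubled", double("id"))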

Kaniz
Community Manager

Hi @Katayun Kamdin, we haven't heard from you since the last response from @Alex Ott, and I was checking back to see if his suggestions helped you.

Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.

KateK
New Contributor II

Thanks for your help, Alex. I ended up re-writing my code with Spark UDFs -- maybe there is a better solution using only the DataFrame API, but I couldn't find it.

To summarize my problem: I was trying to un-nest a large JSON blob (the fake data in my first attached notebook emulates the problem). I have a data source that contains a lot of nested JSON, and usually I can use pyspark.sql.functions.get_json_object() to extract the value for the key I want. That doesn't work in this case, though, because the nested keys are randomized, so I needed a way to take something like this:

"file_key": {
  "nested_blob": {
    "rand_nested_id_1": {"k1": "v1", "k2": "v2"},
    "rand_nested_id_2": {"k1": "v1", "k2": "v2"},
    "rand_nested_id_3": {"k1": "v1", "k2": "v2"}
  }
}

And turn it into a dataframe that looks like this:

file_key | rand_nested_id_1 | {"k1": "v1", "k2": "v2"}
file_key | rand_nested_id_2 | {"k1": "v1", "k2": "v2"}
file_key | rand_nested_id_3 | {"k1": "v1", "k2": "v2"}

A combination of a Spark UDF that returns the file_key and the nested (key, value) pairs as an array, followed by the explode function, did what I wanted. Solution attached.
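A rough sketch of that approach, assuming the raw data is a JSON string in a column named nested_blob alongside a file_key column (the column and function names here are illustrative, not the attached solution):

import json
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# The UDF returns an array of (nested_id, nested_value) pairs so that
# explode() can turn each pair into its own row.
pair_schema = ArrayType(StructType([
    StructField("nested_id", StringType()),
    StructField("nested_value", StringType()),
]))

@F.udf(pair_schema)
def unnest_blob(blob_json):
    if blob_json is None:
        return []
    blob = json.loads(blob_json)
    return [(k, json.dumps(v)) for k, v in blob.items()]

# df has columns: file_key (string), nested_blob (raw JSON string)
result = (
    df.withColumn("pair", F.explode(unnest_blob("nested_blob")))
      .select("file_key", "pair.nested_id", "pair.nested_value")
)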
