08-04-2022 09:15 AM
I have some code that uses RDDs, with the sc.parallelize() and rdd.toDF() methods to get a DataFrame back out. The code works in a regular notebook (and when I run the notebook as a job), but fails if I do the same thing in a DLT pipeline. The error message varies depending on how I try to access the Spark context, but I usually get ValueError: RDD is empty.
I've tried a few different ways to access the Spark context in a DLT pipeline, but none of them seem to work. I'm also new to Spark, so maybe I'm just not googling the right terms -- it would be great if someone could point me in the right direction!
I'm attaching two dbc notebooks. The first one creates some fake data, reads it in, and does the RDD operation I want to do. The second notebook is a DLT pipeline; it reads in the fake data and tries a few different ways to access the Spark context and do the RDD operation. The error messages I get with each method are noted.
08-05-2022 12:34 AM
DLT calls your functions several times: with an empty dataframe when building the execution graph, and then with the actual data when executing the pipeline.
But really, I believe you can do what you want using the DataFrame APIs; in the worst case, resort to Pandas UDFs. You'll need to provide a description of what you want to achieve, though. The reason is that an RDD implementation will be very slow, as each row is evaluated in the Python interpreter and Catalyst won't be able to perform any optimization.
08-07-2022 11:47 PM
Hi @Katayun Kamdin, We haven’t heard from you on the last response from @Alex Ott, and I was checking back to see if his suggestions helped you.
Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.
Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
08-08-2022 09:43 AM
Thanks for your help, Alex. I ended up rewriting my code with Spark UDFs -- maybe there is a better solution using only the DataFrame API, but I couldn't find it.
To summarize my problem: I was trying to un-nest a large JSON blob (the fake data in my first attached notebook emulates the problem). I have a data source that contains a lot of nested JSON, and usually I can use pyspark.sql.functions.get_json_object() to extract the value for the key I want. That doesn't work in this case, though, because the nested keys are randomized, so I needed a way to take something like this:
"file_key": {
"nested_blob": {
"rand_nested_id_1": {"k1": "v1", "k2": "v2"},
"rand_nested_id_2": {"k1": "v1", "k2": "v2"},
"rand_nested_id_3": {"k1": "v1", "k2": "v2"}
}
}
And turn it into a dataframe that looks like this:
file_key | rand_nested_id_1 | {"k1": "v1", "k2": "v2"}
file_key | rand_nested_id_2 | {"k1": "v1", "k2": "v2"}
file_key | rand_nested_id_3 | {"k1": "v1", "k2": "v2"}
A combination of a Spark UDF that returns the file_key and the nested (key, value) pairs as an array, followed by the explode function, did what I wanted. Solution attached.