I want to open some CSV files as an RDD, do some processing and then load it as a DataFrame. Since the files are stored in an Azure blob storage account I need to configure the access accordingly, which for some reason does not work when using an RDD...
I decided to load the files into a DataFrame with a single column and then do the processing before splitting it into separate columns and this works just fine.@Hyper Guy​ thanks for the link, I didn't try that but it seems like it would resolve the ...
Hi, I'm trying to write data from RDD to the storage account:Adding storage account key:spark.conf.set("fs.azure.account.key.y.blob.core.windows.net", "myStorageAccountKey")Read and write to the same storage:val path = "wasbs://x@y.blob.core.windows....
Hello @Vadim1 and @User16764241763. I'm wondering if you find a way to avoid adding the hardcoded key in the advanced options spark config section in the cluster configuration. Is there a similar command to spark.conf.set("spark.hadoop.fs.azure.accou...
Hi there,I need some help with this example. We're trying to create a linearRegression model that can parallelize for thousands of symbols per date. When we run this we get a picklingError Any suggestions would be much appreciated!KError:PicklingErro...
Hi, looking for the right solution pattern for this scenario: We have millions of relatively small XML files (currently sitting in ADLS) that we have to load into delta lake. Each XML file has to be read, parsed, and pivoted before writing to a delta...
Hi @Steph Swierenga​ Thank you for posting your question in our community! We are happy to assist you.To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answe...
when I call RDD Apis during pytest, it seems like module "serializer.py" cannot find any other modules under pyspark.I've already looked up on the internet, and it seems like pyspark modules are not properly importing other referring modules.I see ot...
@hyunho lee​ : It sounds like you are encountering an issue with PySpark's serializer not being able to find the necessary modules during testing with Pytest. One solution you could try is to set the PYTHONPATH environment variable to include the pat...
We have a Cursor in DB2 which reads in each loop data from 2 tables. At the end of each loop, after inserting the data to a target table, we update records related to each loop in these 2 tables before moving to the next loop. An indicative example i...
Hi @ELENI GEORGOUSI​ Thank you for posting your question in our community! We are happy to assist you.To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answe...
Hi all,I am using a persist call on a spark dataframe inside an application to speed-up computations. The dataframe is used throughout my application and at the end of the application I am trying to clear the cache of the whole spark session by calli...
No solution yet:Hi @Suteja Kanuri​ ,Thank you for thinking along and replying!Unfortunately, I have not found a solution yet.I am getting an error that there exists no ```.getCache()``` method on a spark context. Also note that I have tried to do som...
Hi all! I have an S3 bucket with Delta parquet files/folders with different schemas each. I need to create an RDD or DataFrame from all those Delta Tables that should contain the path, name and different schema of each.How could I do that?Thank you!P...
We do have data stored in HDF5 files in a "proprietary" way. This data needs to be read, converted and transformed before it can be inserted into a delta table.All of this transformation is done in a custom python function that takes the HDF5 file an...
Hi, My question is - what happens to the initial RDD after the action is performed on it. Does it disappear or stays in the memory or does it needs to be explicitly cached() if we want to use it again.For eg : If I execute this in a sequence :df_outp...
I'm extracting data from a custom format by day of month using a 32 core executor. I'm using rdds to distribute work across cores of the executor. I'm seeing an intermittent issue where for a run sometimes I see 31 cores being used as expected and ot...
I may have figured this out! I'm explicitly setting the number of slices instead of using the default.days_rdd = sc.parallelize(days_to_process,len(days_to_process))
I need to process a number of files where I manipulate file text utilising an external executable that operates on stdin/stdout. I am quite new to spark. What I am attempting is to use rdd.pipe as in the followingexe_path = " /usr/local/bin/external...
I have some code that uses RDDs, and the sc.parallelize() and rdd.toDF() methods to get a dataframe back out. The code works in a regular notebook (and if I run the notebook as a job) but fails if I do the same thing in a DLT pipeline. The error mess...
Thanks for your help Alex, I ended up re-writing my code with spark UDFs -- maybe there is a better solution with only the Dataframe API but I couldn't find it. To summarize my problem: I was trying to un-nest a large json blob (the fake data in my f...