10-11-2021 01:42 AM
I am using a Python function to read some data from a GET endpoint and write it as a CSV file to an Azure Blob storage location.
My GET endpoint takes two query parameters, param1 and param2, so initially I have a dataframe paramDf that has two columns, param1 and param2:
param1  param2
12      25
45      95

Schema of paramDf (pyspark.sql.dataframe.DataFrame):
param1: string
param2: string
Now I write a function as below:
import requests
import pandas

# DataUrl and TOKEN are defined elsewhere in the notebook
def executeRestApi(w):
    dlist = []
    try:
        response = requests.get(DataUrl.format(token=TOKEN, oid=w.param1, wid=w.param2))
        if response.status_code == 200:
            metrics = response.json()['data']['metrics']
            dic = {}
            dic['metric1'] = metrics['metric1']
            dic['metric2'] = metrics['metric2']
            dlist.append(dic)
            # write the metrics for this row as a CSV file on the mount point
            pandas.DataFrame(dlist).to_csv(
                "../../dbfs/mnt/raw/Important/MetricData/listofmetrics_{}_{}.csv".format(w.param1, w.param2),
                header=True, index=False)
        return "Success"
    except Exception as e:
        return "Failure"
Finally, invoke the method as:
paramDf.foreach(executeRestApi)
So, theoretically, the function executeRestApi should be executed for each row in the dataframe, and, within the function, I extract the required data and write it to an ADLS location as a CSV file.
Everything works well, except that the file is never written when I execute the foreach command on a multi-node cluster.
However, the same operation works fine on a single-node cluster. I am unable to figure out the difference between the two.
What could I be doing wrong here?
10-11-2021 07:54 AM
@Kaniz Fatma Thanks, eagerly awaiting your response.
10-11-2021 08:03 AM
Remove the write from the foreach. Instead, build a dataframe, return that dataframe, and write it only once.
Right now you are doing a write in each iteration.
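Something like this, for example (resultDf here just stands for the dataframe of results, however you end up building it; more on that below):

# resultDf is a hypothetical Spark dataframe holding metric1/metric2 for every
# parameter row; it is built once and written once, instead of writing a file
# inside every foreach call. The output path reuses the mount folder from your code.
(resultDf
    .coalesce(1)                  # a single output file instead of many part files
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("dbfs:/mnt/raw/Important/MetricData/listofmetrics"))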
10-11-2021 09:44 AM
@Werner Stinckens But foreach doesn't return anything, right?
10-11-2021 11:51 AM
Indeed, foreach itself does not return anything.
Now, to solve your issue in a distributed manner, we need to think in a slightly different way.
Looping is probably not the best way.
What you are trying to do is: starting from a DF, call a REST API for each record and combine the DF with the result of the REST call.
There are several ways to do this, but the map() function or a UDF seem appropriate:
https://stackoverflow.com/questions/64191614/how-to-use-map-to-make-rest-api-calls-in-pyspark
https://stackoverflow.com/questions/67204599/parallel-rest-api-request-using-sparkdatabricks
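For example, something along these lines based on the first link (callRestApi is just a name I made up; it reuses DataUrl, TOKEN and the metric names from your code, and error handling is left out):

import requests
from pyspark.sql import Row

def callRestApi(w):
    # Runs on the workers: one GET call per parameter row, returning the
    # metrics as data instead of writing a file from inside the worker.
    response = requests.get(DataUrl.format(token=TOKEN, oid=w.param1, wid=w.param2))
    metrics = response.json()['data']['metrics']
    return Row(param1=w.param1, param2=w.param2,
               metric1=metrics['metric1'], metric2=metrics['metric2'])

resultDf = paramDf.rdd.map(callRestApi).toDF()
# resultDf can now be written once on the driver, e.g. with resultDf.write.csv(...)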
I hope that will point you in the right direction.
If you want to use a loop after all, try adding a collect() before the foreach so all the data gets sent to the driver. But that defeats the purpose of having multiple nodes.
10-11-2021 10:36 PM
@Werner Stinckens I don't want to use a loop at all. I tried the same thing using a UDF too, but again the write-to-blob part never happened inside the UDF.
I will try out the strategy that you have suggested, and I am also looking at Python multithreading.
10-11-2021 10:38 PM
@Werner Stinckens But I still have a question: why is the write operation that I am attempting not working in the first place? Is it something to do with how things are designed?
10-12-2021 01:03 AM
OK, let's see.
You have a Spark dataframe, which is distributed by nature.
On the other hand, you use pandas, which is not distributed.
Also, your function is plain Python, not PySpark.
This will be processed by the driver, so it is not distributed.
However, the dataframe itself will be processed by the workers.
So the workers want to do something, but the code runs on the driver. It could be that. I'm not sure, but the fact that it works on a single-node cluster makes me think your code is not being executed in a distributed environment.
That is why I mentioned collect(): it collects the Spark dataframe on the driver, so the workers are bypassed in your case.
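Roughly like this, just to illustrate (your existing executeRestApi stays as it is, but nothing runs on the workers):

# Everything here runs on the driver, so plain Python/pandas/requests is fine,
# but the work is not parallelised across the cluster.
for w in paramDf.collect():
    executeRestApi(w)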
Moving from Python to PySpark takes some time to understand.
This blog explains some interesting topics:
https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a
Using koalas instead of pandas, for example.
10-12-2021 06:36 AM
Use a Spark DataFrame instead.
It is also safer to use the dbfs path "dbfs:/mnt/raw/Important/MetricData....".
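Roughly the difference (someSparkDf stands for whatever Spark dataframe you end up with, dlist is the list from your function):

# Spark APIs understand the dbfs:/ scheme directly:
someSparkDf.write.mode("overwrite").option("header", True) \
    .csv("dbfs:/mnt/raw/Important/MetricData/listofmetrics")

# Plain-Python libraries such as pandas only see the local file system, so they
# are usually pointed at the /dbfs FUSE mount rather than a relative ../../dbfs path:
pandas.DataFrame(dlist).to_csv("/dbfs/mnt/raw/Important/MetricData/listofmetrics.csv",
                               header=True, index=False)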
10-12-2021 06:38 AM
@Hubert Dudek I can't issue a Spark command on an executor node; it throws an error, because foreach distributes the processing.