Data Engineering

Unable to write CSV files to Azure Blob using pandas to_csv()

halfwind22
New Contributor III

I am using a Python function to read some data from a GET endpoint and write it as a CSV file to an Azure Blob location.

My GET endpoint takes two query parameters, param1 and param2. So initially I have a DataFrame paramDf that has two columns, param1 and param2.

param1   param2
12        25
45        95
 
Schema:    paramDf:pyspark.sql.dataframe.DataFrame
           param1:string
           param2:string

Now I write a function as below:

import pandas
import requests

def executeRestApi(w):
    # Called once per row of paramDf; w is a pyspark.sql.Row.
    dlist = []
    try:
        response = requests.get(DataUrl.format(token=TOKEN, oid=w.param1, wid=w.param2))
        if response.status_code == 200:
            metrics = response.json()['data']['metrics']
            dic = {}
            dic['metric1'] = metrics['metric1']
            dic['metric2'] = metrics['metric2']
            dlist.append(dic)
        # One CSV per (param1, param2) pair, written with pandas.
        out_path = "../../dbfs/mnt/raw/Important/MetricData/listofmetrics_{}_{}.csv".format(w.param1, w.param2)
        pandas.DataFrame(dlist).to_csv(out_path, header=True, index=False)
        return "Success"
    except Exception as e:
        # Any error (HTTP, JSON, file system) ends up here.
        return "Failure"
 

Finally, invoke the method as:

paramDf.foreach(executeRestApi)

So, theoretically, the function executeRestApi must be executed for each row in the DataFrame, and within the function I extract the required data and write it to an ADLS location as a CSV file.

Everything works, except that the file is never written when I execute the foreach command on a multi-node cluster.

However, the same operation works well on a single-node cluster. I am unable to figure out the difference between the two approaches.

What could I be doing wrong here?

1 ACCEPTED SOLUTION

-werners-
Esteemed Contributor III

OK, let's see.

You have a Spark DataFrame, which is distributed by nature.

On the other hand, you use pandas, which is not distributed.

Also, your function is plain Python, not PySpark.

This will be processed by the driver, so not distributed.

However, the DataFrame itself will be processed by the workers.

So the workers want to do something, but the code runs on the driver. It could be that. I am not sure, but the fact that it works on a single node makes me think your code is not executed in a distributed environment.

That is why I mentioned collect(). It collects the Spark DataFrame on the driver, so the workers are bypassed in your case.

Moving from Python to PySpark takes some time to understand.

This blog explains some interesting topics:

https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a

For example, using Koalas instead of pandas.
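
A minimal sketch of the collect() route, assuming the parameter table is small enough to fit on the driver. After collect() the rows are plain Python objects on the driver, so the loop and the file writes happen only there. fetch_metrics, write_metrics_on_driver and the get argument are hypothetical names for illustration; DataUrl, TOKEN and paramDf are the names from the question. The standard csv module stands in for pandas.to_csv here only to keep the sketch dependency-free.

```python
import csv
import os


def fetch_metrics(row, url_template, token, get=None):
    """Call the GET endpoint for one row; return a dict, or None on a non-200 reply."""
    if get is None:
        # Imported lazily so the helper can be exercised with a fake `get`.
        import requests
        get = requests.get
    url = url_template.format(token=token, oid=row["param1"], wid=row["param2"])
    response = get(url)
    if response.status_code != 200:
        return None
    metrics = response.json()["data"]["metrics"]
    return {"metric1": metrics["metric1"], "metric2": metrics["metric2"]}


def write_metrics_on_driver(rows, out_dir, url_template, token, get=None):
    """Loop over already-collected rows and write one CSV per successful call."""
    written = []
    for row in rows:
        record = fetch_metrics(row, url_template, token, get)
        if record is None:
            continue
        name = "listofmetrics_{}_{}.csv".format(row["param1"], row["param2"])
        path = os.path.join(out_dir, name)
        with open(path, "w", newline="") as handle:
            writer = csv.DictWriter(handle, fieldnames=["metric1", "metric2"])
            writer.writeheader()
            writer.writerow(record)
        written.append(path)
    return written


# Assumed usage on the cluster, with the names from the question:
# write_metrics_on_driver(paramDf.collect(), "/dbfs/mnt/raw/Important/MetricData", DataUrl, TOKEN)
```

This runs the API calls one at a time on a single machine, so it only suits a modest number of rows.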

11 REPLIES

Kaniz_Fatma
Community Manager

Hi @halfwind22! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first. Otherwise I will get back to you soon. Thanks.

halfwind22
New Contributor III

@Kaniz Fatma Thanks, eagerly awaiting your response.

Hi @Aravind NK, thanks for your trust in us 😀.

-werners-
Esteemed Contributor III

Remove the write from the foreach. Instead, build a DataFrame, return that DataFrame, and write it only once.

Right now you do a write in each iteration.
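
One way this could look, as a sketch rather than a definitive implementation: each executor only calls the API and returns a tuple, and Spark writes the combined result once. call_api, collect_and_write, the get argument and the output path passed in are hypothetical; the column names follow the question. Note that Spark writes a directory of part files at the given path rather than one named CSV per row.

```python
def call_api(row, url_template, token, get=None):
    """Return one (param1, param2, metric1, metric2) tuple for an input row."""
    if get is None:
        # Imported lazily so the function can be exercised with a fake `get`.
        import requests
        get = requests.get
    url = url_template.format(token=token, oid=row["param1"], wid=row["param2"])
    response = get(url)
    if response.status_code != 200:
        # Keep the row so failed calls stay visible in the output.
        return (row["param1"], row["param2"], None, None)
    metrics = response.json()["data"]["metrics"]
    return (row["param1"], row["param2"], str(metrics["metric1"]), str(metrics["metric2"]))


def collect_and_write(param_df, url_template, token, out_path):
    """Call the API once per row on the executors, then write the result once."""
    # An explicit schema avoids type inference problems when metrics are None.
    schema = "param1 string, param2 string, metric1 string, metric2 string"
    result_df = param_df.rdd.map(lambda row: call_api(row, url_template, token)).toDF(schema)
    result_df.write.mode("overwrite").csv(out_path, header=True)
    return result_df


# Assumed usage with the names from the question (the folder name is made up):
# collect_and_write(paramDf, DataUrl, TOKEN, "dbfs:/mnt/raw/Important/MetricData/all_metrics")
```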

halfwind22
New Contributor III

@Werner Stinckens But foreach doesn't return anything, right?

-werners-
Esteemed Contributor III

Indeed, foreach itself does not return anything.

To solve your issue in a distributed manner, we need to think in a slightly different way.

Looping is probably not the best way.

What you are trying to do is start from a DataFrame, call a REST API for each record, and combine the DataFrame with the result of the REST call.

There are several ways to do this, but the map() function or a UDF seems appropriate:

https://stackoverflow.com/questions/64191614/how-to-use-map-to-make-rest-api-calls-in-pyspark

https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-pytho...

https://stackoverflow.com/questions/67204599/parallel-rest-api-request-using-sparkdatabricks

I hope that will put you in the right direction.

If you want to use a loop after all, try calling collect() first and looping over the result, so all the data is sent to the driver. But that defeats the purpose of having multiple nodes.

halfwind22
New Contributor III

@Werner Stinckens I don't want to use a loop at all. I tried the same thing using a UDF too, but again the write to blob never happened inside the UDF.

I will try out the strategy that you have suggested, and I am also looking at Python multithreading.
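
On the multithreading idea, a hedged sketch: threads suit this kind of I/O-bound work, but they run inside a single Python process, so this would be a driver-side approach over rows obtained with paramDf.collect(), not a distributed one. fetch_all and its arguments are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_all(rows, fetch_one, max_workers=8):
    """Run fetch_one(row) for every row on a thread pool, keeping input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the order of `rows`, not completion order.
        return list(pool.map(fetch_one, rows))


# Assumed usage, where call_endpoint is whatever function performs one GET:
# results = fetch_all(paramDf.collect(), call_endpoint)
```

An exception raised inside fetch_one is re-raised when the results are consumed, so a failing call is not silently lost.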

halfwind22
New Contributor III

@Werner Stinckens But I still have a question. Why is the write operation that I am attempting not working in the first place? Is it something to do with how things are designed?
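
One way to find out, sketched under assumptions: foreach discards whatever the function returns, so the "Success" and "Failure" strings in the original function are never seen, and the broad except hides the real error. The hypothetical helper write_row_csv below raises instead, so a failing write fails the Spark task and the exception appears in the job output. It uses the standard csv module to stay self-contained; the same idea applies to a pandas to_csv call.

```python
import csv
import os


def write_row_csv(record, path):
    """Write one record as a CSV file; raise on any problem instead of returning a status."""
    directory = os.path.dirname(os.path.abspath(path))
    if not os.path.isdir(directory):
        # The absolute path in the message shows where this node resolved the target.
        raise FileNotFoundError("no such directory on this node: " + directory)
    with open(path, "w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=sorted(record))
        writer.writeheader()
        writer.writerow(record)
    return path


# Assumed usage with the names from the question ("probe_" is a made-up file prefix):
# paramDf.foreach(lambda w: write_row_csv(
#     {"param1": w.param1, "param2": w.param2},
#     "/dbfs/mnt/raw/Important/MetricData/probe_{}_{}.csv".format(w.param1, w.param2)))
```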

Hubert-Dudek
Esteemed Contributor III

Use a Spark DataFrame instead.

It is also safer to use a DBFS path: "dbfs:/mnt/raw/Important/MetricData...."
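
On Databricks the same storage location has two spellings: Spark APIs take the dbfs:/ URI, while local file APIs such as open() or pandas take the /dbfs/ mount path. A small sketch with two hypothetical helpers that convert between them, which also avoids relative paths like ../../dbfs/... whose meaning depends on the current working directory:

```python
def to_local_path(dbfs_uri):
    """'dbfs:/mnt/x' -> '/dbfs/mnt/x', for local file APIs such as open() or pandas."""
    prefix = "dbfs:/"
    if not dbfs_uri.startswith(prefix):
        raise ValueError("expected a path starting with 'dbfs:/': " + dbfs_uri)
    return "/dbfs/" + dbfs_uri[len(prefix):].lstrip("/")


def to_spark_path(local_path):
    """'/dbfs/mnt/x' -> 'dbfs:/mnt/x', for Spark readers and writers."""
    prefix = "/dbfs/"
    if not local_path.startswith(prefix):
        raise ValueError("expected an absolute path starting with '/dbfs/': " + local_path)
    return "dbfs:/" + local_path[len(prefix):]


# Example with the folder from the question:
# to_local_path("dbfs:/mnt/raw/Important/MetricData")  gives "/dbfs/mnt/raw/Important/MetricData"
```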

halfwind22
New Contributor III

@Hubert Dudek I can't issue a Spark command on an executor node; it throws an error, because foreach distributes the processing.
