cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Unable to write csv files to Azure BLOB using pandas to_csv ()

halfwind22
New Contributor III

I am using a Py function to read some data from a GET endpoint and write them as a CSV file to a Azure BLOB location.

My GET endpoint takes 2 query parameters,param1 and param2. So initially, I have a dataframe paramDf that has two columns param1 and param2.

param1   param2
12        25
45        95
 
Schema:    paramDF:pyspark.sql.dataframe.DataFrame
           param1:string
           param2:string

Now I write a function as below:

def executeRestApi(w):
 
      dlist=[]
 
      try:
 
        response=requests.get(DataUrl.format(token=TOKEN, oid=w.param1,wid=w.param2))
 
        if(response.status_code==200):
 
          metrics=response.json()['data']['metrics']
 
          dic={}
 
          dic['metric1'] = metrics['metric1']
 
          dic['metric2'] = metrics['metric2']
 
          dlist.append(dic)
 
       pandas.DataFrame(dlist).to_csv("../../dbfs/mnt/raw/Important/MetricData/listofmetrics_{}_{}.csv".format(param1,param2),header=True,index=False)
 
    return "Success"
 
          
 
   except Exception as e:
 
        return "Failure"
 

Finally, invoke the method as:

paramDf.foreach(executeRestApi)

So ,theoretically ,the function executeRestApi must be executed foe each row in the dataframe, and, within the function ,I extract the required data and write it to a ADLS location as a csv file.

All works good ,except that the file is never written when I execute the foreach command on a multi node cluster.

However the same operation works well on a single node cluster. I am unable to figure out the difference between the two approaches.

what could i be doing wrong here?

1 ACCEPTED SOLUTION

Accepted Solutions

-werners-
Esteemed Contributor III

Ok let's see.

You have a spark dataframe, which is distributed by nature.

On the other hand you use pandas, which is not distributed.

Also your function is plain python, not pyspark.

This will be processed by the driver, so not distributed.

However, the dataframe itself will be processed by the workers.

So the workers want to do something but the code runs on the driver. It could be that. Not sure though, but the fact it works as single node makes me think your code is not executed in a distributed environment.

That is why I mentioned the collect(). This will collect the spark dataframe on the driver so the workers are bypassed in your case.

Moving from python to pyspark takes some time to understand.

This blog explains some interesting topics:

https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a

Using koalas instead of pandas f.e.

View solution in original post

9 REPLIES 9

halfwind22
New Contributor III

@Kaniz Fatmaโ€‹ Thanks, eagerly awaiting your response.

-werners-
Esteemed Contributor III

Remove the write from the foreach. Instead build a dataframe. return that dataframe and write that only once.

Now you do a write in each iteration.

halfwind22
New Contributor III

@Werner Stinckensโ€‹ But foreach doesn't return anything right?

-werners-
Esteemed Contributor III

foreach itself does not return anything indeed.

Now to solve your issue in a distributed manner, we need to think in a slightly different way.

Looping probably is not the best way.

What you try to do is starting from a DF call a rest api for each record and combine the DF and the result of the rest call.

There are several ways to do this, but the map() function or a udf seem appropriate:

https://stackoverflow.com/questions/64191614/how-to-use-map-to-make-rest-api-calls-in-pyspark

https://medium.com/geekculture/how-to-execute-a-rest-api-call-on-apache-spark-the-right-way-in-pytho...

https://stackoverflow.com/questions/67204599/parallel-rest-api-request-using-sparkdatabricks

I hope that will put you in the right direction.

If you want to use a loop after all, try to add a collect() before the foreach so all data gets sent to the driver. But that beats the purpose of having multiple nodes.

halfwind22
New Contributor III

@Werner Stinckensโ€‹ I don't want to use a loop at all .I tried this same thing using UDF too, but again the write to blob part never happened inside the UDF.

I will try out the strategy that you have suggested, and am also looking at Python multithreading.

halfwind22
New Contributor III

@Werner Stinckensโ€‹  But I still have a question. Why is the write operation that I am attempting to ,not working in the first place? Is it something to do with how things are designed?

-werners-
Esteemed Contributor III

Ok let's see.

You have a spark dataframe, which is distributed by nature.

On the other hand you use pandas, which is not distributed.

Also your function is plain python, not pyspark.

This will be processed by the driver, so not distributed.

However, the dataframe itself will be processed by the workers.

So the workers want to do something but the code runs on the driver. It could be that. Not sure though, but the fact it works as single node makes me think your code is not executed in a distributed environment.

That is why I mentioned the collect(). This will collect the spark dataframe on the driver so the workers are bypassed in your case.

Moving from python to pyspark takes some time to understand.

This blog explains some interesting topics:

https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a

Using koalas instead of pandas f.e.

Hubert-Dudek
Esteemed Contributor III

use Spark DataFrame instead,

safer is to use dbfs path "dbfs:/mnt/raw/Important/MetricData...."

halfwind22
New Contributor III

@Hubert Dudekโ€‹ I cant issue a spark command to executor node, throws up an error ,because foreach distributes the processing.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group