Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Failed to convert Spark.sql to Pandas Dataframe using .toPandas()

Dicer
Valued Contributor

I wrote the following code:

data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# This call keeps running and never finishes
data_pd = data.toPandas()

df_pct = data_pd.pct_change(1)

The code gets stuck at the .toPandas() call.
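
One thing worth checking in the query above: the five tables are listed in FROM separated only by commas, with no join condition, which SQL treats as a cross join (every row of each table paired with every row of the others). Below is a minimal sketch of the effect, using Python's built-in sqlite3 as a stand-in for Spark SQL. The three 3-row tables and the ts key column are hypothetical and not taken from the post.

```python
import sqlite3

# Stand-in for the Delta tables: three tiny tables sharing a hypothetical
# timestamp key "ts". Table and column names are illustrative only.
conn = sqlite3.connect(":memory:")
for name in ("a", "aa", "aal"):
    conn.execute(f"CREATE TABLE {name} (ts INTEGER, adjClose REAL)")
    conn.executemany(
        f"INSERT INTO {name} VALUES (?, ?)",
        [(ts, 100.0 + ts) for ts in range(3)],
    )

# Comma-separated FROM with no join condition: a Cartesian product.
cross_rows = conn.execute(
    "SELECT a.adjClose, aa.adjClose, aal.adjClose FROM a, aa, aal"
).fetchall()

# Joining on the shared key keeps one row per timestamp.
joined_rows = conn.execute(
    "SELECT a.adjClose, aa.adjClose, aal.adjClose "
    "FROM a JOIN aa USING (ts) JOIN aal USING (ts)"
).fetchall()

print(len(cross_rows))   # 27 = 3 * 3 * 3
print(len(joined_rows))  # 3
```

If the real tables share a date/time column, joining on it in the same way should keep the result at roughly one row per timestamp. Whether that matches the intent of the original query is an assumption.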

1 ACCEPTED SOLUTION

Accepted Solutions

Dicer
Valued Contributor

I just discovered a solution.

Today, when I opened Azure Databricks and imported the Python libraries, Databricks showed a warning that I understood as saying toPandas() was deprecated and that I should use toPandas.

The following no longer hangs: use toPandas instead of toPandas()

data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")

display(data)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

data_pd = data.toPandas
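
A caveat on the snippet above: in Python, writing a method name without parentheses only references the method and does not call it. The sketch below uses a hypothetical stand-in class (FakeSparkFrame is not a real PySpark class) to show why the line returns immediately and why the result is not a DataFrame.

```python
class FakeSparkFrame:
    """Hypothetical stand-in for a Spark DataFrame, for illustration only."""

    def __init__(self):
        self.conversions = 0  # counts how often toPandas() really ran

    def toPandas(self):
        self.conversions += 1
        return {"A_adjClose": [1.0, 2.0]}  # placeholder for a pandas DataFrame


data = FakeSparkFrame()

ref = data.toPandas       # no parentheses: just a reference to the method
result = data.toPandas()  # parentheses: the method actually runs

print(callable(ref))               # True: ref is a bound method
print(hasattr(ref, "pct_change"))  # False: a method has no pct_change
print(data.conversions)            # 1: only the call with () did any work
```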

Yet when I tried to calculate the percentage change with pct_change(), it didn't work: data_pd is the toPandas method itself here, not a DataFrame, so it has no pct_change().

# This fails because data_pd is a method reference, not a DataFrame
df_pct = data_pd.pct_change(1)

Another solution is to use pandas_api() to convert the Spark DataFrame to a pandas-on-Spark DataFrame.

This lets me call pct_change() on the converted DataFrame:

data_pd = data.pandas_api()
data_pd.pct_change()
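
For reference, pct_change() with a period of 1 returns, for each row, the fractional change from the previous row, (x[t] - x[t-1]) / x[t-1], with the first row left empty. Here is a plain-Python sketch of that arithmetic on made-up prices. It illustrates the formula only, not how pandas or pandas-on-Spark implement it.

```python
def pct_change(values, periods=1):
    """Fractional change between each value and the one `periods` rows earlier."""
    out = []
    for i, current in enumerate(values):
        if i < periods:
            out.append(None)  # no earlier row to compare with
        else:
            previous = values[i - periods]
            out.append((current - previous) / previous)
    return out


prices = [100.0, 110.0, 99.0]  # made-up sample values
changes = pct_change(prices)
print(changes)
```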

 Source: https://stackoverflow.com/questions/73061556/does-pyspark-pandas-support-pandas-pct-change-function/...


13 REPLIES

Dicer
Valued Contributor

Btw, this is Azure Databricks

Hubert-Dudek
Esteemed Contributor III

Try replacing .toPandas() with .to_pandas_on_spark(). That way, the DataFrame stays distributed across the workers instead of being collected on the driver.

Dicer
Valued Contributor

I tried replacing .toPandas() with .to_pandas_on_spark(), but I got one warning and one error:

/databricks/spark/python/pyspark/sql/dataframe.py:3407: FutureWarning: DataFrame.to_pandas_on_spark is deprecated. Use DataFrame.pandas_api instead.

warnings.warn(

SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 24) (10.139.64.4 executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176959 ms

User16753725469
Contributor II

Hi @Cheuk Hin Christophe Poon, can you please try increasing the number of executors and see if it helps?

Kaniz_Fatma
Community Manager

Hi @Cheuk Hin Christophe Poon, we haven't heard from you since the last response from @Prasad Wagh, and I was checking back to see whether his suggestion helped. If you have found a solution, please share it with the community, as it can be helpful to others.

Also, Please don't forget to click on the "Select As Best" button whenever the information provided helps resolve your question.

Dicer
Valued Contributor

@Prasad Wagh @Kaniz Fatma I use Standard_DS3_V2 (14 GB memory, 4 cores) in Azure Databricks. Originally, I set Min Worker: 1 Max Workers: 8. Now, Min Worker: 1 Max Worker: 8.

But this still doesn't work. 1 worker should be able to finish the task. The data size of the task is small.

I guess the size of the VM cluster is not the cause.

Hubert-Dudek
Esteemed Contributor III

As Spark uses lazy evaluation, I bet it is not toPandas that causes the issue but pct_change, as stated in the quote from the documentation below. It is also better for the pandas dataset to have a unique index when running pct_change (if you have no time field, you can set an incrementing id):

df.set_index('month')

"the current implementation of this API uses Spark’s Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset."

Dicer
Valued Contributor
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")

display(data)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

data_pd = data.toPandas()

#df_pct = data_pd.pct_change(1)

#display(df_pct)

@Hubert Dudek​ I don't think the problem is the data type.

My original code had a date/time column, which I removed while debugging, but removing it doesn't solve the problem.

Now all the columns are floats.

I also removed pct_change, and the problem still exists.

Hubert-Dudek
Esteemed Contributor III
  • How many rows does the dataset have?
  • Can you share an Excel export from the display function with a sample?
  • What are the cluster specifics (worker type and runtime version)? Is it standard, high-concurrency, or single-node?

Dicer
Valued Contributor

  • Each Delta table has more than 100,000 rows, but each is only about 3.18 MB.

  • I uploaded a photo containing a sample, though it was not produced with the Python display() function.
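
Those row counts help explain the hang if the query is evaluated as written. The rough estimate below takes 100,000 rows per table as a round figure and assumes the five comma-separated tables form a cross join with no filter. The 8 bytes per value is also an assumption (64-bit floats).

```python
import math

rows_per_table = 100_000   # round figure from the post above
tables = 5                 # tables listed in the FROM clause
bytes_per_value = 8        # assumption: each selected column is a 64-bit float

result_rows = math.prod([rows_per_table] * tables)
result_bytes = result_rows * tables * bytes_per_value  # five columns selected

print(f"{result_rows:.1e} rows")    # 1.0e+25 rows
print(f"{result_bytes:.1e} bytes")  # 4.0e+26 bytes
```

A result of that magnitude cannot be collected to the driver regardless of cluster size, which would be consistent with the executor heartbeat timeout reported earlier in the thread.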

Dicer
Valued Contributor
  • Cluster Mode: Standard. Runtime version: 11.0 (includes Apache Spark 3.3.0, Scala 2.12). Worker Type: Standard_DS3_V2.

Spark Config:

spark.databricks.delta.autoCompact.enabled true
spark.databricks.delta.optimizeWrite.enabled true

Dicer
Valued Contributor

@Prasad Wagh​ @Kaniz Fatma​  Is it possible to submit a full detailed log report to Databricks?

