07-02-2022 04:27 AM
I wrote the following code:
data = spark.sql (" SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta ,deltabase.aapl_30min_delta ")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
#This part keeps running command
data_pd = data.toPandas()
df_pct = data_pd.pct_change(1)
The code stucks in .toPandas() this part.
07-18-2022 11:39 PM
I just discovered a solution.
Today, I opened Azure Databricks. When I imported python libraries. Databricks told me that toPandas() was deprecated and it suggested me to use toPandas.
The following solution works: Use toPandas instead of toPandas()
data = spark.sql (" SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta ,deltabase.aapl_30min_delta ")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas
Yet, when I tried to calculate percentage change using pct_change(), it didn't work. pct_change() hasn't been put into pyspark.pandas
#This failed because pct_change() function has not been put into pyspark.pandas
df_pct = data_pd.pct_change(1)
Another solution is to use: pandas_api() to convert the spark dataframe to pandas-spark dataframe.
This allows me to use pct_change() after converting spark dataframe to pandas-spark dataframe
data_pd = data.pandas_api()
data_pd.pct_change()
07-02-2022 04:28 AM
Btw, this is Azure Databricks
07-03-2022 11:19 AM
Try to replace .to_pandas() with .to_pandas_on_spark(). This way, you will ensure that the dataframe is processed distributed among the workers.
07-04-2022 01:59 PM
I tried to replace .to_pandas() with .to_pandas_on_spark(), but there were 1 warning message and 1 error message:
/databricks/spark/python/pyspark/sql/dataframe.py:3407: FutureWarning: DataFrame.to_pandas_on_spark is deprecated. Use DataFrame.pandas_api instead.
warnings.warn(
SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 24) (10.139.64.4 executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176959 ms
07-07-2022 06:39 AM
Hi @Cheuk Hin Christophe Poon can you please try increasing executors if it helps
07-08-2022 02:40 AM
@Prasad Wagh @Kaniz Fatma I use Standard_DS3_V2 (14GB Memory 4 Cores) in Azure Databricks. Originally, I set Min Worker: 1 Max Workers: 8. Now, Min Worker: 1 Max Worker: 8.
But this still doesn't work. 1 worker should be able to finish the task. The data size of the task is small.
I guess the size of the VM cluster is not the cause.
07-08-2022 09:49 AM
As Spark is using lazy evaluation, I bet that is not to_pands which case the issues but pct_change as stated in the quote from the documentation below. Also is better that panda datset has the unique index to run pct_change (if you have no time field, you can set increment id):
df.set_index('month')
"the current implementation of this API uses Spark’s Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset."
07-08-2022 08:51 PM
data = spark.sql (" SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta ,deltabase.aapl_30min_delta ")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas()
#df_pct = data_pd.pct_change(1)
#display(df_pct)
@Hubert Dudek I don't think the problem is the data type.
In my original code, there was a date/time, but I am debugging now. And I realized removing the date/time column doesn't solve the problem.
Now, the data types of the data are just float.
Also, I removed pct_change. The problem still exists.
07-09-2022 05:14 AM
07-09-2022 10:04 AM
07-09-2022 10:07 AM
Spark Config:
spark.databricks.delta.autoCompact.enabled true
spark.databricks.delta.optimizeWrite.enabled true
07-08-2022 09:09 PM
@Prasad Wagh @Kaniz Fatma Is it possible to submit a full detailed log report to Databricks?
07-18-2022 11:39 PM
I just discovered a solution.
Today, I opened Azure Databricks. When I imported python libraries. Databricks told me that toPandas() was deprecated and it suggested me to use toPandas.
The following solution works: Use toPandas instead of toPandas()
data = spark.sql (" SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta ,deltabase.aapl_30min_delta ")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas
Yet, when I tried to calculate percentage change using pct_change(), it didn't work. pct_change() hasn't been put into pyspark.pandas
#This failed because pct_change() function has not been put into pyspark.pandas
df_pct = data_pd.pct_change(1)
Another solution is to use: pandas_api() to convert the spark dataframe to pandas-spark dataframe.
This allows me to use pct_change() after converting spark dataframe to pandas-spark dataframe
data_pd = data.pandas_api()
data_pd.pct_change()
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group