07-02-2022 04:27 AM
I wrote the following code:
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# This part keeps running and never finishes
data_pd = data.toPandas()
df_pct = data_pd.pct_change(1)
The code gets stuck at the .toPandas() step.
07-18-2022 11:39 PM
I just discovered a solution.
Today, when I opened Azure Databricks and imported the Python libraries, Databricks told me that toPandas() was deprecated and suggested using toPandas.
The following solution works: use toPandas instead of toPandas()
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas
Yet, when I tried to calculate the percentage change using pct_change(), it didn't work; pct_change() hasn't been put into pyspark.pandas.
# This failed because the pct_change() function has not been put into pyspark.pandas
df_pct = data_pd.pct_change(1)
Another solution is to use pandas_api() to convert the Spark DataFrame to a pandas-on-Spark DataFrame.
This allows me to use pct_change() after converting the Spark DataFrame to a pandas-on-Spark DataFrame.
data_pd = data.pandas_api()
data_pd.pct_change()
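For reference, here is a minimal sketch of that flow in one place (the query is shortened to two of the columns from this thread, and the to_spark() call at the end is only there so the result can be displayed; treat it as an illustration rather than the exact notebook):
data = spark.sql("SELECT A_adjClose, AA_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta")
psdf = data.pandas_api()       # pandas-on-Spark DataFrame, stays distributed across the workers
df_pct = psdf.pct_change(1)    # percentage change between consecutive rows
display(df_pct.to_spark())     # convert back to a Spark DataFrame for display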
07-02-2022 04:28 AM
Btw, this is Azure Databricks
07-03-2022 11:19 AM
Try replacing .toPandas() with .to_pandas_on_spark(). This way, you ensure that the DataFrame stays distributed and is processed across the workers.
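A short sketch of that suggestion applied to the DataFrame from the question (on newer runtimes the same conversion is exposed as pandas_api(), which is what the deprecation warning further down in the thread points to):
psdf = data.to_pandas_on_spark()   # keeps the data distributed instead of collecting it to the driver
print(type(psdf))                  # pyspark.pandas.frame.DataFrame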
07-04-2022 01:59 PM
I tried replacing .toPandas() with .to_pandas_on_spark(), but I got one warning message and one error message:
/databricks/spark/python/pyspark/sql/dataframe.py:3407: FutureWarning: DataFrame.to_pandas_on_spark is deprecated. Use DataFrame.pandas_api instead.
warnings.warn(
SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 24) (10.139.64.4 executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176959 ms
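For what it is worth, one generic mitigation for this particular heartbeat timeout (a standard Spark setting, not something suggested in this thread) is to raise the executor heartbeat and network timeouts in the cluster's Spark config, in the same place as the settings quoted later in the thread; the values below are only illustrative, and spark.network.timeout must stay larger than the heartbeat interval:
spark.executor.heartbeatInterval 60s
spark.network.timeout 600s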
07-07-2022 06:39 AM
Hi @Cheuk Hin Christophe Poon, can you please try increasing the executors and see if that helps?
07-08-2022 02:40 AM
@Prasad Wagh @Kaniz Fatma I use Standard_DS3_v2 (14 GB memory, 4 cores) in Azure Databricks. Originally, I set Min Workers: 1, Max Workers: 8. Now, Min Workers: 1, Max Workers: 8.
But this still doesn't work. One worker should be able to finish the task, since the amount of data involved is small.
I guess the size of the VM cluster is not the cause.
07-08-2022 09:49 AM
As Spark uses lazy evaluation, I bet it is not toPandas that causes the issue but pct_change, as stated in the quote from the documentation below. Also, it is better for the pandas dataset to have a unique index before running pct_change (if you have no time field, you can set an incrementing id, as in the sketch after the quote):
df = df.set_index('month')
"the current implementation of this API uses Sparkโs Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset."
07-08-2022 08:51 PM
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas()
#df_pct = data_pd.pct_change(1)
#display(df_pct)
@Hubert Dudek I don't think the problem is the data type.
In my original code there was a date/time column, but while debugging I found that removing the date/time column doesn't solve the problem.
Now the data types of the columns are just float.
I also removed pct_change, and the problem still exists.
07-09-2022 10:07 AM
Spark Config:
spark.databricks.delta.autoCompact.enabled true
spark.databricks.delta.optimizeWrite.enabled true
07-08-2022 09:09 PM
@Prasad Wagh @Kaniz Fatma Is it possible to submit a full detailed log report to Databricks?