07-02-2022 04:27 AM
I wrote the following code:
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# This part keeps running and never finishes
data_pd = data.toPandas()
df_pct = data_pd.pct_change(1)
The code gets stuck at the .toPandas() step.
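A quick way to narrow this down (a sketch, reusing the same data DataFrame from the code above) is to force the query to execute without collecting anything to the driver; if the count alone also runs forever, the SELECT itself rather than toPandas() is the slow part:
# Force Spark to execute the query without pulling rows to the driver
row_count = data.count()
print(row_count)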
Accepted Solutions
07-18-2022 11:39 PM
I just discovered a solution.
Today I opened Azure Databricks and, while importing the Python libraries, Databricks told me that toPandas() was deprecated and suggested using toPandas instead.
The following solution works: use toPandas instead of toPandas().
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas
However, when I tried to calculate the percentage change using pct_change(), it didn't work; pct_change() hasn't been added to pyspark.pandas.
# This failed because the pct_change() function has not been added to pyspark.pandas
df_pct = data_pd.pct_change(1)
Another solution is to use pandas_api() to convert the Spark DataFrame to a pandas-on-Spark DataFrame.
This allows me to use pct_change() after the conversion:
data_pd = data.pandas_api()
data_pd.pct_change()
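For reference, a slightly fuller sketch of that path on the same data DataFrame (to_pandas() on a pandas-on-Spark DataFrame collects the result to the driver, so it is only for small outputs):
# Convert to a pandas-on-Spark DataFrame; the data stays distributed on the cluster
data_ps = data.pandas_api()
# Percentage change against the previous row
df_pct = data_ps.pct_change(periods=1)
# Optionally bring a small result back as a plain pandas DataFrame on the driver
df_pct_local = df_pct.to_pandas()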
07-02-2022 04:28 AM
By the way, this is Azure Databricks.
07-03-2022 11:19 AM
Try replacing .toPandas() with .to_pandas_on_spark(). That way, the DataFrame stays distributed and is processed across the workers instead of being collected to the driver.
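A minimal sketch of that change, assuming the same data DataFrame from the question (as noted later in this thread, newer runtimes report to_pandas_on_spark() as deprecated in favour of pandas_api()):
# Keep the DataFrame distributed on the workers instead of collecting it to the driver
data_ps = data.to_pandas_on_spark()  # on newer runtimes: data.pandas_api()
data_ps.head()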
07-04-2022 01:59 PM
I tried replacing .toPandas() with .to_pandas_on_spark(), but I got one warning message and one error message:
/databricks/spark/python/pyspark/sql/dataframe.py:3407: FutureWarning: DataFrame.to_pandas_on_spark is deprecated. Use DataFrame.pandas_api instead.
warnings.warn(
SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 24) (10.139.64.4 executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176959 ms
07-07-2022 06:39 AM
Hi @Cheuk Hin Christophe Poon, can you please try increasing the number of executors to see if that helps?
07-08-2022 02:40 AM
@Prasad Wagh @Kaniz Fatma I use Standard_DS3_v2 (14 GB memory, 4 cores) in Azure Databricks. Originally I set Min Workers: 1, Max Workers: 8, and it is still Min Workers: 1, Max Workers: 8 now.
But this still doesn't work. One worker should be able to finish the task, since the data involved is small.
I don't think the size of the VM cluster is the cause.
07-08-2022 09:49 AM
Since Spark uses lazy evaluation, I bet it is not toPandas that causes the issue but pct_change, as stated in the quote from the documentation below. It is also better for the pandas dataset to have a unique index when running pct_change (if you have no time field, you can set an incrementing id):
df = df.set_index('month')
"the current implementation of this API uses Spark’s Window without specifying partition specification. This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset."
07-08-2022 08:51 PM
data = spark.sql("SELECT A_adjClose, AA_adjClose, AAL_adjClose, AAP_adjClose, AAPL_adjClose FROM deltabase.a_30min_delta, deltabase.aa_30min_delta, deltabase.aal_30min_delta, deltabase.aap_30min_delta, deltabase.aapl_30min_delta")
display(data)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
data_pd = data.toPandas()
#df_pct = data_pd.pct_change(1)
#display(df_pct)
@Hubert Dudek I don't think the problem is the data type.
In my original code there was a date/time column, but I am debugging now, and removing the date/time column doesn't solve the problem.
Now the data types of the columns are all float.
I also removed pct_change, and the problem still exists.
07-09-2022 05:14 AM
- How many rows does the dataset have? (A quick way to check is sketched after this list.)
- Can you share an Excel file from the display function with a sample?
- What are the cluster specifics (worker type and runtime version)? Is it standard, high-concurrency, or single node?
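For the first question, a quick way to check on your side (a sketch, reusing the data DataFrame and one of the source tables from the question):
# Row count of the query result
print(data.count())
# Size and file count of one source Delta table (sizeInBytes, numFiles, ...)
display(spark.sql("DESCRIBE DETAIL deltabase.a_30min_delta"))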
07-09-2022 10:04 AM
- Each Delta table has more than 100,000 rows, but each Delta table is only about 3.18 MB.
- I uploaded a photo with a sample, but it was not produced with the Python display() function.
07-09-2022 10:07 AM
- Cluster mode: Standard. Runtime version: 11.0 (includes Apache Spark 3.3.0, Scala 2.12). Worker type: Standard_DS3_v2.
Spark Config:
spark.databricks.delta.autoCompact.enabled true
spark.databricks.delta.optimizeWrite.enabled true
07-08-2022 09:09 PM
@Prasad Wagh @Kaniz Fatma Is it possible to submit a full detailed log report to Databricks?

