03-31-2022 07:39 AM
Hi, I'm doing something simple in a Databricks notebook:
spark.sparkContext.setCheckpointDir("/tmp/")
import pyspark.pandas as ps
sql = ("""select
field1, field2
from table
where date >= '2021-01-01'""")
df = ps.sql(sql)
df.spark.checkpoint()
That runs great and saves the RDD under /tmp/. Then I want to save the df with
df.to_csv('/FileStore/tables/test.csv', index=False)
or
df.spark.coalesce(1).to_csv('/FileStore/tables/test.csv', index=False)
And it recalculates the query (it runs once for the checkpoint and then again to save the file).
What am I doing wrong? Currently, to work around this, I'm saving the first dataframe without a checkpoint, opening it again, and saving it with coalesce, as sketched below.
If I use coalesce(1) directly, the computation doesn't parallelize.
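Roughly, the workaround looks like this (a sketch; the intermediate path is a placeholder):
# write the computed result once, in parallel, as many part files
df.to_csv('/FileStore/tables/test_parts', index=False)
# re-open the materialized data and write a single coalesced CSV
df2 = ps.read_csv('/FileStore/tables/test_parts')
df2.spark.coalesce(1).to_csv('/FileStore/tables/test.csv', index=False)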
EDIT:
Tried
df.spark.cache()
But it still reprocesses when I try to save to CSV. I'm looking to avoid both the reprocessing and the double save; my attempt is sketched below. Thanks!
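Roughly what I ran (a sketch):
df = ps.sql(sql)
df.spark.cache()  # note: spark.cache() returns a CachedDataFrame handle
df.to_csv('/FileStore/tables/test.csv', index=False)  # the query still re-runs here, as described above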
The question is: why does it recalculate df after the checkpoint?
Thanks!
03-31-2022 08:38 AM
Please use localCheckpoint(True) so the data is stored on the executors and the checkpoint is triggered immediately.
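On a plain Spark DataFrame, that looks roughly like this (a sketch):
sdf = spark.sql(sql)
# localCheckpoint(eager=True) materializes the data immediately and keeps it
# on the executors, so no checkpoint directory is needed; it returns the
# checkpointed DataFrame, so keep the result
sdf = sdf.localCheckpoint(True)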
03-31-2022 08:44 AM
@Hubert Dudek, no luck with that. How do you use it on a ps dataframe?
Why do you think it doesn't work when saving to DBFS?
Thanks!
04-04-2022 03:53 AM
04-04-2022 08:33 AM
Hi, I'm repartitioning to 1 because it's easier and faster to move 1 file later instead of 10k files.
What I'm looking for is the ability to do this or something similar:
df.spark.checkpoint()
and later call df.head() or to_csv without recomputing, paying only the time it takes to merge all the calculated partitions.
I thought eager defaulted to true; I'll check on that. But from what I can see, it created the RDD file on disk and isn't using it: it recomputes the query.
Thanks!
04-21-2022 07:25 AM
Hi, back again. Any idea what approach I should take if I want to do something like:
df.head()
df.info()
df.to_csv(...)
and have the computation run only once instead of three times.
Thanks!!!
05-03-2022 05:20 AM
Sorry for the bump; I still haven't found the proper way to do this.
Thanks!
05-03-2022 05:31 AM
If you need checkpointing, please try the code below. Thanks to persist, you will avoid the reprocessing:
df = ps.sql(sql).spark.persist()
df.spark.checkpoint()
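With df assigned like that, the first action computes the result once and caches it; later actions should reuse the persisted data instead of re-running the SQL, e.g. (a sketch):
df.head()  # computes once and populates the cache
df.to_csv('/FileStore/tables/test.csv', index=False)  # reuses the persisted data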
4 weeks ago
checkpoint() returns a checkpointed DataFrame, so you need to assign it to a new variable:
checkpointedDF = df.checkpoint()
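For a pandas-on-Spark DataFrame, the same idea works through the spark accessor. A minimal end-to-end sketch, reusing the sql string and paths from the original post:
import pyspark.pandas as ps

spark.sparkContext.setCheckpointDir("/tmp/")
df = ps.sql(sql)
# spark.checkpoint() returns a new, checkpointed DataFrame; assign it back
# so later actions read from the checkpoint instead of recomputing
df = df.spark.checkpoint()
df.head()  # served from the checkpointed data, no recompute
df.spark.coalesce(1).to_csv('/FileStore/tables/test.csv', index=False)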