Pandas .spark.checkpoint() doesn't break lineage

alejandrofm
Valued Contributor

Hi, I'm doing something simple in a Databricks notebook:

spark.sparkContext.setCheckpointDir("/tmp/")

import pyspark.pandas as ps

sql = ("""select
field1, field2
from table
where date >= '2021-01-01'""")

df = ps.sql(sql)
df.spark.checkpoint()

That runs great and saves the RDD under /tmp/. Then I want to save the df with

df.to_csv('/FileStore/tables/test.csv', index=False)

or

df1.spark.coalesce(1).to_csv('/FileStore/tables/test.csv', index=False)

And it recalculates the query again (it ran it once for the checkpoint and then again to save the file).

What am I doing wrong? Currently, to work around this, I'm saving the first DataFrame without the checkpoint, then opening it again and saving with coalesce.

If I use coalesce(1) directly, it doesn't parallelize.

EDIT:

I tried

df.spark.cache()

But it still reprocesses when I try to save to CSV. I'm looking to avoid reprocessing and avoid saving twice. Thanks!

The question is: why does it recalculate df1 after the checkpoint?

Thanks!

1 ACCEPTED SOLUTION


Hubert-Dudek
Esteemed Contributor III

If you need checkpointing, please try the code below. Thanks to persist, you will avoid reprocessing:

df = ps.sql(sql).spark.persist()
df.spark.checkpoint()


7 REPLIES

Hubert-Dudek
Esteemed Contributor III

Please use localCheckpoint(True) so it is stored on the executors and triggered immediately.
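
On a pandas-on-Spark DataFrame that maps to the local_checkpoint method of the spark accessor. A minimal sketch, assuming the same sql string as in the question; note that local_checkpoint returns a new DataFrame, so it is reassigned here:

import pyspark.pandas as ps

df = ps.sql(sql)

# Eager local checkpoint: materializes the result on the executors
# and returns a DataFrame with a truncated lineage.
df = df.spark.local_checkpoint(eager=True)

df.head()  # served from the checkpointed data, no full recompute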

alejandrofm
Valued Contributor

@Hubert Dudek, no luck with that. How do you use it on a ps DataFrame?

Why do you think it doesn't work when saving to DBFS?

Thanks!

Hubert-Dudek
Esteemed Contributor III

  • the path in to_csv should be a directory, not a file, since one file = one partition
  • try checkpoint(eager=True)
  • use df.spark.explain() before and after checkpointing (see the sketch below)
  • checkpointing saves files to disk, which requires disk space and computation, and removes the RDD from memory; then, when you read it from disk again, that requires recomputation. I don't think it makes sense here. I used checkpoint only once, with a UDF that made REST API calls and needed to be executed at that exact place in the code. Databricks/Spark uses lazy evaluation and applies many optimizations to your code; sometimes you need a checkpoint precisely so it will not run in the optimized way.
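
A short sketch of those points, assuming the same sql string and checkpoint directory from the question (the output directory name is just an example):

spark.sparkContext.setCheckpointDir("/tmp/")

import pyspark.pandas as ps

df = ps.sql(sql)
df.spark.explain()                    # plan before: the full query

df = df.spark.checkpoint(eager=True)  # checkpoint returns a new DataFrame
df.spark.explain()                    # plan after: should start from the checkpointed data

# write to a directory (one part file per partition), not a single file
df.to_csv('/FileStore/tables/test_csv')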

alejandrofm
Valued Contributor

Hi, I'm repartitioning to 1 because it's easier and faster to later move one file instead of 10k files.

What I'm looking for is the possibility to use this or similar:

df.spark.checkpoint()

and later use df.head() or to_csv without recomputing, taking only the time it needs to merge all the calculated partitions.

I thought eager defaulted to true; I'll check on that. But from what I'm seeing, it creates the RDD file on disk but isn't using it, and it recomputes the query.

Thanks!

alejandrofm
Valued Contributor

Hi, back here. Any idea what approach I should take if I want to do something like:

df.head()
df.info()
df.to_csv()

and have the computation run only once, not three times?

Thanks!!!

alejandrofm
Valued Contributor

Sorry for the bump; I still haven't found the proper way to do this.

Thanks!

Hubert-Dudek
Esteemed Contributor III

If you need checkpointing, please try the code below. Thanks to persist, you will avoid reprocessing:

df = ps.sql(sql).spark.persist()
df.spark.checkpoint()
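
For example, a sketch of reusing the persisted DataFrame for the actions mentioned earlier in the thread (head/info/to_csv come from the question; the sql string and output path are just placeholders):

df = ps.sql(sql).spark.persist()
df.spark.checkpoint()

df.head()    # the first action runs the query once and fills the cache
df.info()    # reuses the persisted result
df.to_csv('/FileStore/tables/test_csv')

df.spark.unpersist()   # release the cached data when done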
