02-22-2022 06:55 AM
Question about spark checkpoints and offsets in a running stream
When the stream started we needed a lot of partitions, so we set spark.sql.shuffle.partitions to 5000 via spark.conf.
As expected, the offsets in the checkpoint contain this value and the job used it.
We then stopped the job and changed the number of partitions to 400, again via spark.conf.
I expected the next batch to still use the previous value (because it's recorded in the offset), but the new offset calculated when the current batch ends to use the new value. Instead, I see the 5000 value still appearing in newly created offsets.
Meanwhile, some tasks in the job now use the new 400 value while other tasks still use 5000, which is basically killing us now.
I'm fairly sure this worked as expected on a previous version of Spark, but with this job (Spark 3.2 on Databricks Runtime 10.2) it no longer does. Any idea what we're doing wrong? I'd be glad for help with this, or for any clue about how to move the job back to 400 partitions.
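Roughly, this is what we do between runs (a simplified sketch, not the actual job code; values are only illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# First run: set before the stream starts; the value ends up recorded in the
# checkpoint's offset metadata once the stream is running.
spark.conf.set("spark.sql.shuffle.partitions", "5000")

# ... stream runs for a while, then the job is stopped ...

# Second run: changed before restarting, but the restarted stream keeps
# writing 5000 into newly created offsets.
spark.conf.set("spark.sql.shuffle.partitions", "400")
```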
03-01-2022 05:32 PM
Hi @Alon Nisser,
If you change the shuffle partitions configuration, the original value is persisted in the checkpoint, so the stream itself will continue to use the old value for stateful aggregations. If you want to use a new value, you will need to use a new checkpoint.
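A minimal sketch of what that looks like in practice (the source, query, and paths below are placeholders, not your actual job): set the new value before starting the query, and point the query at a fresh checkpoint directory.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# New shuffle-partition value, set before the query starts.
spark.conf.set("spark.sql.shuffle.partitions", "400")

# Placeholder rate source standing in for the real input stream.
events = spark.readStream.format("rate").option("rowsPerSecond", 100).load()

# Stateful aggregation, so the shuffle-partition count matters for the state store.
counts = events.groupBy((col("value") % 10).alias("bucket")).count()

query = (
    counts.writeStream
          .outputMode("complete")
          .format("delta")
          # Fresh checkpoint directory: this is what lets the new value take
          # effect, since the old checkpoint has 5000 pinned in its offsets.
          .option("checkpointLocation", "/tmp/checkpoints/counts_v2")
          .start("/tmp/tables/counts")
)
```

The trade-off, as noted further down in this thread, is that a new checkpoint means re-reading from the source according to the source's starting position.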
02-22-2022 10:19 AM
Hello, @Alon Nisser - My name is Piper, and I'm a moderator for Databricks. Thank you for coming to us with this question. We will give the members a chance to respond before we come back to this if we need to.
Thanks in advance for your patience.
a month ago
Hello @jose_gonzalez @alonisser, we are facing a similar issue in our pipelines: a wide transformation using groupBy is running with the default 200 partitions. We want to change it to 20 or 40 partitions, made that change in the asset bundle, and deployed the update to the pipeline, but it is not taking effect.
Can you clarify how to change the checkpoint location so this change can take effect?
03-01-2022 11:12 PM
This is strange behavior: when a new checkpoint is calculated (at the end of a batch), why wouldn't the stream use the new spark.conf shuffle partitions value? It's for a new microbatch.
Just removing the checkpoints is a poor solution for a stream that has been running for a long time and where a full backfill doesn't make sense.
I've found that I can edit the checkpoint and change the number, and it works, but it's an ugly workaround hack.
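For anyone curious, the value sits in the metadata line of the files under the checkpoint's offsets/ directory; roughly something like the following (the path and batch number here are just an example, not from my job):

```python
import json
from pathlib import Path

# Illustrative path; the real one is whatever checkpointLocation the stream uses.
offset_file = Path("/tmp/checkpoints/my_stream/offsets/42")

lines = offset_file.read_text().splitlines()
# lines[0] is a version marker ("v1"); lines[1] is the OffsetSeqMetadata JSON,
# whose "conf" map is where spark.sql.shuffle.partitions is pinned, e.g.
# {"batchWatermarkMs":0,"batchTimestampMs":...,"conf":{"spark.sql.shuffle.partitions":"5000",...}}
metadata = json.loads(lines[1])
print(metadata["conf"]["spark.sql.shuffle.partitions"])
```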
03-07-2022 02:14 PM
Hi @Alon Nisser,
I understand your point. Modifying the checkpoint folder/files could produce other issues, so it is recommended to use a new checkpoint instead.
09-08-2023 12:52 AM
I agree with you 100%. How can you find a proper value when the workload is completely different between the first run and the following regular runs?
09-08-2022 04:33 AM
@Jose Gonzalez, thanks for that information! This is super useful. I was struggling to understand why my stream was still using 200 partitions. This is quite a pain for me, because changing the checkpoint means re-ingesting all the data from the source. Do you know where this can be reported so it can be fixed sometime in the future?