Changing shuffle.partitions with spark.conf in a Spark stream isn't respected, even after a checkpoint

alonisser
Contributor

Question about Spark checkpoints and offsets in a running stream

When the stream started I needed tons of partitions, so we set shuffle.partitions to 5000 with spark.conf.

As expected, the offsets in the checkpoint contain this value and the job used it.

Then we stopped the job and changed the number of partitions to 400, again with spark.conf.

I expected the next batch to still use the previous value (because it's in the offset), and the new value to be picked up when the new offset is calculated (when the current batch ends). Instead, I still see the 5000 value in newly created offsets.

While some tasks in the job now use the new value of 400, other tasks still use 5000, which is basically killing us now.

I'm quite sure this worked as expected with a previous version of Spark (this job runs Spark 3.2 on Databricks Runtime 10.2), but with this job it no longer does. Any idea what we're doing wrong? I'd be glad for help with this, or for any clue as to how I can move the job back to 400 partitions.
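
For anyone who wants to confirm which value a checkpoint has recorded, here is a minimal read-only sketch in Python. It assumes the offsets file layout commonly seen in Structured Streaming checkpoints: a version line (`v1`), then one JSON metadata line with a `conf` map, then one line per source. The helper name `read_checkpoint_conf` and the sample contents are hypothetical and only for illustration; check a real file under your checkpoint's `offsets` directory before relying on this.

```python
import json


def read_checkpoint_conf(offset_file_text):
    """Return the conf map recorded in the text of one offsets file.

    Assumed layout (not confirmed in this thread): line 0 is a version
    marker such as "v1", line 1 is a JSON metadata object with a "conf" key.
    """
    lines = offset_file_text.splitlines()
    metadata = json.loads(lines[1])
    return metadata.get("conf", {})


# Illustrative, made-up contents of a single offsets file.
sample = """v1
{"batchWatermarkMs":0,"batchTimestampMs":1650000000000,"conf":{"spark.sql.shuffle.partitions":"5000"}}
{"example-topic":{"0":42}}"""

conf = read_checkpoint_conf(sample)
print(conf["spark.sql.shuffle.partitions"])  # prints 5000
```

This only reads the value. It does not change what the stream uses on restart.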

1 ACCEPTED SOLUTION

Accepted Solutions

jose_gonzalez
Moderator

Hi @Alon Nisser,

If you change the shuffle partitions configuration, the stream will keep using the old value for stateful aggregations, because the conf is persisted in the checkpoint. If you want to use a new value, you will need to use a new checkpoint.
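
As a rough sketch of what "use a new checkpoint" could look like in PySpark: set the conf before starting the query, and point `checkpointLocation` at a fresh directory. The function name, the paths, and the `delta` sink format are assumptions for illustration and not details from the original job. `spark` is an existing `SparkSession` and `stream_df` is a streaming DataFrame.

```python
def restart_with_new_checkpoint(spark, stream_df, new_checkpoint_dir,
                                output_path, shuffle_partitions=400):
    """Start the streaming query against a fresh checkpoint directory.

    Hypothetical helper: names, paths, and the sink format are assumptions.
    """
    # Set the conf before the query starts, so the first batch written to
    # the new checkpoint records this value.
    spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions))

    return (
        stream_df.writeStream
        .format("delta")  # assumed sink format; use whatever the job writes to
        .option("checkpointLocation", new_checkpoint_dir)
        .start(output_path)
    )


# Example call (placeholder paths):
# query = restart_with_new_checkpoint(
#     spark, stream_df, "/checkpoints/my_stream_v2", "/tables/my_output")
```

Keep in mind that a fresh checkpoint has none of the previously recorded offsets or state, so where the stream starts reading again depends on the source's own starting options.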


6 REPLIES

Anonymous
Not applicable

Hello, @Alon Nisser - My name is Piper, and I'm a moderator for Databricks. Thank you for coming to us with this question. We will give the members a chance to respond before we come back to this if we need to. 🙂

Thanks in advance for your patience.

alonisser
Contributor

This is strange behavior. When a new checkpoint is calculated (at the end of a batch), why wouldn't the stream use the new shuffle.partitions value from spark.conf? It's for a new microbatch.

Just removing the checkpoint is a poor solution for a stream that has been running for a long time and where a full backfill doesn't make sense.

I've found that I can edit the checkpoint and change the number, and it works, but it's an ugly hack of a workaround.

Hi @Alon Nisser,

I understand your point. Modifying the checkpoint folder or files could produce other issues, so it is recommended to use a new checkpoint instead.

Thor
New Contributor III

I agree with you 100%. How can you find a proper value when the workload is completely different between the first run and the following regular runs?

Leszek
Contributor

@Jose Gonzalez thanks for that information! This is super useful. I was struggling to understand why my stream was still using 200 partitions. This is quite a pain for me, because changing the checkpoint will insert all the data from the source again. Do you know where this can be reported so it can be fixed sometime in the future?
