Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

checkpoint changes not working on my databricks job

vijsharm
New Contributor II

Hi,

I have a job that processes a Kafka stream with spark.readStream. Due to an issue, we changed the checkpoint path to a different location, which caused the job to re-pull all records. When I later switched back to the original checkpoint location, the job stopped picking up the new records coming through Kafka. Any clue what can be done to make it run again the way it was? Here is the code snippet.

curr_env = getenv()

config_path = "../configs/CDLZ_MEMBERS_Custom_Tables.yaml"
process_name = "CDLZ_MEMBERS_CUSTOM_Tables"
config = load_yaml_config(config_path)

source_kafka_topic = config[process_name]["KAFKA_CONFIG"]["input_kafka_topic"]
raw_table_name = config[process_name]["DELTA_TABLE_CONFIG"]["raw_delta_table"]
tables_to_read = config[process_name]["KAFKA_CONFIG"]["tables_to_read"]

raw_table_name = raw_table_name.replace("{env}", curr_env)

checkpoint = get_checkpoint_path(source_kafka_topic + "/" +  process_name)
 
 
kafka_stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
            username, password
        ),
    )
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("failOnDataLoss", "false")
    .option("subscribe", source_kafka_topic)
    .option("startingOffsets", "earliest")  # only honored on the first run, when the checkpoint is empty
    .load()
)
 
 
# df_stream is derived from kafka_stream (transformations omitted from this snippet)
query = (
    df_stream.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint)
    .option("mergeSchema", "true")
    .option("schemaCheck", "false")
    .toTable(raw_table_name)
)
query.awaitTermination()

 

5 REPLIES

nikhil_2212
New Contributor II

As I understand it, you want to ingest only the new records coming through Kafka while continuing to use the old checkpoint path.

I would suggest changing .option("startingOffsets", "earliest") to .option("startingOffsets", "latest"). The stream would then start reading from the newest offsets and avoid reprocessing your old data. Note, however, that startingOffsets is only honored when a query starts with a fresh checkpoint, so you would need to delete the old checkpoint path (or point at a new one) before restarting the stream.

P.S. Don't try the above directly on a production load.

So you are saying to first set .option("startingOffsets", "latest") so only the latest records are read, and then delete the old checkpoint path and create a new one? And do you recommend generating the checkpoint path like below so this doesn't happen again in the future?

 

import uuid
from datetime import datetime

# Generate a unique suffix using timestamp and UUID
unique_suffix = datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + str(uuid.uuid4())

# Combine to form a unique checkpoint path
checkpoint_path = f"{base_checkpoint_dir}/checkpoint_{unique_suffix}"

 

 

cgrant
Databricks Employee
Databricks Employee

When you swapped back to the old checkpoint, were any records flowing through, and were batches completing? It's possible that you've accumulated a big backlog with the old checkpoint, and/or that records in Kafka have expired. Also note that the "startingOffsets" option only affects brand-new checkpoints/streams.
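One way to check whether batches are actually completing is to look at the query's recent progress. A minimal sketch, assuming you have a handle to the running StreamingQuery; the summarizing helper is pure Python, so it also works on progress dicts you've logged elsewhere:

```python
import json

def summarize_progress(progress_entries):
    """Sum input rows and count completed batches from a list of
    StreamingQuery progress entries (as in query.recentProgress)."""
    total_rows = 0
    batches = 0
    for entry in progress_entries:
        if isinstance(entry, str):  # tolerate entries serialized as JSON strings
            entry = json.loads(entry)
        total_rows += int(entry.get("numInputRows", 0))
        batches += 1
    return {"batches": batches, "totalInputRows": total_rows}

# With a live query you would call, e.g.:
#   summarize_progress(query.recentProgress)
# If totalInputRows stays at 0 while Kafka has new messages, the stream is
# resuming from committed offsets that do not cover them.
```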

vijsharm
New Contributor II

Yes, I have seen a few records flow through, but the messages arriving after that are not coming in.

SteveOstrowski
Databricks Employee
Databricks Employee

Hi @vijsharm,

This is a common scenario when working with Structured Streaming checkpoints and Kafka. Here is what is happening and how to resolve it.

WHAT HAPPENED

When you switched to a new checkpoint path, the streaming query had no prior offset information, so startingOffsets = "earliest" kicked in and reprocessed everything from the beginning of the Kafka topic. That part is expected.

When you switched back to the original checkpoint path, the query resumed from the last committed offset stored in that original checkpoint. The key detail: startingOffsets is only used when a query starts for the very first time with no existing checkpoint. On all subsequent runs, the query always resumes from the offsets recorded in the checkpoint, ignoring startingOffsets entirely.

So the original checkpoint still has the old offsets from before you switched away. If new records arrived in Kafka while you were using the alternate checkpoint, the original checkpoint does not know about them unless those offsets are still within range.

WHY NEW RECORDS ARE NOT APPEARING

There are a few likely causes:

1. Kafka retention expired the messages. If your Kafka topic has a retention period (e.g., 7 days) and the original checkpoint was stale for longer than that, the offsets it recorded may now point to data that Kafka has already purged. Since you have failOnDataLoss = "false", Spark silently skips over the gap rather than throwing an error, and it may appear that no new data is coming in.

2. The checkpoint committed offsets that already cover the new data. If the alternate checkpoint processed data past the point the original checkpoint left off, and then you switched back, the original checkpoint's offsets are behind. The query should pick up from those older offsets, but if Kafka has deleted those segments, the stream jumps ahead and may land at the current end with nothing new to process.

3. Offset metadata mismatch. If the Kafka topic was reconfigured (partitions added, topic recreated, etc.) while the checkpoint was in use elsewhere, the stored partition/offset mapping may no longer align with the current topic state.

HOW TO FIX THIS

Option A: Start fresh with a new checkpoint (recommended)

Use a brand-new checkpoint location and set startingOffsets to "latest" on the Kafka source so you only pick up new records going forward. (Note that startingOffsets is a read option, set on the readStream, not on the writeStream.) This avoids reprocessing and gives you a clean slate:

checkpoint_new = get_checkpoint_path(source_kafka_topic + "/" + process_name + "_v2")

# On the Kafka readStream, switch to the latest offsets:
#     .option("startingOffsets", "latest")
# (this takes effect only because checkpoint_new is empty)

query = (
    df_stream.writeStream.format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_new)
    .option("mergeSchema", "true")
    .option("schemaCheck", "false")
    .toTable(raw_table_name)
)
query.awaitTermination()

If you need to backfill the gap between when the original checkpoint stopped and now, you can run a separate batch read from Kafka using startingOffsets and endingOffsets with specific timestamps or offset values to fill in the missing window.
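A minimal sketch of such a backfill, assuming you know the per-partition offset range to fill. The `make_offsets_json` helper, topic name, and offset values are illustrative; the documented pieces are the batch `spark.read.format("kafka")` with `startingOffsets`/`endingOffsets` taking per-partition JSON:

```python
import json

def make_offsets_json(topic, partition_offsets):
    """Build the JSON string the Kafka source expects for
    startingOffsets/endingOffsets, e.g. {"my_topic": {"0": 100, "1": 250}}."""
    return json.dumps({topic: {str(p): int(o) for p, o in partition_offsets.items()}})

# Example: backfill partitions 0 and 1 between two known offset ranges.
start = make_offsets_json("my_topic", {0: 100, 1: 250})
end = make_offsets_json("my_topic", {0: 500, 1: 700})

# With a live SparkSession, the batch read would look like:
# backfill_df = (
#     spark.read.format("kafka")
#     .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
#     .option("subscribe", "my_topic")
#     .option("startingOffsets", start)
#     .option("endingOffsets", end)
#     .load()
# )
# backfill_df.write.format("delta").mode("append").saveAsTable(raw_table_name)
```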

Option B: Start fresh with "earliest" if you can tolerate reprocessing

If your downstream pipeline is idempotent and can handle duplicate records, you can use a new checkpoint with startingOffsets = "earliest". This reprocesses everything but guarantees nothing is missed. To avoid duplicates in your Delta table, consider adding a MERGE or deduplication step downstream.
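A sketch of the deduplication idea, with hypothetical table and column names. The helper just builds the MERGE statement you would run via spark.sql, for example inside a foreachBatch handler:

```python
def build_dedup_merge_sql(target_table, source_view, key_cols):
    """Build a Delta MERGE that inserts only rows whose keys are not
    already present in the target (insert-if-absent dedup)."""
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    return (
        f"MERGE INTO {target_table} t "
        f"USING {source_view} s "
        f"ON {on_clause} "
        f"WHEN NOT MATCHED THEN INSERT *"
    )

# Example (hypothetical names): inside a foreachBatch handler you might do
#   batch_df.dropDuplicates(["member_id"]).createOrReplaceTempView("batch_src")
#   spark.sql(build_dedup_merge_sql("raw_members", "batch_src", ["member_id"]))
```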

Option C: Inspect and adjust the checkpoint offsets (advanced)

You can examine the checkpoint's offsets directory to see what offsets were last committed:

%fs ls <your_checkpoint_path>/offsets/

Look at the latest offset file to see the recorded partition offsets. Compare these with the current Kafka topic offsets (earliest and latest) to understand the gap. If the committed offsets are still within Kafka's retention window, the query should resume correctly on the next trigger.
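If you want to inspect this programmatically, the offset files are plain text: in my experience they contain a version line ("v1"), a metadata JSON line, then one JSON line per source with the committed partition offsets. Treat that layout as an internal detail that can vary by runtime version; this parser is a sketch under that assumption:

```python
import json

def parse_kafka_offset_file(contents):
    """Parse the text of a checkpoint offsets/<batchId> file and return
    the per-source offset payloads (list of dicts). Assumes the common
    layout: version line, metadata line, then one JSON line per source."""
    lines = [ln for ln in contents.strip().splitlines() if ln.strip()]
    payloads = []
    for ln in lines[2:]:  # skip the version line and the metadata line
        try:
            payloads.append(json.loads(ln))
        except json.JSONDecodeError:
            pass  # ignore anything that isn't a JSON offset line
    return payloads

# Example with a synthetic file body (illustrative values):
sample = 'v1\n{"batchWatermarkMs":0,"batchTimestampMs":1700000000000}\n{"my_topic":{"0":1234,"1":5678}}'
offsets = parse_kafka_offset_file(sample)
# offsets[0]["my_topic"]["0"] -> 1234
```

Compare the parsed offsets against the topic's current earliest/latest offsets to see whether the committed positions are still inside Kafka's retention window.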

BEST PRACTICES FOR CHECKPOINT MANAGEMENT

- Never share a checkpoint location between different streaming queries.
- Never reuse a checkpoint after significant time gaps if Kafka retention might have expired the data.
- When you need to reset a stream, always create a new checkpoint path rather than deleting or reusing an old one.
- Use failOnDataLoss = "true" (the default) during development so you get clear errors when offsets fall out of Kafka's retention window, rather than silently skipping data.
- Consider versioning your checkpoint paths (e.g., appending _v1, _v2) so you can always trace which checkpoint belongs to which configuration.
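The versioning suggestion can be as simple as a small helper that encodes an explicit version in the path (the names here are illustrative):

```python
def versioned_checkpoint_path(base_dir, topic, process_name, version):
    """Build a checkpoint path that encodes topic, process, and an explicit
    version, so each stream reset gets a fresh, traceable location."""
    return f"{base_dir}/{topic}/{process_name}_v{version}"

# Example:
#   versioned_checkpoint_path("/checkpoints", "members_topic", "CDLZ_MEMBERS", 2)
#   -> "/checkpoints/members_topic/CDLZ_MEMBERS_v2"
```

Bumping the version on every intentional reset keeps old checkpoints intact for forensics while guaranteeing the new run starts clean.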

DOCUMENTATION REFERENCES

Structured Streaming checkpoints:
https://docs.databricks.com/aws/en/structured-streaming/checkpoints.html

Kafka Structured Streaming configuration (startingOffsets behavior):
https://docs.databricks.com/aws/en/structured-streaming/kafka.html

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.

If this answer resolves your question, could you mark it as "Accept as Solution"? That helps other users quickly find the correct fix.