Obtain the source table version number from checkpoint file when using Structured Streaming

Agus1
New Contributor III

Hello!

I'm using Structured Streaming to write to a Delta table. The source is another Delta table, also written with Structured Streaming. To data-check the results, I'm trying to obtain, from the checkpoint files of the target table, the version number of the source table that each run processed.

When inspecting the checkpoint files I recognize two possible patterns:

 

{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0","reservoirVersion":3716,"index":5285,"isStartingVersion":true}
{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0","reservoirVersion":3719,"index":-1,"isStartingVersion":false}

 

From the cases I've seen so far, it seems that the `reservoirVersion` value refers to the version of the source table. However, this value must be reduced by 1 when `index` = -1, and kept as is when `index` is a positive number.

In these examples:

  • The first one read version `3716` of the source table
  • The second one read version `3718` of the source table (adjusted from `reservoirVersion` because `index` = -1)

Also it seems like `index` is always -1 except for the first checkpoint file of a stream (which contains the value `isStartingVersion` = true as well).

I was able to verify these assumptions for every file I've checked, particularly noticing that for cases where `index` was -1 the value of `reservoirVersion` was always 1 unit above the last available version of the source table.
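
To make the rule concrete, here is a minimal Python sketch of the adjustment I'm applying. It only encodes what I observed in my own checkpoint files, not documented behavior, and the function name `last_read_source_version` is my own (hypothetical), not part of any Databricks or Delta API.

```python
import json


def last_read_source_version(offset_json: str) -> int:
    """Return the source table version implied by one offset line.

    Assumption (observed, not documented): when index == -1 the
    reservoirVersion is one above the last version actually read,
    otherwise reservoirVersion is the version that was read.
    """
    offset = json.loads(offset_json)
    version = offset["reservoirVersion"]
    if offset["index"] == -1:
        return version - 1
    return version


# The two offset lines shown above.
first = (
    '{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0",'
    '"reservoirVersion":3716,"index":5285,"isStartingVersion":true}'
)
following = (
    '{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0",'
    '"reservoirVersion":3719,"index":-1,"isStartingVersion":false}'
)

print(last_read_source_version(first))      # 3716
print(last_read_source_version(following))  # 3718
```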

I couldn't find any documentation backing up this logic.
Could you help me confirm whether this reasoning is correct and whether it will continue to work like this for all future runs?
If not, could another pattern appear in these files?
Is there any documentation explaining the meaning of each of these fields?

Thank you for your help!

2 REPLIES

Agus1
New Contributor III

Hello @Retired_mod, thank you for your answer.

I'm a bit confused here because you seem to be describing the opposite behavior of what I've seen in our checkpoint files.

Here I repost my examples to try to understand better.

First checkpoint file:

{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0","reservoirVersion":3716,"index":5285,"isStartingVersion":true}

All following checkpoint files:

{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0","reservoirVersion":3719,"index":-1,"isStartingVersion":false}

 

When you mention "when index is -1, it signifies the first checkpoint file of a stream. In this scenario, the reservoirVersion should be adjusted by adding 1 to it.":

  • As I describe in my original post, when I see index = -1 I need to subtract 1 from the reservoirVersion, not add 1, because the version present in the file doesn't exist yet (it's 1 version above the last available version of the table). In my examples this is the second one: reservoirVersion is 3719, and I need to subtract 1 because this version doesn't exist.
  • You say "it signifies the first checkpoint file of a stream", but in all the files I've seen in my implementations, the index is never -1 for the first file. In fact, the first file is the only one where I don't see index = -1.

From your comment regarding isStartingVersion: "This field is present only in the first checkpoint file of a stream (where index = -1)":

  • As you can see in my 1st example marked as "First checkpoint file", isStartingVersion is true and index is not -1. Also reservoirVersion points to the correct version of the table (no need to subtract 1). As I mentioned before, this is the behavior I've seen for all first checkpoint files. (First shared example)
  • And for all following files, isStartingVersion is always false, index is always -1, and the reservoirVersion always needs to be adjusted by subtracting 1. (Second shared example)
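
To check whether any file deviates from these two patterns, something like the following Python sketch could be run over every offset line. The labels and the function name `classify_offset` are hypothetical names of mine, and the two patterns are only the ones I have observed so far, so anything else is reported as unknown rather than guessed at.

```python
import json


def classify_offset(offset_json: str) -> str:
    """Label one offset line according to the two observed patterns."""
    offset = json.loads(offset_json)
    index = offset["index"]
    is_starting = offset["isStartingVersion"]

    # Observed in the first checkpoint file: starting version, index not -1.
    if is_starting and index >= 0:
        return "first-file pattern"
    # Observed in all following files: not starting version, index is -1.
    if not is_starting and index == -1:
        return "following-file pattern"
    # Any other combination is something I have not seen yet.
    return "unknown pattern"


lines = [
    '{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0",'
    '"reservoirVersion":3716,"index":5285,"isStartingVersion":true}',
    '{"sourceVersion":1,"reservoirId":"4121e6a2-ab1a-4f6c-8217-6412909486c0",'
    '"reservoirVersion":3719,"index":-1,"isStartingVersion":false}',
]

for line in lines:
    print(classify_offset(line))
```

Any line reported as "unknown pattern" would be a case where the subtract-1 rule should not be applied blindly.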

Can you help me understand if this behavior is normal or why I might be experiencing something different to what you mention?

Thank you very much for your help.

erssiws
New Contributor II

@Retired_mod: it looks like you were trying to help, but when I read through your answer, it seems that you just repeated the information shared by Agus1. It would be much better to simply acknowledge that you don't know the answer to this question.
