When delta is a streaming source, how can we get the consumer lag?

YFL
New Contributor III

Hi,

I want to keep track of the streaming lag from the source table, which is a delta table.

I see that the query progress logs contain some information about the end offset (the last version and the last file index within that version), but this doesn't give me the lag from the source table unless I also query the table and check what its latest version and file count are.

"sources" : [ {
    "description" : "DeltaSource[dbfs:/mnt/defaultDatalake/zones/bronze/my_source_table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "15059b8a-0f48-4561-9424-8fcb0c8906de",
      "reservoirVersion" : 39673,
      "index" : -1,
      "isStartingVersion" : false
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "15059b8a-0f48-4561-9424-8fcb0c8906de",
      "reservoirVersion" : 39674,
      "index" : -1,
      "isStartingVersion" : false
    },

Just to be clear, by lag I mean the following: if, for example, the last row in the source table is row 100 and the stream is currently processing row 90, my lag from the source table is 10.

One more technical point: how can I parse the startOffset and endOffset? From the `SourceProgress` class I have direct access to the endOffset field, but not to its inner fields (like index). Should I just parse the endOffset string as JSON using a standard JSON library like jackson or ujson?
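A minimal sketch of the JSON-parsing approach asked about here, written in Python with only the standard library (the same idea should carry over to jackson or ujson on the JVM). `parse_offset` is a hypothetical helper name, and the sample string is simply the endOffset from the progress log above serialized on one line.

```python
import json
from typing import Any, Mapping, Optional, Union


def parse_offset(offset: Optional[Union[str, Mapping[str, Any]]]) -> dict:
    """Return a source offset as a plain dict.

    Accepts the raw JSON string (which is how the endOffset field is
    exposed as a string) or a mapping that has already been decoded.
    """
    if offset is None:
        return {}
    if isinstance(offset, str):
        return json.loads(offset)
    return dict(offset)


# Sample endOffset copied from the progress log in the question.
end_offset_json = (
    '{"sourceVersion": 1, '
    '"reservoirId": "15059b8a-0f48-4561-9424-8fcb0c8906de", '
    '"reservoirVersion": 39674, "index": -1, "isStartingVersion": false}'
)

end_offset = parse_offset(end_offset_json)
print(end_offset["reservoirVersion"], end_offset["index"])
```

Once the offset is a dict, the inner fields such as `reservoirVersion` and `index` can be read directly.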

Thank you very much.

1 ACCEPTED SOLUTION

Accepted Solutions

Hi @Yerachmiel Feltzman​ ,

You will need to take a look at the micro-batch metrics. This article explains what each metric means: https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0...

11 REPLIES 11

Kaniz
Community Manager

Hi @YFL! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's first see whether your peers in the community have an answer. Otherwise, I will get back to you soon. Thanks.

YFL
New Contributor III

Thanks, Kaniz. This is a very important question for some production jobs we have (and we are heavily invested in Databricks and Delta). I have seen others on the internet asking the same question as well.

Thank you.

Yerachmiel Feltzman | Data Platform Developer

Hi @Yerachmiel Feltzman​ 

You can take a look at the following metrics in your streaming query progress: https://docs.databricks.com/delta/delta-streaming.html#metrics

YFL
New Contributor III

Hi @Jose Gonzalez, I don't see anything in the link that states the lag from the source Delta table.

Thanks anyway.

Hi @Yerachmiel Feltzman​ 

Are you able to see these metrics?

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      }
    }
  ]
}
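As a rough illustration of how these two metrics could be read programmatically, here is a small Python sketch that works on a progress report already decoded into a dict. `outstanding_backlog` is a hypothetical helper, not part of any Spark or Delta API, and the sample dict just mirrors the snippet above.

```python
from typing import Any, Dict, List


def outstanding_backlog(progress: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect the outstanding files/bytes reported by each source."""
    backlog = []
    for source in progress.get("sources", []):
        metrics = source.get("metrics") or {}
        # Skip sources that do not report the outstanding metrics at all.
        if not {"numFilesOutstanding", "numBytesOutstanding"} & metrics.keys():
            continue
        backlog.append(
            {
                "description": source.get("description"),
                # The metrics arrive as strings, so convert them to ints.
                "files_outstanding": int(metrics.get("numFilesOutstanding", 0)),
                "bytes_outstanding": int(metrics.get("numBytesOutstanding", 0)),
            }
        )
    return backlog


# Sample progress report mirroring the snippet above.
progress = {
    "sources": [
        {
            "description": "DeltaSource[file:/path/to/source]",
            "metrics": {
                "numBytesOutstanding": "3456",
                "numFilesOutstanding": "8",
            },
        }
    ]
}

for entry in outstanding_backlog(progress):
    print(entry)
```

Values that stay well above zero from one micro-batch to the next would suggest the stream is not keeping up with its source.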

YFL
New Contributor III

I am.

Yerachmiel Feltzman | Data Platform Developer

Anonymous
Not applicable

@Yerachmiel Feltzman​ - Does the fact you can see the metrics resolve the issue?

YFL
New Contributor III

Hi @Piper Wilson. Those metrics don't solve the issue. What I am asking for is a way to keep track of the lag between the running stream and the source Delta table. Those metrics give me a lot of information, like input rates (in bytes and in files) and some other metrics, but I don't see the lag from the source table there.

Again, by lag I mean "the difference between the position/timestamp of the last record in the source table and the row my stream is currently processing". For a simplistic example, if the source table has 100 rows and the stream has processed 90, it is lagging by 10 rows.

What I have in mind is something similar to Kafka's consumer lag, but another approach is welcome as well. The whole point here is: "How do I keep track of where my stream is relative to its source? How can I know if it is processing more slowly than expected, i.e., more slowly than its source Delta table is being written?"
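One way to approximate this kind of lag, sketched below in Python, is to compare the `reservoirVersion` in the stream's endOffset with the latest version of the source table. This is only an assumption-laden sketch: `version_lag` is a hypothetical helper, the latest table version has to be obtained separately by querying the table (not shown), the value 39680 is made up for illustration, and the result counts table versions (commits), not rows. The exact meaning of `reservoirVersion` and `index` is internal to the Delta source, so the number should be treated as approximate.

```python
import json
from typing import Any, Mapping, Union


def version_lag(
    end_offset: Union[str, Mapping[str, Any]],
    latest_table_version: int,
) -> int:
    """Approximate lag, in table versions, behind the source table."""
    offset = json.loads(end_offset) if isinstance(end_offset, str) else end_offset
    # Clamp at zero in case the table version was read before the offset.
    return max(0, latest_table_version - int(offset["reservoirVersion"]))


# endOffset fields taken from the progress log in the question.
end_offset = '{"reservoirVersion": 39674, "index": -1}'

# Hypothetical latest version of the source table, obtained elsewhere.
latest_table_version = 39680

print(version_lag(end_offset, latest_table_version))
```

Emitting this number after every micro-batch would give a trend similar in spirit to Kafka consumer lag, although measured in commits rather than records.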

Thanks. 🙂

Anonymous
Not applicable

Hey @Yerachmiel Feltzman​ 

I hope all is well.

Just wanted to check in: were you able to resolve your issue, or do you need more help? We'd love to hear from you.

Thanks!

YFL
New Contributor III

Hey @Vartika Nain​ . The issue was resolved. Thanks.
