DLT pipeline failure - Detected a data update... This is currently not supported

bakselrud
New Contributor III

We are running a DLT pipeline in a Databricks workspace hosted on Microsoft Azure. The pipeline fails intermittently, for no clear reason.

The pipeline is as follows:

spark.readStream.format("delta").option("mergeSchema", "true").option("ignoreChanges", "true").load(topic_name)

dlt.create_streaming_live_table(...)

dlt.apply_changes(
    target=target_table_name,
    source=f"sequence({topic_name})",
    keys=["key"],
    sequence_by=col("sequence"),
    stored_as_scd_type="1"
)

Up to this point, the pipeline works or does not work depending on the day of the week (or the phase of the Moon), while the input data remains the same! It may work for a couple of days, then raise an error:

org.apache.spark.sql.streaming.StreamingQueryException: Query MAIN_FLOW_MOVES [id = e8a4577a-5d1a-4bfb-9801-8f47c8534f05, runId = b9364adb-a8d1-4ccf-af30-f00ad5b66520] terminated with exception: Detected a data update (for example part-00004-7397764a-48a1-432c-a104-03192f199def-c000.snappy.parquet) in the source table at version 3. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

The requirement for SCD type 1 is satisfied: each key has its own set of sequence numbers, none of them repeated, so no conflict or inconsistency can arise in that regard. We have checked this numerous times.
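A minimal sketch of the two conditions described here, in plain Python rather than DLT: no (key, sequence) pair repeats, and the surviving row for each key is the one with the highest sequence number. The function names and sample rows are hypothetical and chosen only for illustration; this is not how DLT implements apply_changes.

```python
from collections import Counter


def find_duplicate_sequences(rows):
    """Return the (key, sequence) pairs that occur more than once."""
    counts = Counter((r["key"], r["sequence"]) for r in rows)
    return sorted(pair for pair, n in counts.items() if n > 1)


def scd_type1(rows):
    """Keep, for each key, only the row with the highest sequence number."""
    latest = {}
    for r in rows:
        current = latest.get(r["key"])
        if current is None or r["sequence"] > current["sequence"]:
            latest[r["key"]] = r
    return latest


# Hypothetical sample input: two versions of key 1, one version of key 2.
rows = [
    {"key": 1, "sequence": 0, "data": 2},
    {"key": 1, "sequence": 1, "data": 3},
    {"key": 2, "sequence": 0, "data": 7},
]

duplicates = find_duplicate_sequences(rows)
target = scd_type1(rows)
```

With this input, `duplicates` is empty and `target` holds one row per key, which is the outcome the post expects from SCD type 1.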

As I stated, the same pipeline, without any changes, sometimes works and sometimes does not, and the reason for this erratic behavior is not clear. As far as I can tell, all conditions for correct and consistent SCD type 1 ingestion have been satisfied.

Please help us understand the cause of this unstable behavior. This impacts our ability to go live with this application.

12 REPLIES

Dooley
Valued Contributor

Can you tell me about your target table a bit more? I know you put .... but:

create_streaming_live_table(
  name = "<table-name>",
  comment = "<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  path="<storage-location-path>",
  schema="schema-definition"
)

Any info you can give me on that target table piece?

bakselrud
New Contributor III

[Attached image: ErrorExample]

I'm trying to accomplish a straightforward SCD type 1 on a table. As such, the table has a key and a sequence number. There are a lot of other data elements, but I think these are just along for the ride, since the objective of SCD type 1 is to capture the latest update.

As I said, I did verify that each key/sequence number combination is good: there are multiple sequence numbers per key, they do not repeat, and they are all sequential. The objective is to pick up the latest update, that is, the one with the highest sequence number.

When I consider this operation, it appears logically consistent and reliable, but it sometimes fails in the DLT pipeline.

It is possible that I'm missing or not understanding something.

It is possible that this is not the source of the error, but this is what I'm trying to do, and it fails.

Can you verify that the logic I outlined above is correct?

If it is, then that is all I need to understand in order to implement SCD type 1 in DLT.

Please let me know if I'm missing something.

I'll be happy to walk you through the actual pipeline, but it would require an online meeting.

The pipeline is simple, consisting of several steps (above).

Regarding the error message in my original post: if you could clarify what it means, when it happens, and what it complains about, I would greatly appreciate it. At the moment I can't imagine what the problem might be, and without understanding the problem it is hard to find a solution.

Dooley
Valued Contributor

Thanks for the DAG. Can you share the code that produces that DAG? That will help a lot in troubleshooting this issue.

bakselrud
New Contributor III

Block 1 (AECTM):

@dlt.view(
    comment=comment if comment is not None else f"define_key({key_expr})",
    name=target_table_name
)
def key_func():
    df = None
    if '://' in topic_name:
        if streaming:
            df = spark.readStream.format("delta").option("mergeSchema", "true").option("ignoreChanges", "true").load(topic_name)
        else:
            df = spark.read.format("delta").option("mergeSchema", "true").option("ignoreChanges", "true").load(topic_name)
    else:
        if streaming:
            df = dlt.readStream(topic_name)
        else:
            df = dlt.read(topic_name)
    return df.withColumn("key", eval(key_expr))

Block 2&3 (sequence & scd_type1):

target_table_name = alias if alias is not None else f"scd_type1({topic_name},{sequence_col})"

dlt.create_streaming_live_table(
    comment=comment if comment is not None else f"scd_type1({topic_name},{sequence_col})",
    name=target_table_name
)

@dlt.table(
    comment=comment if comment is not None else f"sequence({topic_name})",
    name=f"sequence({topic_name})"
)
@dlt.expect_or_drop("null key", col("key").isNotNull())
def seq_view():
    if streaming:
        return dlt.readStream(topic_name).withColumn("sequence", eval(sequence_col))
    else:
        return dlt.read(topic_name).withColumn("sequence", eval(sequence_col))

dlt.apply_changes(
    target=target_table_name,
    source=f"sequence({topic_name})",
    keys=["key"],
    sequence_by=col("sequence"),
    stored_as_scd_type="1"
)

Block 4 (filter view):

target_table_name = alias if alias is not None else f"filter({filter_cond})"

@dlt.view(
    comment=comment if comment is not None else f"filter({filter_cond})",
    name=target_table_name
)
def flt():
    df = dlt.readStream(topic_name) if streaming else dlt.read(topic_name)
    return df.filter(eval(filter_cond))

Block 5 (MAIN_FLOW_MOVES):

target_table_name = alias if alias is not None else f"main_flow_moves({topic_name})"

def getn(position_info, level):
    if position_info is not None:
        if position_info.level == level:
            return position_info.name
        else:
            return getn(position_info.parent, level)
    else:
        return None

udfGetName = udf(getn)

@dlt.table(
    comment=comment if comment is not None else f"main_flow_moves({topic_name})",
    name=target_table_name
)
def func():
    df = dlt.readStream(topic_name) if streaming else dlt.read(topic_name)
    dfSeries = df.select("containerTerminalVisitKey", col('moveEvents')[0]['from']['location']['locationType'].alias("fromWhere"), explode('moveEvents.to').alias("toLoc")). \
        select("containerTerminalVisitKey", (col("toLoc.time") / 1000.0).cast(TimestampType()).alias("time"), "fromWhere", "toLoc.location.positionInfo"). \
        withColumn("blockName", udfGetName("positionInfo", lit("BLOCK"))). \
        withColumn("bayName", udfGetName("positionInfo", lit("BAY"))). \
        withColumn("slotName", udfGetName("positionInfo", lit("STACK"))). \
        filter(col("blockName").isNotNull() & col("bayName").isNotNull() & col("slotName").isNotNull()). \
        drop("positionInfo")
    return dfSeries

==================================================

The input dataframe schema is actually quite intimidating with recursive struct definitions, but most of it is just taken along for the ride. The key field is containerTerminalVisitKey and the sequence field is containerInfo.updateCounter. The latter is just a sequence number (long).

I think there is a lot going on. I'm not sure how much the code that I shared helps. I was thinking that understanding the situation in which the error message (in the original post) is emitted might help.

bakselrud
New Contributor III

I'd like to provide an update on this issue.

We have another DLT pipeline, which is a non-streaming (batch) SCD type 1. This pipeline also failed with the same error message as in my original post. The data in this pipeline is completely different in source and structure: not a complex recursive struct, just a flat record with some 50 static fields. So there is no chance that the schema has changed there. There is also no chance that key/sequence contains duplicates, as we have checked that carefully several times.

This pipeline worked correctly for the past 3 months. It failed yesterday. It has also failed before, after which it worked again without us introducing any changes, just by restarting it repeatedly.

This really looks erratic to me, and I can't think of any data-related or logic-related cause under our control that would be responsible for it.

To help me understand the issue, I would ask again whether you could provide some context on when the error message (in the original post) is generated. The content of the error message does not make sense to me and does not allow me to reason about possible causes.

Dooley
Valued Contributor

If you could also send the simple code that produces the same error, plus the DAG, so we can go through it, that would be appreciated.

bakselrud
New Contributor III

I was hoping that you could shed some light on the conditions under which the error in my original post is generated.

Right now, I have to start working out how to reproduce it using a simple pipeline and rigged data.

Your help would be useful in that investigation.

Dooley
Valued Contributor

So from my research, you have a domino effect of an error. Once you apply changes to a table, you cannot stream from that table afterwards. However, you have a view and then a table, so you see the error in the table and not in the view. If the source only receives inserts, you will not see the error (which is why it works sometimes), but if it receives updates, you will.
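One way to read the distinction between inserts and updates is as a check applied to each commit of the source table. The sketch below is a simplified model of that idea in plain Python, not the actual Delta Lake source code: `check_commit`, the `(kind, data_change)` tuples, and the returned strings are all assumptions made for illustration. It assumes a streaming reader treats its source as append-only, so a commit that both removes and adds data files (as an update that rewrites a file would) is rejected unless ignoreChanges is set.

```python
def check_commit(actions, ignore_deletes=False, ignore_changes=False):
    """Classify one commit the way an append-only streaming reader might.

    `actions` is a list of (kind, data_change) tuples, where kind is
    "add" or "remove" and data_change says whether the action changes
    the logical content of the table.
    """
    has_add = any(kind == "add" and changed for kind, changed in actions)
    has_remove = any(kind == "remove" and changed for kind, changed in actions)

    if has_remove and has_add:
        # A file was rewritten: some rows were updated in place.
        return "ok" if ignore_changes else "error: data update"
    if has_remove:
        # Files were dropped without replacement.
        return "ok" if (ignore_deletes or ignore_changes) else "error: data deleted"
    return "ok"


# Hypothetical commits, named after the operation that would produce them.
insert_commit = [("add", True)]
update_commit = [("remove", True), ("add", True)]
compaction_commit = [("remove", False), ("add", False)]

insert_result = check_commit(insert_commit)
update_result = check_commit(update_commit)
update_ignored = check_commit(update_commit, ignore_changes=True)
compaction_result = check_commit(compaction_commit)
```

Under this model, a run in which the upstream step only appended rows passes, while a run in which it updated an existing key fails, which would match the intermittent behavior reported in the thread.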

Option 1: Run the MAIN_FLOW_MOVES function before the apply changes step, and try to structure the pipeline so that all filtering and processing happen before the apply changes command.

Option 2: Use ignoreChanges. The changes will propagate, but you will have to deal with duplicates (the documentation covers this). However, ignoreChanges requires spark.readStream, which will break up your pipeline because that is not a DLT stream function. That is why you could potentially do this in two pipelines and stitch them together with Databricks Workflows.
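To illustrate why duplicates appear with ignoreChanges, here is a small plain-Python model. It assumes that an update rewrites the whole data file containing the affected row, and that a stream reading with ignoreChanges re-emits every row of the rewritten file, including rows that did not change. The helper names and the rows are hypothetical.

```python
def rewrite_file(file_rows, new_row):
    """Model an update as a full rewrite of the file holding the key."""
    return [new_row if r["key"] == new_row["key"] else r for r in file_rows]


def dedupe(rows):
    """Drop exact (key, seq) repeats while preserving arrival order."""
    seen = set()
    unique = []
    for r in rows:
        ident = (r["key"], r["seq"])
        if ident not in seen:
            seen.add(ident)
            unique.append(r)
    return unique


# Version 1 of the file holds two rows; the update touches only key "b".
file_v1 = [{"key": "a", "seq": 0}, {"key": "b", "seq": 0}]
file_v2 = rewrite_file(file_v1, {"key": "b", "seq": 1})

# The downstream consumer sees both files in full, so the untouched
# row for key "a" arrives twice.
received = file_v1 + file_v2
unique_rows = dedupe(received)
```

Here `received` contains four rows and `unique_rows` three, so a downstream step has to be written to tolerate or remove such repeats.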

Since you use this feature a lot, I suggest you click the "feedback" button in your Databricks workspace and ask for this feature to be given high priority, so that you can stream from a table that has had apply changes applied to it within one single pipeline. I will do the same internally to raise awareness.

bakselrud
New Contributor III

So if I'm understanding correctly, I can have a pipeline with a single step in it - SCD type 1 and it should work?

At the moment, it seems that if I clear the pipeline files in the abfs directory and start pipeline from scratch, it is able to carry out all of the steps if I run it as batch then stop. However the subsequent run fails. If I clear pipeline directory and run it again, it works.

So I'm suspecting that there is some trickery going on with keeping state / checkpointing that gets in the way.

Our pipelines are elaborate and use many steps. If I have to break out these pipelines into multiple orchestrated pieces, it will become very cumbersome.

But it would be nice to zero in on exactly what the issue is, because we might be waiting for a fix that is not relevant. We will follow your advice anyway, just for evaluation purposes, but ultimately the pipeline has to work regardless of the chain of operations we need.

Dooley
Valued Contributor

Did the suggested change help?

bakselrud
New Contributor III

I have tried several runs of the DLT pipeline with SCD Type 1 as the final step, and all of the runs succeeded. I have yet to try starting new pipelines that read from this one, but I hope that works as well.

I now recall that before I added an additional step to the pipeline after the SCD Type 1 step, it did work, but as soon as I added subsequent steps it started failing. At the time I did not put two and two together, as it seemed too far-fetched to me.

At the moment it looks like we have a workaround, although it is a very unusual one to conceive of.
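The pattern described here (a streaming step placed directly after the SCD Type 1 step) can be looked for mechanically. The following is a hypothetical helper in plain Python, not a DLT feature: the list-of-dicts pipeline description, the field names, and the step names are assumptions for illustration. It only flags steps that stream directly from an apply_changes target and does not follow chains through intermediate views.

```python
def streaming_reads_after_apply_changes(steps):
    """Return names of steps that stream directly from an apply_changes target."""
    targets = {s["name"] for s in steps if s["apply_changes"]}
    return [
        s["name"]
        for s in steps
        if s["mode"] == "stream" and any(src in targets for src in s["reads"])
    ]


# Layout resembling the failing pipeline: a streaming table after SCD type 1.
failing = [
    {"name": "sequence", "reads": ["keyed_input"], "mode": "stream", "apply_changes": False},
    {"name": "scd_type1", "reads": ["sequence"], "mode": "stream", "apply_changes": True},
    {"name": "main_flow_moves", "reads": ["scd_type1"], "mode": "stream", "apply_changes": False},
]

# Layout resembling the workaround: SCD type 1 is the last step.
working = [
    {"name": "sequence", "reads": ["keyed_input"], "mode": "stream", "apply_changes": False},
    {"name": "main_flow_moves", "reads": ["sequence"], "mode": "stream", "apply_changes": False},
    {"name": "scd_type1", "reads": ["sequence"], "mode": "stream", "apply_changes": True},
]

flagged_failing = streaming_reads_after_apply_changes(failing)
flagged_working = streaming_reads_after_apply_changes(working)
```

The first layout flags `main_flow_moves`; the second flags nothing, which corresponds to the runs that succeeded once SCD Type 1 became the final step.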

bakselrud
New Contributor III

Ok, so after doing some investigation on the way to resolving my original question, I think we're getting some clarity after all.

Consider the following data frame that is ingested by a DLT streaming pipeline:

dfMock = spark.sparkContext.parallelize([[1,0,2],[1,1,3]]). \
    toDF(StructType([StructField("key", LongType(), False), StructField("seq", LongType(), False), StructField("data", LongType(), False)]))

Ingesting this for the very first time, using DLT type 1 updates, results in a successful run. Ingesting it a second time fails.

Why?

Because there is no checkpointing associated with the input data. The moment a streaming pipeline is stopped and run again, it appears to re-process the same data and naturally fails, because it thinks that it sees updates to the same keys.

What were the designers of the DLT pipeline thinking here originally? We're left to wonder.

Now, the answer at this point from Databricks should be this: "But of course, this is because you misunderstood how data processing using the streaming data sources work. In streaming data sources you have checkpoints that take care of the processed vs unprocessed records. Do your homework"

We did the homework and evaluated a streaming read feeding a streaming DLT pipeline. However, what we found is that even in that case the DLT pipeline fails, because it does not recognize that, after stopping and restarting, it is reading part of the input stream that it has already seen in the previous run. This is the root of the problem.

Now that I have laid out some of the issues, could you help me understand how to use a DLT pipeline properly so that it does not break on restart? Is it not supposed to pick up from the checkpoint of the input stream where it previously failed (or halted)?
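For reference, the checkpoint behavior this question assumes can be written down as a toy model in plain Python. It is a sketch of the general idea only (a reader that persists its offset between runs), not of DLT or Structured Streaming internals; the class name, the in-memory `log`, and the dict used as a checkpoint are all hypothetical.

```python
class CheckpointedReader:
    """Toy reader that remembers how far into the log it has read."""

    def __init__(self, log, checkpoint):
        self.log = log                # append-only list of records
        self.checkpoint = checkpoint  # dict that survives between runs

    def run_once(self):
        """Return only the records added since the last recorded offset."""
        start = self.checkpoint.get("offset", 0)
        batch = self.log[start:]
        self.checkpoint["offset"] = len(self.log)
        return batch


log = ["r0", "r1"]
checkpoint = {}

first_run = CheckpointedReader(log, checkpoint).run_once()   # reads everything
second_run = CheckpointedReader(log, checkpoint).run_once()  # nothing new
log.append("r2")
third_run = CheckpointedReader(log, checkpoint).run_once()   # only the new record

# Starting with an empty checkpoint replays the whole log, which is what
# "restart this query with a fresh checkpoint directory" amounts to.
fresh_run = CheckpointedReader(log, {}).run_once()
```

In this model a restart with the same checkpoint never replays old records, so a replay on restart would suggest that the checkpoint was lost or not used for that input.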
