Stream data from Delta tables replicated with Fivetran into DLT

MA
New Contributor II

I'm attempting to stream into a DLT pipeline data that Fivetran replicates directly into Delta tables in a database other than the one the DLT pipeline uses. This is not an aggregate, and I don't want to recompute the entire data model each time the pipeline runs; I just want to apply inserts/updates/deletes. I will use apply_changes() from silver to gold, but I receive an error when I re-run the silver table pipeline after new data has been added to the source table.

I am streaming into this pipeline with this:

spark.readStream.format("delta").table("fivetran_stage.the_table_name")

On the first run, the data loads just fine! I thought everything was working until Fivetran replicated some new rows into that dataset. Then I received this error:

org.apache.spark.sql.streaming.StreamingQueryException: Query my_table_name_silver [id = 7a1f5742-f8ec-4ef0-94aa-e1a05b921cad, runId = 6a42568b-6c1a-461c-98ef-0860e5198e79] terminated with exception: Detected a data update (for example part-00000-95d473e5-1819-4ef0-8c68-26fc81a8c262.c000.snappy.parquet) in the source table at version 14. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.
java.lang.UnsupportedOperationException: Detected a data update (for example part-00000-95d473e5-1819-4ef0-8c68-26fc81a8c262.c000.snappy.parquet) in the source table at version 14. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

I assume this is a result of attempting to use Spark streaming from a Delta table, so if there's another way to process micro-batches, I'm all ears! I don't care about the means of getting there; my only goal is to be able to re-run the pipeline and have it process only the changed records into gold. This same process worked fine when using Auto Loader to read .parquet files. Is streaming from Delta tables not supported?
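For context, a Delta streaming source fails when an upstream commit rewrites existing data files, which is exactly what Fivetran's merge-based replication does. One workaround, sketched below rather than a confirmed fix for this pipeline, is the skipChangeCommits reader option (Databricks Runtime 12.1+); on older runtimes, ignoreChanges behaves similarly but re-emits unchanged rows from rewritten files, so downstream logic must deduplicate. The helper name read_fivetran_stream and the explicit spark parameter are illustrative; only fivetran_stage.the_table_name comes from the post:

```python
# Sketch only: stream from a Fivetran-replicated Delta table while skipping
# commits that rewrite existing rows (updates/deletes), which otherwise abort
# the stream with "Detected a data update ... not supported".
def read_fivetran_stream(spark, table="fivetran_stage.the_table_name"):
    return (
        spark.readStream
             .option("skipChangeCommits", "true")  # ignore update/delete commits
             .format("delta")
             .table(table)
    )
```

Note the trade-off: with skipChangeCommits the silver stream sees only inserted rows, so updates and deletes in the staging table never reach apply_changes(). If those changes must flow downstream, the usual alternative is to enable Delta Change Data Feed on the staging table and read it with .option("readChangeFeed", "true"), which surfaces each row change along with its change type.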

1 REPLY

Anonymous
Not applicable

Hi @M A

Great to meet you, and thanks for your question! 

Let's see if your peers in the community have an answer to your question first. Otherwise, Bricksters will get back to you soon.

Thanks
