Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

Stream data from Delta tables replicated with Fivetran into DLT

MA
New Contributor II

I'm attempting to stream into a DLT pipeline with data replicated by Fivetran directly into Delta tables in a different database from the one the DLT pipeline uses. This is not an aggregate, and I don't want to recompute the entire data model on each run; I just want to perform inserts, updates, and deletes. I will use apply_changes() from silver to gold, but I receive an error when I re-run the silver table pipeline after new data has been added to the source table.
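For context, apply_changes() effectively merges a stream of change events into the target by primary key. A minimal pure-Python sketch of that merge semantics (the event shape and field names here are illustrative assumptions, not the DLT API):

```python
# Illustrative sketch of CDC "apply changes" merge semantics:
# replay a sequence of insert/update/delete events into a keyed target.
# The event structure below is an assumption for illustration only.

def apply_change_events(target, events):
    """Apply CDC events (dicts with 'op', 'key', and optional 'row') in order."""
    for event in events:
        if event["op"] in ("insert", "update"):
            target[event["key"]] = event["row"]   # upsert by primary key
        elif event["op"] == "delete":
            target.pop(event["key"], None)        # drop the row if present
    return target

# Example: insert key 1, update it, then delete a pre-existing key 2.
gold = apply_change_events(
    {2: {"name": "old"}},
    [
        {"op": "insert", "key": 1, "row": {"name": "a"}},
        {"op": "update", "key": 1, "row": {"name": "b"}},
        {"op": "delete", "key": 2},
    ],
)
```

This is only the conceptual behavior; in DLT the merge is declared with dlt.apply_changes() against a streaming source rather than implemented by hand.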

I am streaming into this pipeline with this:

spark.readStream.format("delta").table("fivetran_stage.the_table_name")

On the first run, the data loads just fine! I thought everything was working until Fivetran replicated some new rows into that dataset. Then I received this error:

org.apache.spark.sql.streaming.StreamingQueryException: Query my_table_name_silver [id = 7a1f5742-f8ec-4ef0-94aa-e1a05b921cad, runId = 6a42568b-6c1a-461c-98ef-0860e5198e79] terminated with exception: Detected a data update (for example part-00000-95d473e5-1819-4ef0-8c68-26fc81a8c262.c000.snappy.parquet) in the source table at version 14. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.
java.lang.UnsupportedOperationException: Detected a data update (for example part-00000-95d473e5-1819-4ef0-8c68-26fc81a8c262.c000.snappy.parquet) in the source table at version 14. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

I assume this is a result of attempting to use Spark streaming from a Delta table, so if there's another way to process micro-batches, I'm all ears! I don't care about the means of getting there; my only goal is to be able to re-run a pipeline and have it process only the changed records into gold. This same process worked fine when using Auto Loader to read .parquet files. Is streaming from Delta tables not supported?
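For reference, the error message itself names the knob: setting 'ignoreChanges' to 'true' on the stream reader tells it to tolerate commits that rewrite data files in the source (the trade-off being that downstream may receive re-delivered rows that need deduplication). A hedged sketch of what that might look like inside the DLT pipeline, reusing the table names from the question (a sketch of the suggestion in the error message, not a verified fix):

```python
import dlt

@dlt.table(name="my_table_name_silver")
def my_table_name_silver():
    return (
        spark.readStream.format("delta")
        # Per the error message: tolerate data updates in the source table.
        # Downstream consumers may then see rewritten rows and should dedupe,
        # e.g. via apply_changes() keyed on the primary key.
        .option("ignoreChanges", "true")
        .table("fivetran_stage.the_table_name")
    )
```

This fragment only runs inside a DLT pipeline, where `dlt` and `spark` are provided by the runtime.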

1 REPLY

Anonymous
Not applicable

Hi @M A

Great to meet you, and thanks for your question! 

Let's see if your peers in the community have an answer to your question first. Otherwise, Bricksters will get back to you soon.

Thanks
