Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How do I use Databricks Lakeflow Declarative Pipelines on AWS DMS data?

excavator-matt
New Contributor III

Hi!

I am trying to replicate an AWS RDS PostgreSQL database in Databricks. I have successfully managed to enable CDC using AWS DMS, which writes an initial load file and continuous CDC files in Parquet.

I have been trying to follow the official guide Replicate an external RDBMS table using AUTO CDC. However, this guide leaves three main issues unanswered.

1. How would you handle the scenario where you haven't received any CDC updates yet? That leaves the schema of the rdbms_orders_change_feed view undefined, which causes a CF_EMPTY_DIR_FOR_SCHEMA_INFERENCE error that can't be caught. I obviously want to run the initial load without waiting for changes.

2. Why would you split the initial load from the CDC? Since AUTO CDC checks the update time, there is no risk that the LOAD overrides the CDC.

3. How would you handle this scalably if I have maybe 20 tables from the same database? I currently went for Jinja templating, but is there a better way?

Thanks!

 
1 ACCEPTED SOLUTION

Hi @excavator-matt 

Thank you for reaching out and for sending your questions and feedback about this article.

Why would you split the initial load from the CDC?

Short answer: to create a baseline streaming table, then to capture new changes and run the CDC to synchronize those changes incrementally while handling idempotency; otherwise you would need to resync everything every time.

Long answer: 

The guide suggests two steps: 

  • The first step is to create a baseline streaming table while leveraging a view as the source to obtain the full snapshot of your data (initial hydration). This step uses the schema from your view to create the schema for your streaming table. Notice that it uses the flag “once=true” to handle processing in a batch-like manner; the pipeline processes all currently available changes from the source only once and then stops.
  • The second step builds on your baseline streaming table, using a second view as the source. The guide uses JSON files and infers the schema from them; in your case, this would be your PostgreSQL source (the DMS CDC files). A sketch of both steps follows right after this list.
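For concreteness, here is a rough sketch of that two-flow shape using the newer pipelines import. The table and view names come from the guide; the key, the sequence column, and the Op filter are placeholders and assumptions, and the exact flow options (including once) should be checked against the guide.

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

# Target streaming table; its schema comes from the first flow's source view.
dp.create_streaming_table("rdbms_orders")

# Step 1: one-time hydration from the full snapshot view (the guide's once=true flow).
dp.create_auto_cdc_flow(
    target="rdbms_orders",
    source="full_orders_snapshot",       # view over the DMS initial LOAD file(s)
    keys=["order_id"],                   # placeholder primary key
    sequence_by="last_updated",          # placeholder ordering column
    once=True,                           # process the snapshot once, then stop
)

# Step 2: ongoing CDC flow from the change-feed view.
dp.create_auto_cdc_flow(
    target="rdbms_orders",
    source="rdbms_orders_change_feed",   # view over the DMS CDC files
    keys=["order_id"],
    sequence_by="last_updated",
    apply_as_deletes=expr("Op = 'D'"),   # assumes the DMS operation column
)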

The medallion architecture suggests bringing in your raw bronze data (step 1) and then applying the CDC in your silver layer (step 2). That way you keep a history of your changes, and you can also leverage Change Data Feed.

 

How would you handle the scenario where you haven't received any CDC updates yet?

In your second step, you can pre-define the schema on your view and let the CDC run its course. This keeps the step from failing, and if there are no updates the pipeline will simply continue.
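A minimal sketch of that idea, reusing the reader options from this thread; the column list and path are illustrative and should match whatever your DMS task actually writes:

from pyspark import pipelines as dp

# Hand-written DDL schema so the view is defined even before any CDC file exists.
orders_cdc_schema = "Op STRING, order_id BIGINT, status STRING, last_updated TIMESTAMP"

@dp.view()
def rdbms_orders_change_feed():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .schema(orders_cdc_schema)   # no inference, so no CF_EMPTY_DIR_FOR_SCHEMA_INFERENCE
        .load("s3://your-bucket/dms/orders/2*.parquet")  # placeholder path
    )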

 

How would you handle this scalably if I have maybe 20 tables from the same database? I currently went for Jinja templating, but is there a better way?

Jinja templating is a great approach: you can define a SQL template (CREATE STREAMING …), keep a YAML file with all your table configuration (table, schema, path, etc.), and then use Python to generate the definitions.

This approach centralizes table configs for maintainability and reduces code duplication and errors.
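Purely as an illustration of the mechanics (the file names, config keys, and the DDL inside the template are placeholders; take the real AUTO CDC statement from the guide), the generation step can be as small as:

import yaml
from jinja2 import Template

# tables.yaml (hypothetical layout):
# tables:
#   - name: orders
#     keys: [order_id]
#     sequence_by: last_updated
SQL_TEMPLATE = Template("""
CREATE OR REFRESH STREAMING TABLE {{ name }};

CREATE FLOW {{ name }}_cdc AS AUTO CDC INTO {{ name }}
FROM stream({{ name }}_change_feed)
KEYS ({{ keys | join(', ') }})
APPLY AS DELETE WHEN Op = 'D'
SEQUENCE BY {{ sequence_by }};
""")

with open("tables.yaml") as f:
    config = yaml.safe_load(f)

# One generated .sql file per table, all driven by the same template.
for table in config["tables"]:
    with open(f"generated/{table['name']}.sql", "w") as out:
        out.write(SQL_TEMPLATE.render(**table))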

 

I hope this helps! If this solution works for you, please click the "Accept as Solution" button.






6 REPLIES

excavator-matt
New Contributor III

If you can reach the author of the official documentation, you could also pass along a few bonus tasks:

1. The guide still dead-names declarative pipelines as dlt, but the modern import is of course "from pyspark import pipelines as dp", with identical syntax.
2. The guide has incorrect indentation in the continuous feed section.
3. The guide doesn't mention how out-of-order deletes are handled, but probably should.
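For anyone skimming, the rename in point 1 amounts to:

# Old import used in the guide's examples:
# import dlt
# dlt.create_streaming_table("rdbms_orders")

# Current import, same decorators and functions:
from pyspark import pipelines as dp
dp.create_streaming_table("rdbms_orders")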
 


Thank you for your reply!

Maybe I am slow, but two things don't make sense to me in this reply. If these are clarified, I'll consider the question resolved.

You raise a good point that even if you use AUTO CDC's sequence_by, you would still have the problem of reprocessing the same files if you don't have run_once on, in the general case. In this particular case, though, that is not an issue, as cloudFiles/Auto Loader keeps track of the files it has already read. A smaller detail, but is this accurate?

On the wider point of using a predefined schema, I guess that would work. However, don't you then miss out on schema evolution? There are pros and cons to enforcing schemas, and I guess it makes sense for the schema to be fixed by the time you handle your CDC in silver, but I was otherwise thinking it would evolve in bronze.

Or are you saying that, unlike other forms of pipelines in Databricks, a hidden downside of using declarative pipelines is that they don't support schema evolution? Does the declarative pipeline view handle schema evolution if the CDC files evolve? Let's leave schema evolution in AWS DMS as a topic for another day.

mmayorga
Databricks Employee

Hi @excavator-matt ,

Yes, you are correct. cloudFiles/Auto Loader handles idempotency at the file level.

From the guide's perspective, the view is created from the source files in the specified location. This view captures all files and their corresponding rows, and then AUTO CDC is applied from the view to create the baseline streaming table.

Regarding schema evolution and enforcement, IMO, it is about the right balance: In Bronze, your data changes, so we need to be resilient enough to handle them. However, as we move through the Silver and Gold layers, we may require a more rigid schema and enforce it to ensure that downstream/consumer systems don't break.

The key detail is that AUTO CDC handles schema evolution. Note that the streaming table “rdbms_orders” is created without a schema, and then the AUTO CDC flow is applied from the “full_orders_snapshot” view.

I hope this helps. Thank you!

I discovered that there is a function in Spark to get the schema definition from a file.

So if you can assume there is a DMS LOAD file, you can steal the schema from there and give it to the change feed view.

 

# Take the schema from the DMS initial-load file and pass it as a DDL string.
schema = spark.read.parquet(dms_path + "LOAD00000001.parquet").schema.toDDL()

@dp.view()
def districts_change_feed():
    # Explicit schema, so the view is defined even when no CDC files have arrived yet.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.includeExistingFiles", "true")
        .schema(schema)
        .load(dms_path + "2*.parquet")
    )
That way you can get the schema somewhat dynamically, although you lose schema evolution. Perhaps a compromise is to include this only optionally in the Jinja template, for rarely updated fields.
 
I would have much preferred if Declarative Pipelines had support for handling a change feed that hasn't started yet, but it is what it is. I think this is a weakness of DP.
 

@mmayorga wrote:

Regarding schema evolution and enforcement, IMO, it is about the right balance: In Bronze, your data changes, so we need to be resilient enough to handle them. However, as we move through the Silver and Gold layers, we may require a more rigid schema and enforce it to ensure that downstream/consumer systems don't break.

But in order to get the change feed into bronze, you would still need the CDC files to have arrived in that case. If you don't bother with the CDC changes until silver and keep bronze as just the initial load, you don't have schema evolution in practice.
 
A workaround would be to not treat the load and the CDC separately. Perhaps that is not as bad as the guide would suggest, since Auto Loader can keep track of what has been processed.

mmayorga
Databricks Employee

Hey @excavator-matt,

Let's remember that the Bronze layer is for raw ingestion; it provides a baseline for auditing and a starting point for the transformations required by the different use cases you need to serve. Systems and their requirements change over time, which leads to columns being added or removed; that is schema evolution, and it is typically handled per table. The same practice can be extended to multiple tables, depending on your scenario: you can create/refresh your tables using "SELECT * FROM ..." against your source tables with schema evolution, but at some point in your flow you'll need to establish a schema (schema enforcement), because you need to guarantee reliability to your consumers.

LDP handles incremental ingestion, so your bronze layer serves as the landing point for newly arriving data. You receive new/updated/deleted rows, normally tagged with an "operation" column that determines the action to apply to each row. From there, you apply a change data feed from your Bronze table to your Silver table, which synchronizes (via upserts) the newly arriving data with your already transformed baseline data. In the guide there are two steps: the first creates the baseline data, and the second applies the CDC to newly arriving data.
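A rough sketch of that bronze-to-silver shape, under the same assumptions as above (placeholder names, path, and key/sequence columns; the DMS operation column assumed to be Op):

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

# Bronze: append-only landing table for the raw DMS files (schema can evolve here).
@dp.table(name="orders_bronze")
def orders_bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("s3://your-bucket/dms/orders/")    # placeholder path
    )

# Silver: AUTO CDC applies the inserts/updates/deletes signalled by the Op column.
dp.create_streaming_table("orders_silver")

dp.create_auto_cdc_flow(
    target="orders_silver",
    source="orders_bronze",
    keys=["order_id"],                   # placeholder primary key
    sequence_by="last_updated",          # placeholder ordering column
    apply_as_deletes=expr("Op = 'D'"),
    except_column_list=["Op"],           # keep the operation flag out of silver
)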

My suggestion is to "split and conquer" these steps: once you have it working, reshape it, cutting corners while keeping the core functionality. First ingest all the data, then prepare and transform it, and then synchronize it with your baseline (CDC).

Thank you, I hope you find this helpful!
