
How to Capture Change Data (CDC) from DynamoDB Streams and Write into a Delta Table in Databricks

prasad95
New Contributor III

Can anyone give me the steps to achieve this?

4 REPLIES

Kaniz
Community Manager

Hi @prasad95, first you'll need to enable DynamoDB Streams. This feature tracks and records every modification made to your table, giving you a comprehensive log of all changes that you can access and analyze in near real time; stream records remain available for up to 24 hours. Enabling a DynamoDB stream is quick: whether you prefer the AWS CLI or one of the AWS SDKs, it takes just a few commands.
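For instance, a minimal sketch using boto3 (the AWS SDK for Python); the table name and region here are placeholders to replace with your own:

import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Enable a stream that captures both the old and new item images for every change.
dynamodb.update_table(
    TableName="users",  # hypothetical table name
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_AND_OLD_IMAGES",
    },
)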


When selecting a streaming model, DynamoDB offers both Kinesis Data Streams for DynamoDB and DynamoDB Streams. Kinesis Data Streams provides increased flexibility and scalability, but DynamoDB Streams offers a simpler setup and usage. To process the data from a DynamoDB stream, you can create a Lambda function that can also transform the data and send it to other services such as Kinesis Firehose or Databricks. For instance, a Lambda function can be used to send all updates from a DynamoDB stream to a Kinesis Firehose, which can then convert the rows into Parquet format and store them in S3 for further analysis.
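As an illustration, a sketch of such a Lambda handler; the delivery stream name is hypothetical, and the Firehose delivery stream is assumed to be configured separately for Parquet conversion into S3:

import json
import boto3

firehose = boto3.client("firehose")

def handler(event, context):
    # Each stream record carries the change image (keys, new/old item) under "dynamodb".
    batch = [
        {"Data": (json.dumps(record["dynamodb"]) + "\n").encode("utf-8")}
        for record in event["Records"]
    ]
    # Hypothetical delivery stream; Firehose converts the JSON to Parquet and lands it in S3.
    firehose.put_record_batch(
        DeliveryStreamName="dynamodb-cdc-to-s3",
        Records=batch,
    )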


First, import the required libraries, such as pyspark.sql.functions, in Databricks. Next, define a streaming view for your DynamoDB table - for example, "users" - using the Delta format. Then, create a streaming Delta table - named "target" - to store the change data. Lastly, utilize the dlt.apply_changes function to apply the changes from the source (users) to the target Delta table. Be sure to specify the keys, sequence column, and other relevant parameters for a successful application of the changes.
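A minimal sketch of that pipeline in Python; the source table name, key column, sequence column, and operation column are assumptions to adapt to your schema:

import dlt
from pyspark.sql.functions import col, expr

# Streaming view over the landed change data (hypothetical source table).
@dlt.view
def users():
    return spark.readStream.table("cdc_landing.users")

# Streaming Delta table that will hold the applied changes.
dlt.create_streaming_table("target")

dlt.apply_changes(
    target="target",
    source="users",
    keys=["user_id"],                               # hypothetical primary key
    sequence_by=col("sequence_num"),                # hypothetical ordering column
    apply_as_deletes=expr("operation = 'DELETE'"),  # assumes an operation column
    except_column_list=["operation", "sequence_num"],
    stored_as_scd_type=1,
)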

prasad95
New Contributor III

@Kaniz Thanks for the detailed overview.
But can you tell me more about how I should fetch the data generated by DynamoDB Streams (which will reside in S3) into a Delta table via a notebook (automated by a job)?
I saw in the official docs that I can use a file-arrival trigger for the job (notebook), but there is a limit of at most 50 [Reference Docs].

If I have to implement this for more than 50 tables, how can I manage it?


saikumar246
New Contributor III

Hi @prasad95, thank you for sharing your concern here.

In addition to @Kaniz's comments, you can follow the steps below to capture Change Data (CDC) from DynamoDB Streams and write it into a Delta table in Databricks:

1. Connect to DynamoDB Streams and read the CDC data using the AWS SDK (see the boto3 sketch after this list).
2. Process the CDC data in Databricks using the APPLY CHANGES API in Delta Live Tables, which is designed to correctly process CDC records.
3. Use the APPLY CHANGES INTO statement with the APPLY CHANGES API. An example of its usage is:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

In this statement, source is the CDC data from DynamoDB Streams, and table_name is the Delta table where you want to write the CDC data.

4. After executing this statement, the CDC data from DynamoDB Streams is written into the Delta table in Databricks. Remember to define unique keys for each row in the source data. If you want to track history on certain columns, use the TRACK HISTORY ON clause.
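For step 1, a minimal boto3 sketch of reading records from a DynamoDB stream; the stream ARN is a hypothetical placeholder (take the real one from the table's LatestStreamArn):

import boto3

streams = boto3.client("dynamodbstreams")

# Hypothetical stream ARN.
stream_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/users/stream/2024-01-01T00:00:00.000"

# Walk each shard and pull its change records from the beginning.
shards = streams.describe_stream(StreamArn=stream_arn)["StreamDescription"]["Shards"]
for shard in shards:
    iterator = streams.get_shard_iterator(
        StreamArn=stream_arn,
        ShardId=shard["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    for record in streams.get_records(ShardIterator=iterator)["Records"]:
        # Each record holds the keys plus the new/old item images.
        print(record["dynamodb"])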

You can go through the links below to understand more about this.

https://docs.databricks.com/en/delta-live-tables/cdc.html#how-is-cdc-implemented-with-delta-live-tab...

DLT SQL reference: https://docs.databricks.com/en/delta-live-tables/sql-ref.html

Please leave a like if it is helpful. Follow-ups are appreciated.

Kudos,

Sai Kumar

prasad95
New Contributor III

@saikumar246 Thanks for the detailed steps and explanation.
Can you help with this comment?
