Using Delta Live Tables with Common Data Model (CDM) as a Source in Databricks
I'm investigating the use of Delta Live Tables (DLT) to process Common Data Model (CDM) files exported from Dynamics 365, and I found a solution that works well. Here's a quick guide on how to set it up:
Prerequisites:
- Azure Databricks workspace
- Azure Data Lake Storage (ADLS) to store your CDM files
Steps:
Store Data in ADLS: Upload your Dynamics 365 exported CSV and CDM files to ADLS.
Mount ADLS in Databricks: Use the following code to mount your ADLS container (the client ID and secret belong to an Azure AD service principal that has access to the storage account):
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<client-id>",
    # Prefer dbutils.secrets.get(scope=..., key=...) over hard-coding the secret.
    "fs.azure.account.oauth2.client.secret": "<client-secret>",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}

dbutils.fs.mount(
    source="abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
    mount_point="/mnt/datalake",
    extra_configs=configs
)
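Once the mount is in place, a quick sanity check helps confirm the export landed where the pipeline expects it. The paths below are examples based on the folder names used later in this post, so adjust them to your export layout; Dynamics 365 CDM exports typically place a model.json manifest next to the entity data.
# Optional sanity check: list the mounted export folder and peek at the CDM
# manifest. Both paths assume the export landed under /mnt/datalake/CDMExports.
display(dbutils.fs.ls("/mnt/datalake/CDMExports"))

# model.json is the CDM manifest describing the exported entities and columns;
# printing the first few kilobytes is enough to confirm the export is intact.
print(dbutils.fs.head("/mnt/datalake/CDMExports/model.json"))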
Create a Delta Live Table Pipeline: Navigate to the "Jobs" section in Databricks and create a new "Delta Live Tables" pipeline.
Define Pipeline Configuration: In the notebook the pipeline points to, define the DLT tables that read and process the CDM files:
import dlt

@dlt.table(comment="Read Account data from CDM")
def account():
    return (
        spark.read.csv("/mnt/datalake/CDMExports/Account.csv", header=True, inferSchema=True)
        .withColumnRenamed("AccountID", "account_id")
        .withColumnRenamed("AccountName", "account_name")
    )

@dlt.table(comment="Read Contact data from CDM")
def contact():
    return (
        spark.read.csv("/mnt/datalake/CDMExports/Contact.csv", header=True, inferSchema=True)
        .withColumnRenamed("ContactID", "contact_id")
        .withColumnRenamed("ContactName", "contact_name")
        # Assumes Contact.csv carries the account foreign key as AccountID,
        # which is what the join below relies on.
        .withColumnRenamed("AccountID", "account_id")
    )

@dlt.table(comment="Join Account and Contact data")
def account_contact():
    account_df = dlt.read("account")
    contact_df = dlt.read("contact")
    # Joining on the column name keeps a single account_id column in the output.
    return account_df.join(contact_df, "account_id")
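Optionally, you can layer DLT expectations on top of these definitions to drop bad rows before the join. This is just a sketch, not part of the original setup; the table name and constraint below are illustrative.
@dlt.table(comment="Account rows with a non-null ID")
@dlt.expect_or_drop("valid_account_id", "account_id IS NOT NULL")
def account_clean():
    # Reads the account table defined above and drops rows failing the expectation.
    return dlt.read("account")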
Deploy and Run the Pipeline: Deploy the pipeline from the Databricks UI. This will read the CDM files, apply the transformations, and write the results to Delta tables.
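After a successful run, the published tables can be queried from any notebook. The schema name below is a placeholder for whatever target schema you configured in the pipeline settings.
# "cdm_demo" is a placeholder target schema; use the one set in your pipeline.
joined = spark.table("cdm_demo.account_contact")
joined.select("account_id", "account_name", "contact_id", "contact_name").show(10)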
This setup allows you to leverage Databricks and Delta Lake capabilities to process and transform CDM data efficiently.
Hope this helps anyone looking to integrate Dynamics 365 CDM with Delta Live Tables!
Surya@the_data-bro