SergioSchena
Databricks Employee

SergioSchena_0-1713506579404.png

Organizations on Microsoft Azure commonly use Microsoft Dynamics 365 as their CRM or ERP application. Leveraging these applications' data for Business Intelligence, Machine Learning, or Artificial Intelligence gives organizations a real competitive advantage.

The Azure Synapse Link for Dataverse is the primary tool for accessing data in Microsoft Dynamics and exporting it in Common Data Model (CDM) format. Azure Databricks is commonly the platform used to process this data, both to gain insights and to prepare it for downstream consumption. However, when CDM data volumes are large and the business demands near real-time use cases, processing the data cost-effectively and on time can be complex.

In this blog, we will first understand what the Azure Synapse Link for Dataverse is and how it works. We will then dive deeper into the common challenges faced when reading CDM data and explore a cost-effective solution to meet business requirements.

Spoiler

This blog also provides two code examples, which you can find in this GitHub repository!

The repository provides an example implementation of incremental ingestion of CDM files from the Azure Synapse Link for Dataverse with Databricks.


Azure Synapse Link for Dataverse

The Azure Synapse Link for Dataverse, formerly known as Export to data lake, facilitates near real-time insights over the data in the Microsoft Dataverse. The service allows for seamless analytics, business intelligence, and machine learning applications on Dataverse data. In this blog post, I will shorten the service name to Dataverse Link for simplicity.

The Dataverse Link can continuously export data to Azure Synapse Analytics and Azure Data Lake Storage (ADLS) Gen2 in the Common Data Model (CDM) format. It supports initial and incremental writes for data and metadata of standard and custom tables, automatically pushing changes from Dataverse to the destination without manual intervention. The service efficiently exports all the creation, update, and deletion (CRUD) operations to the destination location, leveraging the change tracking attributes in Dataverse. 

While the service seamlessly integrates with Synapse by creating ready-to-use tables in the target workspace, Dataverse Link with ADLS is the go-to landing location for flexible consumption of Dataverse data for any scenario.

Azure Synapse Link for Dataverse with ADLS Gen2

The Azure Synapse Link for Dataverse with ADLS Gen2 connects the Dataverse data to a Storage Account in the same tenant and region as the Power Apps environment that creates it. When you create a new link or add new tables to an existing link, the service performs an initial sync to the target storage account. A few hours later, the incremental updates start.

The service writes the data files to a container named dataverse-environmentName-organizationUniqueName, where environmentName and organizationUniqueName are, respectively, the names of the Power Apps environment and the organization used to create the link.

The link creates the following elements in the target container:

  • The metadata file (model.json), providing a list of tables exported to the data lake
  • A folder for each table, including near real-time data and read-only snapshot data
  • A folder (Microsoft.Athena.TrickleFeedService) containing the option set files, one for each table.

SergioSchena_1-1713506612587.png


Metadata file (model.json)

The metadata file (or model.json) describes the data in the container and can be used to:

  • Discover the tables (entities) exported to the container and when they were most recently updated
  • Obtain the semantic information about the schema (attributes) of the underlying data files
  • Locate the data files (partitions) and obtain metadata about how they are encoded.

In the context of the Dataverse Link and for a custom implementation of a process consuming CDM data files, it is essential to highlight the following parts of the model.json file (see the list below).

  • $.entities: the model entities. It contains an array of the exported entities.
  • $.entities.name: the entity name, i.e., the name of the exported table.
  • $.entities.annotations: an array of optional model annotations containing contextual information in key/value pairs. Athena:InitialSyncState is the status of the initial sync, while Athena:InitialSyncDataCompletedTime is the initial sync completion date and time.
  • $.entities.attributes: the attributes within the entity.
  • $.entities.attributes.name: the attribute name.
  • $.entities.attributes.dataType: the attribute data type. The possible values are: string, int64, double, dateTime, dateTimeOffset, decimal, boolean, guid.
  • $.entities.attributes."cdm:traits": an array of trait references, including the precision for decimal attributes or the constrained length for string attributes. Example of content for a decimal field:

    "cdm:traits": [
        {
            "traitReference": "is.dataFormat.numeric.shaped",
            "arguments": [
                {
                    "name": "precision",
                    "value": 38
                },
                {
                    "name": "scale",
                    "value": 2
                }
            ]
        }
    ]

  • $.entities.partitions: the entity's physical partitions (data files). It contains the array of the entity's partitions.
  • $.entities.partitions.name: the partition name.
  • $.entities.partitions.location: the partition's physical file location, including the file itself, i.e., the HTTPS URL of the latest snapshot partition file.

 

SergioSchena_2-1713506612552.png
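
To make this more concrete, here is a minimal sketch (not the code from the referenced repository) of how the model.json file could be parsed to list the exported entities, their initial sync status, and the locations of their snapshot partition files. The container URL is a placeholder you would replace with your own.

import json

# Placeholder path to the metadata file in the Dataverse Link container.
model_json_path = "abfss://dataverse-env-org@yourstorageaccount.dfs.core.windows.net/model.json"

# Read the whole file as a single string and parse it.
model_json = spark.read.text(model_json_path, wholetext=True).collect()[0][0]
model = json.loads(model_json)

for entity in model["entities"]:
    # The annotations carry the initial sync status (Athena:InitialSyncState, ...).
    annotations = {a["name"]: a["value"] for a in entity.get("annotations", [])}
    print(entity["name"], annotations.get("Athena:InitialSyncState"))

    # Each partition points to the HTTPS URL of the latest snapshot file.
    for partition in entity.get("partitions", []):
        print("  ", partition["name"], "->", partition["location"])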


Table Data Folders

The Dataverse Link service creates a data folder for each table. The folder contains two types of data files:

  • Near real-time data files: These are incrementally synchronized from the Dataverse by detecting what data has changed since the initial extraction or the last synchronization.
  • Snapshot data files: A read-only copy of the data updated every hour. Only the latest five snapshot files are retained.


Important!

Dataverse data can continuously change through CRUD transactions. Snapshot files can therefore be reliably consumed by an analytics consumer at a given point in time, as they provide a read-only, point-in-time version of the data. Near real-time files, however, are continuously updated by the Dataverse trickle feed engine, so you must use an analytics consumer that supports reading files that are modified while the query is running.


The Dataverse engine is also responsible for creating a snapshot every hour, if and only if there is an update in the corresponding near real-time file. It subsequently updates the model.json file to point to these snapshots and removes the stale snapshot files to retain only the latest five.

In the following image, you can find an example of the content of a table data folder.

SergioSchena_3-1713506612658.png

There is one data file per partition. The data files are:

  • UTF-8 multiline CSV files,
  • with no header,
  • using the comma (,) as the field delimiter,
  • using double quotes (") to quote text fields, and
  • using the double quote (") as the escape character for the quote character.


Note!

As the CSV file has no header, you must refer to the schema stored in the model.json file to infer the correct schema.
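
For illustration, the attribute list of an entity can be mapped to a Spark schema along the following lines. This is a simplified sketch: the type mapping is minimal and the decimal precision and scale from cdm:traits are not extracted (the repository linked in this post contains a complete implementation).

from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, DoubleType,
    TimestampType, DecimalType, BooleanType,
)

# Simplified mapping from CDM data types to Spark types; decimal precision and
# scale would normally be taken from the "cdm:traits" entry of each attribute.
CDM_TO_SPARK = {
    "string": StringType(),
    "int64": LongType(),
    "double": DoubleType(),
    "dateTime": TimestampType(),
    "dateTimeOffset": TimestampType(),
    "decimal": DecimalType(38, 18),
    "boolean": BooleanType(),
    "guid": StringType(),
}

def build_entity_schema(entity: dict) -> StructType:
    """Build the Spark read schema from the attributes of a model.json entity."""
    return StructType([
        StructField(attr["name"], CDM_TO_SPARK.get(attr["dataType"], StringType()), True)
        for attr in entity["attributes"]
    ])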

You should also be aware that:

  • When a new column is added to the table, it is appended at the end of the row, and only new rows will show the newly added column.
  • When a column is deleted, it isn't dropped from the file: it is preserved for the existing rows and marked as null (empty) for the new rows.
  • A data type change is a breaking change. You must unlink and relink the table and then reprocess all the data.
  • Dataverse data uses different date and time formats depending on the field. Check the reference table below to handle them accordingly.

SergioSchena_4-1713506612484.png


Option Set Files

For columns that use Microsoft Dataverse Choices, choice values are written as an integer label and not a text label to maintain consistency during edits. The integer-to-text label mapping is stored in the Microsoft.Athena.TrickleFeedService/table-EntityMetadata.json file.

The file has the OptionSetMetadata root level attribute, containing a list of objects with the following properties:

  • EntityName, referring to the name of the entity,
  • OptionSetName, the name of the choice,
  • Option, the integer value of the option,
  • LocalizedLabel, the text label,
  • LocalizedLabelLanguageCode, the language code of the text label.

SergioSchena_5-1713506612557.png

These files can be used to build reference tables or read on the fly to enrich the raw data when building parsed and enriched data.
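
For illustration, the snippet below sketches how an option set file could be read and joined to a raw DataFrame to resolve the integer values of a choice column into text labels. The file path, the raw_account_df DataFrame, the statuscode column, and the use of English labels (language code 1033) are assumptions for the example.

from pyspark.sql import functions as F

# Placeholder path to the option set metadata for the "account" table.
optionset_path = (
    "abfss://dataverse-env-org@yourstorageaccount.dfs.core.windows.net/"
    "Microsoft.Athena.TrickleFeedService/account-EntityMetadata.json"
)

# Each record under OptionSetMetadata maps an integer option to a localized text label.
optionsets = (
    spark.read.option("multiLine", True).json(optionset_path)
    .select(F.explode("OptionSetMetadata").alias("o"))
    .select("o.EntityName", "o.OptionSetName", "o.Option",
            "o.LocalizedLabel", "o.LocalizedLabelLanguageCode")
)

# Resolve the hypothetical "statuscode" choice column into its English label.
labels = optionsets.where(
    (F.col("OptionSetName") == "statuscode") & (F.col("LocalizedLabelLanguageCode") == 1033)
)
account_enriched = (
    raw_account_df
    .join(labels, raw_account_df["statuscode"] == labels["Option"], "left")
    .withColumnRenamed("LocalizedLabel", "statuscode_label")
)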

Advanced Configurations

The Azure Synapse Link also offers advanced configurations that control how the data files are produced: data partitioning and in-place updates vs. append-only writes.

It is worth mentioning them as they affect the performance and behavior of data consumers. 

Data Partitioning

When Azure Synapse Link writes Dataverse table data to ADLS, the tables are partitioned. By default, the table is partitioned by month, but based on the volume and data distribution, you can also choose to partition it yearly. The partitioning configuration is a per-table setting.

The data is written in several files instead of a single one, based on the createdOn value on each row in the source. For tables without the createdOn attribute, each partition contains 5,000,000 records.

When the partition file approaches 95% of the maximum number of blocks in a blob (50,000), a new file is created, appending a three-digit number suffix, e.g., _001, _002, etc.

In-place Updates vs. Append-only

Azure Synapse Link tables can be configured to use in-place updates or append-only.

  • In-place update performs an in-place upsert of the incremental data in the destination, by scanning the partition files to update or remove existing rows and append newly inserted ones. This is the default setting for tables with the createdOn attribute.

  • Append-only, instead, always appends the create, update, and delete changes at the end of the corresponding partition file. When selecting it, the partition strategy defaults to Year and cannot be modified. This is the default setting for tables without the createdOn attribute.


Note!

In case of missing acknowledgments from ADLS due to network delays, you may find some duplicated rows, i.e., duplicated version numbers for the same entity, because of the retry mechanism of the Dataverse link. To remediate this, you can use the SinkModifiedOn attribute to remove the duplicates and keep the latest row.
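
For example, a deduplication along these lines keeps only the most recently written copy of each row version; using Id and versionnumber as the key is an assumption that you should adapt to your entity.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the latest copy of each row version, based on SinkModifiedOn.
w = Window.partitionBy("Id", "versionnumber").orderBy(F.col("SinkModifiedOn").desc())

deduplicated_df = (
    df.withColumn("_rn", F.row_number().over(w))
    .where("_rn = 1")
    .drop("_rn")
)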


The documentation recommends using Append only to perform incremental ingestion to another target, enabling AI and ML scenarios.

SergioSchena_6-1713506612589.png

 

Ingesting CDM Data in Databricks

There are several ways to leverage the Dataverse Link to export tables to ADLS and further ingest them into Databricks. Most of them require setting up and maintaining additional services (Azure Synapse Analytics with Spark Pools, Azure Data Factory), which makes them complex and costly compared to reading the CDM files exported to ADLS directly with Databricks.

It is also possible that you cannot change the existing setup of the Dataverse Link, and you need to leverage the existing link with ADLS to implement incremental ingestion of CDM data.

Reading CDM Data

SergioSchena_7-1713506612587.png

 

The initial approach for ingesting CDM tables into Databricks leverages Spark to read the CSV data files and save them to a bronze Delta table. The Databricks pipeline would read the model.json file to build the read schema and get the path to the snapshot (CSV) files.

A filter on the SinkModifiedOn field is usually applied to implement incremental ingestion.

 

# Build the read schema and snapshot file paths from model.json, and get the
# high-water mark stored by the previous ingestion run.
entity_schema = get_entity_schema("account")
entity_paths = get_entity_paths("account")
last_sink_modified_on = get_last_modified_on("account")

# SinkCreatedOn/SinkModifiedOn use a SimpleDateFormat-style pattern,
# so the legacy time parser policy is required.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
cdm_timestamp_format = "MM/dd/yyyy hh:mm:ss aa"

df = (
    spark.read.schema(entity_schema)
    .csv(
        path=entity_paths,
        multiLine=True,
        header=False,
        escape='"',
        sep=",",
        timestampFormat=cdm_timestamp_format,
    )
    # Keep only the rows written after the last ingested watermark.
    .where(f"SinkModifiedOn > '{last_sink_modified_on}'")
)

df.write.mode("append").saveAsTable("your-table-name")

 


Important!

You must specify a custom timestampFormat to correctly parse the SinkCreatedOn and SinkModifiedOn fields, and set the Spark configuration spark.sql.legacy.timeParserPolicy to LEGACY.


Challenges

Unfortunately, using the above approach to read directly from the CDM data files could expose you to challenges if the data file size is larger than a few GBs.

In particular, the multi-line nature of the CSV data files forces Spark to treat each file as unsplittable, so a single task parses the whole file into a single DataFrame partition regardless of its size. This can work for files up to a few GBs, but a file as big as 100 GB may take 10-15 hours to parse. If the Dataverse Link is configured to write in append-only mode, the output is partitioned by year, and in that case it is not uncommon for file sizes to reach 150-200 GB.

Suppose you are trying to implement an incremental ingestion pattern by reading and ingesting only the newly appended content to each data file. In that case, you may have to read 100GB to write only a few MBs, which can be incredibly slow and substantially delay the availability of fresh data for your business.

Leveraging snapshot files would not solve the problem either: if the read takes more than six hours, the data file may be deleted while you are still reading it, since the Dataverse Link only retains the last five snapshot files.

Proposed Solution

To overcome the challenges mentioned above, we must generate a new data source containing the incremental updates appended to the near real-time data files, written as files small enough to mitigate the problem of the slow multiline read.

You can generate incremental update files at a frequency that allows you to meet the business requirements regarding the data freshness of the CDM tables. 

SergioSchena_8-1713506612543.png

The picture above shows the architecture of the proposed solution. The Incremental Copy Job efficiently copies the appended incremental updates from the data files using the low-level Blob Storage APIs. The metadata of the data files, including the offsets used to identify the new updates appended to each file, is stored in a managed blob log table.
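
As a simplified illustration of this idea (not the repository code), the snippet below copies only the byte range appended to a data file since the last offset recorded in the blob log table, using the azure-storage-blob SDK. The function name, the credential handling, and the local target path are assumptions.

from azure.storage.blob import BlobClient

def copy_appended_bytes(source_url: str, credential, last_offset: int, target_path: str) -> int:
    """Copy the bytes appended to a CDM data file since last_offset and return the new offset."""
    source = BlobClient.from_blob_url(source_url, credential=credential)
    current_size = source.get_blob_properties().size

    if current_size <= last_offset:
        return last_offset  # nothing new has been appended since the last run

    # Download only the appended range [last_offset, current_size).
    appended = source.download_blob(offset=last_offset, length=current_size - last_offset).readall()

    # Write the increment as a new file (e.g., under a mounted or local path).
    with open(target_path, "wb") as f:
        f.write(appended)

    return current_size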

SergioSchena_9-1713506612551.png

The Ingestion Job then processes the incremental updates generated by the Incremental Copy job as standard CDM data files, builds the schema from the model.json file, and appends the content to the bronze Delta tables. Leveraging the schema evolution feature of Delta Lake, the job handles the addition of new columns in the data files.
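
For example, appending a parsed increment with Delta Lake schema evolution enabled could look like the following sketch, where incremental_df and the bronze table name are placeholders.

# Append the incremental updates, letting Delta Lake add any new columns
# that appeared in the CDM data files.
(
    incremental_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("bronze.account")
)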

However, this approach does not properly handle the first run of the ingestion, where the entire CSV file must be read. Therefore, the first run still suffers from the same problems that were previously described.

Notebook Example: Ingestion of CDM to Delta Lake

The code to implement the solution described above is in this GitHub repository.

Be sure to read the repository's README, which describes the structure of the code in detail. It is worth highlighting that the jobs can handle either a single entity or a list of entities, and that all the successful processes (incremental copy and ingestion) log audit events to Delta tables for later inspection.

You will also find two example notebooks that implement the code and the pipeline described above. These notebooks can be used to create a Databricks Workflow to automate and orchestrate the whole ingestion process.

Conclusion

In this blog, we have covered in detail the Azure Synapse Link for Dataverse, the problems that may arise when dealing with this data source, and a solution to efficiently handle incremental updates appended to the data files. The solution mitigates the issues associated with slow multiline reads and ensures the timely availability of fresh data for business needs. If you are trying to ingest CDM data and facing those challenges, try this approach, leverage the referenced code, and provide feedback to improve it.


6 Comments
Wojciech_BUK
Valued Contributor III

Hi @SergioSchena ,

thanks for sharing it, your solution looks pretty solid and I wanted to implement it, but copying parts of CSVs incrementally was making me unable to sleep 😋

I just wonder if there is a better (easier) option we can set up. I am currently testing various options and I spotted that the Incremental Folder Update Structure is possible without a Synapse Workspace - just CSVs on ADLS storage.
Then the data lands in the following pattern (example with a 60-minute increment):

container
--2024-10-15T10.00.00Z
----Microsoft.Athena.TrickleFeedService
----OptionsetMetadata
----model.json
----Table_A
------2020.csv
------2021.csv
----Table_B
------2021.csv
------2022.csv
----Table_C
--2024-10-15T11.00.00Z
--2024-10-15T12.00.00Z

Then we could potentially run Auto Loader for each table with the pattern "root_path/*/Table_A/*.csv".

With this option, we could avoid reading parts of the CSVs and copying them to other storage.
What do you think about this?

 

TEP
New Contributor II

This is a great article, thank you. However, please correct the timestamp format in the code snippet; it should be:

    cdm_timestamp_format = "MM/dd/yyyy hh:mm:ss a"
silvyAlej
New Contributor II

Great article, and thank you for sharing! 

I am currently using CDM, but after migrating to Spark 3.4, I need to find a workaround to consume the Synapse Link files, which are stored in CSV format but do not include column names (these are found in the model.json for CDM).

The table has more than 300 columns, so defining a schema by hand is too hard 😞

Any suggestions would be greatly appreciated!

Thank you!  

Silvia

TEP
New Contributor II

@silvyAlej You need to write Python code to read the model.json file.

You can view the Databricks code here.

Alternatively you can copy the below and change for your needs:

import json
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, IntegerType,
    TimestampType, DecimalType, BooleanType, DateType,
)

def get_spark_type(data_type, max_length, traits=None):
    if data_type == 'guid':
        return StringType()
    elif data_type == 'dateTime':
        return TimestampType()
    elif data_type == 'dateTimeOffset':
        return TimestampType()
    elif data_type == 'int64':
        return LongType()
    elif data_type == 'int32':
        return IntegerType()
    elif data_type == 'decimal':
        # Default precision/scale, overridden by the "cdm:traits" entry if present.
        precision = 38
        scale = 18
        if traits:
            for trait in traits:
                if trait['traitReference'] == 'is.dataFormat.numeric.shaped':
                    for arg in trait['arguments']:
                        if arg['name'] == 'precision':
                            precision = arg['value']
                        elif arg['name'] == 'scale':
                            scale = arg['value']
        return DecimalType(precision, scale)
    elif data_type == 'string':
        return StringType()
    elif data_type == 'boolean':
        return BooleanType()
    elif data_type == 'date':
        return DateType()
    else:
        return StringType()
        # raise ValueError(f"Unsupported data type: {data_type}")

 
def create_spark_schema(fields):
    schema_fields = []
    for field in fields:
        name = field['name']
        data_type = field['dataType']
        max_length = field.get('maxLength', -1)
        traits = field.get('cdm:traits', None)
        spark_type = get_spark_type(data_type, max_length, traits)
        schema_fields.append(StructField(name, spark_type, True))
    return StructType(schema_fields)
 

# Load the metadata as a single string (wholetext avoids splitting the file into lines)
model_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/model.json"
model_json = spark.read.text(model_json_path, wholetext=True).collect()[0][0]
model = json.loads(model_json)

#List of all Tables
tables_list = [entity['description'] for entity in model['entities']]

#Iterate through all tables
for table_name in tables_list:
    schema_list = [entity['attributes'] for entity in model['entities'] if entity['description'] == table_name][0]
    schema = create_spark_schema(schema_list)

    #Do whatever you need with the schema.

 

 

silvyAlej
New Contributor II

Thank you so much @TEP This helped me a lot!

TEP
New Contributor II

Apologies, my above comment on the datetime format is of course wrong; the format in the article is correct, I just forgot to add the legacy timeParserPolicy setting.

 

# To parse timestamps correctly
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

All datetime formats parsed:

from pyspark.sql.functions import col, to_timestamp

df = df.withColumn("SinkCreatedOn", to_timestamp(col("SinkCreatedOn"), "MM/dd/yyyy hh:mm:ss aa"))
df = df.withColumn("SinkModifiedOn", to_timestamp(col("SinkModifiedOn"), "MM/dd/yyyy hh:mm:ss aa"))
df = df.withColumn("modifieddatetime", to_timestamp(col("modifiedon"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"))
df = df.withColumn("createddatetime", to_timestamp(col("createdon"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"))
df = df.withColumn("createdon", to_timestamp(col("createdon"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSXXX"))
df = df.withColumn("modifiedon", to_timestamp(col("modifiedon"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'"))