Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Best practice for adding fixed metadata columns at point of ingestion

QueryingQuail
New Contributor III

Hello all,

We are currently working with ingestion of data from source systems using a mix of custom code and managed connectors (e.g. the Dynamics 365 (Synapse Link) connector) in conjunction with Auto CDC / Auto CDC from snapshot.

I’m trying to understand the best way to add custom metadata columns to all ingestion pipelines. As an example, I would like to add a source system id column sourced from our Application Portfolio Management (APM) system (this is the first data source we ingested, and tables from this system should themselves carry a source system id column holding the id of the APM system itself). Other columns would be ingestion time (in Databricks), change data type metadata, and other metadata-relevant columns.

We are planning on having a naming pattern scheme to make sure that these metadata columns will not collide with source system data (e.g., "__dataplatformname_metadatacolumnname" or similar).

I’m unsure how to allow for this functionality at the point of ingestion without materialising data in a separate step (which would defeat the purpose of, e.g., "ingestion_ts"), while also not unnecessarily increasing ingestion time and compute. It could also be that a lot of what I need is already present in Unity Catalog or other system metadata, but I would ideally want this limited number of columns as part of the data itself, so as to make it usable inside and outside of UC.

Maybe someone here has pointers to best practice or other resources that can be shared. 

6 REPLIES

AshokT
New Contributor II

Hi! Great question — adding consistent, reliable ingestion-time metadata (like ingestion_ts, source system ID, change type, etc.) is a very common need in medallion architectures, especially when you want to keep auditability and traceability without introducing heavy post-processing steps.

You're right to avoid a separate materialization step just for adding columns — that would defeat the purpose of low-latency ingestion timestamps and increase compute/cost. Fortunately, Databricks gives you several clean ways to inject these columns at ingestion time with very little overhead.

Best way to add consistent ingestion metadata columns (e.g. __dataplatform_ingestion_ts, __source_system_id, __change_data_type) at ingestion time: 

from pyspark.sql import functions as F

SOURCE_SYSTEM_ID = "APP_PORTFOLIO_001"  # or from config/job param

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("s3://source-path/")
    .select(
        "*",
        F.current_timestamp().alias("__dataplatform_ingestion_ts"),
        F.lit(SOURCE_SYSTEM_ID).alias("__source_system_id"),
        F.lit("AUTO_CDC").alias("__change_data_type"),
        # _metadata is the built-in file metadata column; input_file_name()
        # is deprecated and not supported with Unity Catalog
        F.col("_metadata.file_path").alias("__source_file_path"),
        F.col("_metadata.file_modification_time").alias("__source_file_mod_ts"),
    )
)

(df.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://source-path/_checkpoints/table_name")  # choose your own checkpoint path
    .toTable("bronze.table_name"))

Alternatives (less ideal for your case):

  • Generated columns → limited to deterministic expressions, so they cannot capture values like current_timestamp()
  • Post-ingestion UPDATE → loses real ingestion timestamp, adds compute

QueryingQuail
New Contributor III

Thank you, but I could get this much by prompting an AI myself.

Do you have real world experience of implementing this in a robust way across both managed connectors and custom ingestion?

saurabh18cs
Honored Contributor III

Hi @QueryingQuail, what is this metadata, and what will these metadata columns be used for in your workload? What are the source types?

Isn't it a good idea to put all metadata-related columns into a single column as an object? Any consumer can then extract what they need from this single column instead of flattening them out, and if more columns arrive tomorrow they can all become part of the same single column rather than requiring a schema change.

Br

QueryingQuail
New Contributor III

Hello @saurabh18cs,

Please excuse the late reply.

I like the idea of embedding metadata about the data itself, and about the management of the data, into the data itself. Think, e.g., about:

  • __DATAPLATFORM_row_hash_id containing an MD5 hash for each row
  • __DATAPLATFORM_ingestion_ts containing a TIMESTAMP_NTZ of ingestion time into the platform 
  • __DATAPLATFORM_change_data_type (e.g., 'CDC', 'SNAPSHOT', 'SNAPSHOT INCREMENTAL', 'NO CHANGE DATA MANAGEMENT') to indicate the type of change data management (if any)
  • __DATAPLATFORM_source_system_id containing the id of the source system that data is ingested from (the process being that data can only be ingested if the Application Portfolio Management system has the source system registered in IT with an owner - data from this APM system is itself automatically ingested as the first source on the platform).
  • __DATAPLATFORM_start_at being the CDC start at timestamp
  • __DATAPLATFORM_end_at being the CDC end at timestamp
  • ...
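As a thought experiment for the first bullet, a row hash like __DATAPLATFORM_row_hash_id can be sketched in plain Python (function and prefix names here are hypothetical); in a real pipeline you would compute the same thing in Spark with md5(concat_ws(...)), but the properties to get right are identical: exclude the platform's own metadata columns and make the hash independent of column order.

```python
import hashlib

def row_hash(row: dict, exclude_prefix: str = "__DATAPLATFORM_") -> str:
    """Deterministic MD5 over the source columns of a row.

    Metadata columns (those carrying the platform prefix) are excluded so
    the hash reflects only source data. Columns are sorted by name so the
    hash is stable regardless of column order, and unit/record separators
    are used as delimiters to avoid accidental collisions from concatenation.
    """
    source_items = sorted(
        (k, "" if v is None else str(v))
        for k, v in row.items()
        if not k.startswith(exclude_prefix)
    )
    payload = "\x1f".join(f"{k}\x1e{v}" for k, v in source_items)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()

# Column order does not change the hash:
assert row_hash({"id": 1, "name": "x"}) == row_hash({"name": "x", "id": 1})
```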

I'm looking into how many of these would already be present in INFORMATION_SCHEMA or UC metadata, but there is an argument for having the data within the tables themselves. I'm also unsure whether we can customize, e.g., the AUTO CDC / AUTO CDC FROM SNAPSHOT naming without having to rename manually everywhere, e.g. having the column __START_AT instantiated from the start as __DATAPLATFORM_start_at (but this might be a very minor concern).

I like your idea of putting metadata-related information into a single column. I'm unsure about any performance impact downstream, where unpacking would be needed for many transformations (thinking about the use cases of the columns above).

saurabh18cs
Honored Contributor III

Hi, what are your source types? Are they raw files or Delta tables? I don't think extracting a specific key from an object is a heavy task. Also, what type of transformations are consumer teams expecting on these metadata columns? Usually, metadata columns are carried forward for maintaining lineage.

Now, talking about transformations: if you are going to collapse multiple records into a single record, how will you manage row_hash_id for that single record? You would have to make a list of the source hash ids and store it there, which is already difficult.
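One way around storing a growing list of source hash ids is to derive a single fixed-width hash from them. A minimal sketch in plain Python (the function name is hypothetical, not from any library):

```python
import hashlib

def combine_row_hashes(hashes: list[str]) -> str:
    """Derive one hash for a record built from several source rows.

    Sorting makes the result independent of the order in which the
    source rows were read; hashing the joined sorted list keeps the
    column a fixed-width string instead of an ever-growing array.
    """
    return hashlib.md5(",".join(sorted(hashes)).encode("utf-8")).hexdigest()

# Order of the constituent rows does not matter:
assert combine_row_hashes(["bbb", "aaa"]) == combine_row_hashes(["aaa", "bbb"])
```

The trade-off is that you lose the ability to trace back to individual source rows from the hash alone, so this suits change detection more than lineage.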

Please see the attachment.

 

SteveOstrowski
Databricks Employee

Hi @QueryingQuail,

Good question -- I can see from the follow-up discussion that you are looking for practical guidance that goes beyond what a generic AI prompt would give you -- specifically how to handle this across both managed connectors (like the Dynamics 365 Synapse Link connector) and custom code ingestion with Auto CDC / Auto CDC from Snapshot. Let me break this down by scenario.


UNDERSTANDING THE TWO INGESTION PATHS

You have two fundamentally different ingestion patterns, and the approach to adding metadata columns differs for each:

1. Custom code ingestion (Auto Loader, custom Spark streaming) -- you control the DataFrame, so you can add columns directly at read time.
2. Managed connectors (Lakeflow Connect, e.g., Dynamics 365 Synapse Link) -- the connector creates and manages the streaming tables for you, so you cannot inject columns into the initial ingestion step. Instead, you add metadata in a thin downstream layer.


PATH 1: CUSTOM CODE INGESTION (AUTO LOADER + AUTO CDC)

When you control the ingestion code, you can add your metadata columns directly in the streaming table definition. Here is how this looks in a Lakeflow Spark Declarative Pipeline (SDP):

SQL example with Auto Loader:

CREATE OR REFRESH STREAMING TABLE raw_orders
AS SELECT
  *,
  _metadata.file_path AS __dataplatform_source_file_path,
  _metadata.file_modification_time AS __dataplatform_source_file_modified_at,
  current_timestamp() AS __dataplatform_ingestion_ts,
  'APP_PORTFOLIO_042' AS __dataplatform_source_system_id,
  'CDC' AS __dataplatform_change_data_type
FROM STREAM read_files(
  's3://my-bucket/incoming/orders/',
  format => 'json'
)

Python equivalent:

import dlt
from pyspark.sql.functions import lit, current_timestamp, col

SOURCE_SYSTEM_ID = "APP_PORTFOLIO_042"

@dlt.table
def raw_orders():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3://my-bucket/incoming/orders/")
        .select(
            "*",
            col("_metadata.file_path").alias("__dataplatform_source_file_path"),
            col("_metadata.file_modification_time").alias("__dataplatform_source_file_modified_at"),
        )
        .withColumn("__dataplatform_ingestion_ts", current_timestamp())
        .withColumn("__dataplatform_source_system_id", lit(SOURCE_SYSTEM_ID))
        .withColumn("__dataplatform_change_data_type", lit("CDC"))
    )

The _metadata column is a built-in hidden struct available on all file-based reads. It gives you file_path, file_name, file_size, file_modification_time, file_block_start, and file_block_length -- no extra configuration needed.

Docs: https://docs.databricks.com/aws/ingestion/file-metadata-column

For the AUTO CDC step that follows, you then apply changes into a target table as usual:

CREATE OR REFRESH STREAMING TABLE silver_orders;

CREATE FLOW orders_cdc AS AUTO CDC INTO silver_orders
FROM STREAM(raw_orders)
KEYS (order_id)
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;

The __START_AT and __END_AT columns are automatically added by Auto CDC for SCD Type 2 targets. These column names are fixed by the system and cannot be renamed in the AUTO CDC declaration itself. If you want them under your naming convention, you would create a downstream view or materialized view that aliases them:

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders
AS SELECT
  * EXCEPT (__START_AT, __END_AT),
  __START_AT AS __dataplatform_start_at,
  __END_AT AS __dataplatform_end_at
FROM silver_orders

Docs: https://docs.databricks.com/aws/ldp/cdc


PATH 2: MANAGED CONNECTORS (LAKEFLOW CONNECT)

With managed connectors like the Dynamics 365 Synapse Link connector, Lakeflow Connect creates and manages the destination streaming tables for you. You do not write the ingestion code, so you cannot inject columns into those tables directly.

The recommended pattern is a thin enrichment layer immediately downstream. Because Lakeflow Connect outputs streaming tables, you can read from them as a stream in the same pipeline (or a separate one) and add your metadata:

CREATE OR REFRESH STREAMING TABLE enriched_dynamics_accounts
AS SELECT
  *,
  current_timestamp() AS __dataplatform_ingestion_ts,
  'DYNAMICS_365_001' AS __dataplatform_source_system_id,
  'SNAPSHOT' AS __dataplatform_change_data_type
FROM STREAM(catalog.schema.dynamics_accounts)

This is not the same as a separate materialization step that "defeats the purpose" -- it is a lightweight streaming transformation that runs incrementally as new data arrives. The ingestion_ts here captures when data enters your enrichment layer, which is very close to actual ingestion time since streaming tables process incrementally with low latency.

If you want to keep everything in one pipeline, you can define your Lakeflow Connect ingestion tables and your enrichment tables in the same Lakeflow Spark Declarative Pipeline (SDP). The managed connector tables are the "bronze" layer and your enrichment tables become a thin "bronze-plus" or "silver" layer.


MAKING METADATA VALUES CONFIGURABLE

Rather than hardcoding source system IDs, use pipeline parameters so the same code works across source systems:

Python:

import dlt
from pyspark.sql.functions import lit, current_timestamp

@dlt.table
def enriched_source():
    source_id = spark.conf.get("pipeline.source_system_id", "UNKNOWN")
    change_type = spark.conf.get("pipeline.change_data_type", "UNKNOWN")

    return (
        spark.readStream
        .table("catalog.schema.raw_source_table")
        .withColumn("__dataplatform_ingestion_ts", current_timestamp())
        .withColumn("__dataplatform_source_system_id", lit(source_id))
        .withColumn("__dataplatform_change_data_type", lit(change_type))
    )

Then set these in your pipeline configuration JSON:

{
  "pipeline.source_system_id": "APP_PORTFOLIO_042",
  "pipeline.change_data_type": "CDC"
}

This way, you can deploy the same pipeline template per source system with different parameter values.


ON YOUR NAMING CONVENTION AND COLUMN COLLISIONS

Your prefix pattern (__dataplatform_columnname) is a solid approach. A few tips:

- Double-underscore prefixes are a common convention for platform-injected columns and naturally stand out from source data.
- Be aware that Auto CDC already uses __ prefixed columns (__START_AT, __END_AT), so just ensure your prefix is distinct enough to avoid confusion.
- If you want these columns to be portable outside of Unity Catalog (e.g., when sharing data via Delta Sharing or exporting to external systems), embedding them in the table is the right call. UC metadata like table properties, tags, and column comments are great for governance but do not travel with the data itself.
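To make the collision guarantee operational rather than just a convention, you can validate incoming schemas before ingestion. A minimal sketch in plain Python (the prefix, reserved set, and function name are hypothetical placeholders for your own standards):

```python
PLATFORM_PREFIX = "__dataplatform_"           # your chosen platform prefix
RESERVED_CDC_COLUMNS = {"__START_AT", "__END_AT"}  # added by Auto CDC for SCD Type 2

def check_column_collisions(source_columns: list[str]) -> list[str]:
    """Return source columns that would collide with platform-injected
    or Auto CDC system columns. Run against df.columns before ingestion
    so a misbehaving source fails fast instead of silently shadowing
    platform metadata.
    """
    return [
        c for c in source_columns
        if c.lower().startswith(PLATFORM_PREFIX) or c in RESERVED_CDC_COLUMNS
    ]

# A clean source schema produces no findings:
assert check_column_collisions(["order_id", "updated_at"]) == []
```

In a pipeline you would call this with the DataFrame's column list and raise if the result is non-empty.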


ON THE STRUCT COLUMN APPROACH

The suggestion from @saurabh18cs about putting all metadata into a single struct column is worth considering. Here is how that would look:

.withColumn("__dataplatform_metadata", struct(
    current_timestamp().alias("ingestion_ts"),
    lit("APP_PORTFOLIO_042").alias("source_system_id"),
    lit("CDC").alias("change_data_type"),
    col("_metadata.file_path").alias("source_file_path")
))

Pros: schema evolution is easier (add fields to the struct without changing table schema), cleaner column list.
Cons: slightly more verbose access syntax downstream (e.g., __dataplatform_metadata.ingestion_ts), and some BI tools handle nested structs less gracefully.

For your use case where you plan to use these columns extensively in transformations and potentially outside UC, I would lean toward flat columns with the prefix convention. The performance difference for extracting from a struct vs. reading a flat column is negligible, but the usability tradeoff matters.


WHAT IS ALREADY AVAILABLE IN SYSTEM METADATA

You mentioned wondering whether some of this is already in INFORMATION_SCHEMA or UC metadata. Here is what is available without custom columns:

- system.information_schema -- gives you table-level metadata (creation time, owner, table type, etc.) but not row-level ingestion timestamps.
- Table properties and tags -- great for cataloging source system ownership at the table level but not per-row.
- Delta table history (DESCRIBE HISTORY) -- shows operation-level timestamps for commits, but not per-row ingestion time.
- _metadata column (file reads only) -- gives source file metadata, but only at read time; it is not persisted unless you explicitly select it.

So for row-level ingestion tracking, you do need to add columns to the data itself. The system metadata is complementary but serves a different purpose (table-level governance vs. row-level lineage).


SUMMARY

- Custom ingestion: Add metadata columns directly in your streaming table SELECT using lit(), current_timestamp(), and _metadata fields.
- Managed connectors: Add a thin downstream streaming table that enriches with metadata -- this is lightweight and incremental.
- Auto CDC columns: __START_AT and __END_AT are system-managed and not renamable at the CDC step; alias them downstream if needed.
- Naming convention: Your __dataplatform_ prefix approach is solid and avoids collisions.
- Pipeline parameters: Use spark.conf.get() with pipeline configuration to make metadata values dynamic across sources.
- Struct vs. flat columns: Flat columns with a prefix are generally more practical for your described use cases.

Hope this helps you set up a robust, consistent metadata strategy across all your ingestion paths. Let me know if you have questions about any specific connector or scenario.

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.