Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
Adding column masks to a column using the DLT Python create_streaming_table API

NamNguyenCypher
New Contributor II

I'm having difficulty adding a mask function to columns while creating streaming tables with the DLT Python API create_streaming_table(). With the code below, the streaming table is created, but no column is masked:

import dlt
from pyspark.sql.types import StructField, StructType


def prepare_column_properties_struct(table_contract: dict) -> StructType:
    struct_fields = []

    for column_properties in table_contract["models"]["columns"]:
        column_name = column_properties["name"]
        column_type = column_properties["type"]
        column_nullable = not column_properties["required"]
        column_comment = column_properties["comment"]
        column_mask = column_properties["mask"]

        struct_fields.append(
            StructField(
                name=column_name,
                dataType=parse_data_type(column_type),
                nullable=column_nullable,
                metadata={"comment": column_comment, "mask": "mask_all"},
            )
        )

    return StructType(struct_fields)


dlt.create_streaming_table(
    name="account",
    schema=prepare_column_properties_struct(data_contract),
)

How do I go about this? Am I using the wrong metadata key in the StructField? The docs aren't helping.

1 ACCEPTED SOLUTION


LRALVA
Honored Contributor

@NamNguyenCypher 
Delta Live Tables’ Python API does not currently honor column-mask metadata embedded in a PySpark StructType. Masking (and row filters) on DLT tables are only applied when you define your table with a DDL-style schema that includes a MASK clause (or via SQL).

Why your StructField(... metadata={"mask": "mask_all"}) isn't working
The Python create_streaming_table(..., schema=StructType) call publishes the schema (data types, comments, nullability), but it does not inspect StructField.metadata for mask or maskingPolicy keys. https://docs.databricks.com/aws/en/dlt-ref/dlt-python-ref-streaming-table

Column masks in DLT are applied at the table definition level via SQL's MASK clause, not via Spark schema metadata. https://docs.azure.cn/en-us/databricks/dlt/sql-ref

Use a SQL DDL string for your schema
Pass a single string to the schema parameter that embeds the MASK clause inline (note: the mask function is referenced by name, without parentheses), e.g.:

import dlt

dlt.create_streaming_table(
    name="account",
    schema="""
        account_id STRING,
        email STRING,
        ssn STRING MASK my_catalog.my_schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
    """,
    comment="Masked account stream",
    path="/mnt/dlt/account",
    partition_cols=["account_id"],
)

Here, ssn is masked by the UDF ssn_mask_fn every time the table is read.
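Since the schema parameter takes a DDL string, you can still drive it from your data contract instead of hand-writing the columns. Below is a minimal sketch (build_ddl_schema is a hypothetical helper, not a DLT API): it assumes the contract layout from your question ({"models": {"columns": [...]}}) and that the "mask" key holds a fully qualified mask-function name; the clause order (NOT NULL, MASK, COMMENT) follows the style of the example above.

```python
# Sketch only: render the contract-driven schema as a DDL string, since
# create_streaming_table() ignores StructField.metadata. Contract keys
# ("name", "type", "required", "comment", "mask") are taken from the question.

def build_ddl_schema(table_contract: dict) -> str:
    """Render each column as "name TYPE [NOT NULL] [MASK fn] [COMMENT '...']"."""
    clauses = []
    for col in table_contract["models"]["columns"]:
        parts = [col["name"], col["type"].upper()]
        if col.get("required"):
            parts.append("NOT NULL")
        if col.get("mask"):
            # e.g. "my_catalog.my_schema.ssn_mask_fn" -- bare name, no parentheses
            parts.append(f"MASK {col['mask']}")
        if col.get("comment"):
            escaped = col["comment"].replace("'", "''")  # escape single quotes for DDL
            parts.append(f"COMMENT '{escaped}'")
        clauses.append(" ".join(parts))
    return ",\n".join(clauses)
```

The result can then be passed straight through, e.g. dlt.create_streaming_table(name="account", schema=build_ddl_schema(data_contract)).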

 

LR


2 REPLIES


Thanks that was very quick. I'll try this in the morning and revert.
