DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT DLT Streaming Table as Variant

luketl2
Contributor

I am attempting to ingest CSV files from an S3 bucket with Auto Loader. Since the schema of the data is inconsistent (each CSV may have different headers), I was hoping to ingest the data as Variant following this: https://docs.databricks.com/aws/en/ingestion/variant#-ingest-data-from-cloud-object-storage-as-varia.... However, when trying to create a streaming table, I always get the DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT error shown in the screenshot, despite passing table_properties={"delta.feature.variantType-preview": "supported"} as an argument to the flow creating my table. Does the table_properties argument not work with DLT tables for this particular property?

pradeep_singh
Contributor III

The VARIANT type for CSV requires Databricks Runtime 16.4+. Can you check which runtime your DLT channel is running on?

Thank You
Pradeep Singh - https://www.linkedin.com/in/dbxdev

I am on 16.4.15. Before attempting to ingest everything as Variant from CSV, I tried turning the CSV columns into a map object and then using to_variant_object() to convert it to Variant, which hit the same issue.

pradeep_singh
Contributor III

Can you also share the exact code you are running to ingest?


from pyspark import pipelines as dp
import pyspark.sql.functions as F
from utils import col_naming

 

# groups to ingest
read_groups = ['a', 'b', 'c']

 

for group in read_groups:
    @dp.table(name=f"`{group}`")
    def bronze(
        table_properties={"delta.feature.variantType-preview": "supported"}
    ):
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("header", "true")
            .option("singleVariantColumn", "variant_column")
            .load(f"s3://bucket_1/{group}/*/*/*/*/*/detail/*.csv")
            .withColumn(
                "name",
                F.regexp_extract(
                    F.col("_metadata.file_path"),
                    f"/{group}/" + r"[^/]+/([^/]+)",
                    1
                )
            )
            .withColumn(
                "package",
                F.regexp_extract(
                    F.col("_metadata.file_path"),
                    f"/{group}/" + r"([^/]+)/",
                    1
                )
            )
        )
        return df

pradeep_singh
Contributor III

Can you also check with your account admin whether this feature is enabled at the workspace level? Since it is still in public preview, they might have to enable it.
https://docs.databricks.com/aws/en/delta/variant


luketl2
Contributor

I think I found the issue... I put the table_properties in the wrong place. It goes in the decorator args not the query_function args. My bad 😕
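
For anyone landing here later, this is a minimal sketch of what the fix looks like, with table_properties moved onto the decorator. It is not the exact code from my pipeline: the helper names (define_bronze_tables, _define_bronze_table, VARIANT_PROPERTIES) are made up for illustration, and dp and spark are passed in as parameters only so the snippet can be read and exercised outside a pipeline. In a real pipeline source file they would be the `from pyspark import pipelines as dp` module and the ambient spark session. The extra columns from my original post are left out for brevity.

```python
# Sketch only: `dp` and `spark` are parameters here (an assumption made for
# illustration) so the snippet does not depend on a pipeline runtime.

# Table property from the original post, now passed to the decorator.
VARIANT_PROPERTIES = {"delta.feature.variantType-preview": "supported"}


def _define_bronze_table(dp, spark, group):
    """Register one streaming table for a single group."""

    # table_properties is an argument of the decorator,
    # not a parameter of the query function.
    @dp.table(name=f"`{group}`", table_properties=VARIANT_PROPERTIES)
    def bronze():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", "true")
            .option("singleVariantColumn", "variant_column")
            .load(f"s3://bucket_1/{group}/*/*/*/*/*/detail/*.csv")
        )

    return bronze


def define_bronze_tables(dp, spark, groups):
    """Register one streaming table per group."""
    for group in groups:
        _define_bronze_table(dp, spark, group)
```

In the pipeline itself this would be called once as define_bronze_tables(dp, spark, ['a', 'b', 'c']). Going through a per-group helper instead of defining the function directly in the loop also means each query function sees its own value of group, which sidesteps Python's late-binding closure behaviour if the functions are evaluated after the loop has finished.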
