Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT DLT Streaming Table as Variant

Fox19
New Contributor III

I am attempting to ingest CSV files from an S3 bucket with Auto Loader. Since the schema of the data is inconsistent (each CSV may have different headers), I was hoping to ingest the data as Variant, following this: https://docs.databricks.com/aws/en/ingestion/variant#-ingest-data-from-cloud-object-storage-as-varia.... However, when trying to create a streaming table, I always get the DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT error in the screenshot, despite passing table_properties={"delta.feature.variantType-preview": "supported"} as an argument to the flow creating my table. Does the table_properties argument not work with DLT tables for this particular property?

1 ACCEPTED SOLUTION


Fox19
New Contributor III

I think I found the issue... I put table_properties in the wrong place. It goes in the decorator arguments, not the query function's arguments. My bad 😕

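For reference, applying that fix to the pipeline code posted later in this thread would look roughly like the sketch below: table_properties moves into the @dp.table decorator call itself. This is a pipeline fragment that only executes inside a DLT/Lakeflow pipeline (spark and read_groups come from that context), so treat it as a sketch rather than a standalone script:

```python
from pyspark import pipelines as dp

for group in read_groups:
    @dp.table(
        name=f"`{group}`",
        # table_properties belongs here, in the decorator call,
        # so DLT can enable the Delta table feature at creation time:
        table_properties={"delta.feature.variantType-preview": "supported"},
    )
    def bronze():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("singleVariantColumn", "variant_column")
            .load(f"s3://bucket_1/{group}/*/*/*/*/*/detail/*.csv")
        )
```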

6 REPLIES

pradeep_singh
New Contributor II

The VARIANT type for CSV requires Databricks Runtime 16.4+. Can you check which runtime your DLT channel is running on?
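If it helps, a quick pure-Python way to compare a runtime version string such as "16.4.15" against the 16.4 minimum (the helper name here is made up for illustration, not a Databricks API):

```python
def meets_minimum(version: str, minimum=(16, 4)) -> bool:
    """Compare the major.minor part of a version string to a minimum tuple."""
    parts = tuple(int(p) for p in version.split(".")[:2])
    return parts >= minimum

print(meets_minimum("16.4.15"))  # True
print(meets_minimum("15.4.2"))   # False
```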

Fox19
New Contributor III

I am on 16.4.15. Before attempting to ingest everything as Variant from CSV, I was attempting to turn the CSV columns into a map object and then use to_variant_object() to convert to Variant, which hit the same issue.

pradeep_singh
New Contributor II

Can you also share the exact code you are running to ingest?

Fox19
New Contributor III
from pyspark import pipelines as dp
import pyspark.sql.functions as F
from utils import col_naming

# groups to ingest
read_groups = ['a', 'b', 'c']

for group in read_groups:
    @dp.table(name=f"`{group}`")
    def bronze(
        table_properties={"delta.feature.variantType-preview": "supported"}
    ):
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("header", "true")
            .option("singleVariantColumn", "variant_column")
            .load(f"s3://bucket_1/{group}/*/*/*/*/*/detail/*.csv")
            .withColumn(
                "name",
                F.regexp_extract(
                    F.col("_metadata.file_path"),
                    f"/{group}/" + r"[^/]+/([^/]+)",
                    1
                )
            )
            .withColumn(
                "package",
                F.regexp_extract(
                    F.col("_metadata.file_path"),
                    f"/{group}/" + r"([^/]+)/",
                    1
                )
            )
        )
        return df

pradeep_singh
New Contributor II

Can you also check with your account admin whether this feature is enabled at the workspace level? Since it is still in public preview, they might have to enable it.
https://docs.databricks.com/aws/en/delta/variant
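Separately, the decorator-versus-function-argument mix-up in the posted code is general Python behavior, not DLT-specific: a default value in the decorated function's signature is just a default for that function and is never seen by the decorator. A minimal pure-Python sketch (hypothetical table decorator, not the real DLT API):

```python
def table(name=None, table_properties=None):
    """Hypothetical stand-in for dp.table: records what the decorator saw."""
    def wrapper(fn):
        fn.registered_properties = table_properties  # what "DLT" would see
        return fn
    return wrapper

# Wrong: property as a function default -- the decorator never sees it.
@table(name="bronze")
def bronze_wrong(table_properties={"delta.feature.variantType-preview": "supported"}):
    return "df"

# Right: property passed to the decorator call itself.
@table(name="bronze",
       table_properties={"delta.feature.variantType-preview": "supported"})
def bronze_right():
    return "df"

print(bronze_wrong.registered_properties)  # None
print(bronze_right.registered_properties)  # {'delta.feature.variantType-preview': 'supported'}
```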
