Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

CSV Ingestion using Autoloader with single variant column

Fox19
New Contributor III

I've been working on ingesting CSV files with varying schemas using Auto Loader. The goal is to land the CSVs in a bronze table that stores each record as a key-value mapping containing only the fields relevant to that record, while preserving the data types that Auto Loader infers.

At first I ingested into a wide table, converted each row to a list of structs, filtered out nulls, built a map<STRING, STRING>, and then used to_variant_object to convert it to variant. However, the requirement that all map values share one type defeated the purpose of storing each record as variant.

I was recently told that the option outlined here could be applied to CSVs: https://learn.microsoft.com/en-us/azure/databricks/ingestion/variant#-ingest-data-from-cloud-object-... I tried it out, and while it did save each record as a variant key-value mapping, it appears to take the union of all fields across all ingested CSVs and populate a key for every field in every record, leaving the vast majority of values NULL and making processing very slow. I don't understand why it behaves this way; the point of variant is to store semi-structured data, right? Here is the code I used. Any ideas?
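To make the observed behavior concrete, here is a plain-Python sketch of the difference between what the ingestion appears to do (union the schema across files, then NULL-pad every record) and the per-record pruning described above. The record contents are hypothetical examples, not from the actual data:

```python
# Two CSV-style records from files with different headers, as plain dicts.
# record_a comes from a file with columns (id, temp);
# record_b comes from a file with columns (id, pressure).
record_a = {"id": 1, "temp": 21.5}
record_b = {"id": 2, "pressure": 101.3}

# What the variant ingestion appears to do: infer one schema across all
# files, then emit every column for every record, NULL-padded.
all_keys = sorted(set(record_a) | set(record_b))
union_a = {k: record_a.get(k) for k in all_keys}
# union_a now carries a None-valued "pressure" key record_a never had.

# What the post is after: keep only the non-null fields for each record.
pruned_a = {k: v for k, v in union_a.items() if v is not None}
```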

from pyspark import pipelines as dp
import pyspark.sql.functions as F
from utils import col_naming

# groups to ingest
groups = ['a', 'b', 'c']

for group in groups:
    @dp.table(
        name=f"`{group}`",
        table_properties={"delta.feature.variantType-preview": "supported"},
        comment=f"Raw {group} detail ingested from storage",
    )
    def bronze(group=group):  # bind the loop variable so each table reads its own path
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("header", "true")
            # collapse each record into a single variant column named "signals"
            .option("singleVariantColumn", "signals")
            .load(f"s3://buck/{group}/*/*/*/*/*/Detail/*.csv")
        )
        return df
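One Python-level pitfall worth noting in the loop above: a function defined inside a loop captures the loop variable by reference, so with late binding every table function would see the last group's value unless the value is bound at definition time (e.g. via a default argument). A minimal illustration, independent of Spark:

```python
# Late binding: every lambda reads g at call time, after the loop ended.
funcs = [lambda: g for g in ["a", "b", "c"]]
late = [f() for f in funcs]    # every call sees the final value "c"

# Binding g as a default argument freezes its value per iteration.
bound = [lambda g=g: g for g in ["a", "b", "c"]]
early = [f() for f in bound]   # each call sees its own value
```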

pradeep_singh
New Contributor II

What if, instead of going the variant route, you:

  1. Ingest everything in the CSV as strings, skipping column-type inference
  2. Add new columns automatically by enabling schema evolution with addNewColumns as the schema evolution mode
  3. Handle schema enforcement/mapping in silver based on what you see in the bronze table
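Step 3 could be sketched, in plain Python terms, as a best-effort cast from bronze string values to typed silver values. The helper name, casting order, and sample row are illustrative assumptions, not part of any Databricks API:

```python
def cast_value(s):
    """Best-effort cast of a bronze string to int, float, or bool;
    falls back to the original string when nothing matches."""
    if s is None:
        return None
    for caster in (int, float):
        try:
            return caster(s)
        except ValueError:
            pass
    if s.lower() in ("true", "false"):
        return s.lower() == "true"
    return s

# Hypothetical bronze row: everything ingested as strings.
row = {"id": "42", "temp": "21.5", "active": "true", "note": "ok"}
typed = {k: cast_value(v) for k, v in row.items()}
```

In a real pipeline the same idea would be expressed with explicit casts per column once the bronze schema is known, rather than guessing per value.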

Do you feel you're getting any benefit from using variant here?