I've been working on ingesting CSV files with varying schemas using Auto Loader. The goal is to ingest the CSVs into a bronze table that writes each record as a key-value mapping containing only the fields relevant to that record, while preserving the data types that Auto Loader infers.

My first approach was to ingest into a wide table, convert each row to an array of structs, filter out nulls, convert that to a <STRING, STRING> map, and then use to_variant_object to produce a variant. However, the requirement that all map values share a single type defeated the purpose of saving each record as a variant.

I was recently told that the option outlined here: https://learn.microsoft.com/en-us/azure/databricks/ingestion/variant#-ingest-data-from-cloud-object-... could also be applied to CSVs. I tried it out, and while it did save each record as a variant-typed key-value mapping, it appears to take the union of all fields across all ingested CSVs and populate a key for every single field on every single record. The vast majority of values end up NULL, and processing becomes very slow. I don't understand why it behaves this way... the point of variant is to be able to store semi-structured data, right? Here is the code I used, any ideas?
from pyspark import pipelines as dp
import pyspark.sql.functions as F
from utils import col_naming
# groups to ingest
groups = ['a', 'b', 'c']
for group in groups:
    @dp.table(
        name=f"`{group}`",
        table_properties={"delta.feature.variantType-preview": "supported"},
        comment=f"Raw {group} detail ingested from storage",
    )
    def bronze(group=group):  # default arg binds the current loop value, not the last one
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("header", "true")
            .option("singleVariantColumn", "signals")
            .load(f"s3://buck/{group}/*/*/*/*/*/Detail/*.csv")
        )
        return df
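For context, here is a minimal plain-Python sketch (not Spark, and the rows/field names are made up for illustration) of the per-record shape I'm after: each record keeps only the fields actually populated for it, with their native types intact, rather than carrying a NULL entry for every column seen across all CSVs.

```python
import json

# Hypothetical rows as they land in a wide table after schema merging:
# every column from every CSV exists on every row, most values are None.
wide_rows = [
    {"id": 1, "temp_c": 21.5, "rpm": None, "status": None},
    {"id": 2, "temp_c": None, "rpm": 3400, "status": "ok"},
]

def to_sparse_record(row):
    """Keep only the fields that are actually populated for this row."""
    return {k: v for k, v in row.items() if v is not None}

sparse = [to_sparse_record(r) for r in wide_rows]
print(json.dumps(sparse))
# Note each record keeps its own subset of keys, and types (float, int,
# string) are preserved -- unlike a <STRING, STRING> map.
```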