Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

CSV Ingestion using Autoloader with single variant column

Fox19
New Contributor III

I've been working on ingesting CSV files with varying schemas using Autoloader. The goal is to ingest the CSVs into a bronze table that writes each record as a key-value mapping containing only the fields relevant to that record, while preserving the data types Autoloader infers.

At first I ingested into a wide table, converted each row to a list of structs, filtered out nulls, converted that to a <STRING, STRING> map, and then used to_variant_object to convert to variant. However, the requirement that the map values share a single type defeated the purpose of saving each record as variant.

I was recently told that the option outlined here: https://learn.microsoft.com/en-us/azure/databricks/ingestion/variant#-ingest-data-from-cloud-object-... could also be applied to CSVs. I tried it out, and while it did save each record as a variant key-value mapping, it seems to have taken the set of all fields across all ingested CSVs and populated a key for every single field on every single record, so the vast majority of values are NULL and processing is extremely slow. I don't get why it functions this way... the point of variant is to be able to store semi-structured data, right? Here is the code I used; any ideas?

from pyspark import pipelines as dp
import pyspark.sql.functions as F
from utils import col_naming

# groups to ingest
groups = ['a', 'b', 'c']

for group in groups:
    @dp.table(
        name=f"`{group}`",
        table_properties={"delta.feature.variantType-preview": "supported"},
        comment=f"Raw {group} detail ingested from storage",
    )
    def bronze():
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("header", "true")
            .option("singleVariantColumn", "signals")
            .load(f"s3://buck/{group}/*/*/*/*/*/Detail/*.csv")
        )
        return df
5 REPLIES

pradeep_singh
New Contributor II

What if, instead of going the variant route, you could:

  1. Ingest everything in the CSV as strings, leaving column-type inference out of bronze.
  2. Add new columns automatically by enabling schema evolution with addNewColumns as the schema evolution mode (see the sketch below).
  3. Handle schema enforcement/mapping in silver based on what you see in the bronze table.

Do you feel you are getting any benefit out of using variant here?
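
For reference, here is a minimal sketch of that approach (table name is hypothetical, the bucket layout is borrowed from your snippet, and I haven't run it against your data):

from pyspark import pipelines as dp

# Sketch: keep every CSV column as STRING and let Auto Loader add new
# columns as they appear; typing and enforcement happen later in silver.
@dp.table(name="bronze_raw_a", comment="Raw group a detail, all columns as STRING")
def bronze_raw_a():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # no type inference: every column lands as STRING
        .option("cloudFiles.inferColumnTypes", "false")
        # evolve the table schema when new columns show up in incoming files
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("header", "true")
        .load("s3://buck/a/*/*/*/*/*/Detail/*.csv")
    )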

 

Fox19
New Contributor III

It's a good question.

The benefit of variant here is that, while we will enforce schemas in silver, there will be many use cases where users want to query columns that we do not have in silver. The reason is that new columns are added by users somewhat on the fly: they create a configuration file that collects measurements from a variety of available sensors, and that sensor data is what comes through Autoloader.

If they add a new reading to the config that we haven't explicitly unpacked in a silver table, they will need to query bronze directly, in which case variant is much more user-friendly, especially since these users will not be experienced in Databricks. Conversely, with an ever-expanding wide table format, the vast majority of columns will be NULL for a given row, which makes the user experience more confusing.
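
For example, a user could pull a single reading straight out of bronze with variant path extraction (sensor_x is a made-up field name; "a" is the bronze table from the code above):

import pyspark.sql.functions as F

# Hypothetical ad-hoc query against bronze: extract one reading from the
# variant column; records that don't contain that key simply return NULL.
bronze = spark.table("a")
readings = (
    bronze
    .select(F.expr("CAST(signals:sensor_x AS DOUBLE)").alias("sensor_x"))
    .where("sensor_x IS NOT NULL")
)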

Fox19
New Contributor III

If anyone has documentation specific to this option with CSV, that would be awesome.

pradeep_singh
New Contributor II

If I understand the problem correctly, you are getting extra keys for records from files where those keys don't actually exist. I was not able to reproduce this issue; I am getting different key-value pairs per record and no extra keys with NULL values.

[Screenshots attached: pradeep_singh_0-1769187539508.png, pradeep_singh_1-1769187553153.png — sample rows showing the per-record key-value pairs]

Can you share a similar screenshot of your rows? I am also using Serverless compute; I hope you have the same.

Fox19
New Contributor III

Thanks for this, it helped me find the real issue. The data WAS bringing in the correct keys. I was building three streaming tables (groups a, b, c), so I was using a loop as described here: https://spark.apache.org/docs/latest/declarative-pipelines-programming-guide.html#creating-tables-in.... However, I forgot a critical piece: passing the group into the query function definition. Since the for loop only creates the table definitions, by the time the data was actually streamed, group pointed to "c" for all three refreshes, so all three tables were streaming in group c's data. When I spot-checked a CSV file against the table data, I saw the schema mismatch and assumed it was a checkpointing/evolution issue, but it was not.
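
In case it helps anyone else, here is a minimal sketch of the corrected loop (same names as my original snippet; the only real change is binding the loop variable as a default argument so each table definition captures its own group):

from pyspark import pipelines as dp

groups = ['a', 'b', 'c']

for group in groups:
    @dp.table(
        name=f"`{group}`",
        table_properties={"delta.feature.variantType-preview": "supported"},
        comment=f"Raw {group} detail ingested from storage",
    )
    def bronze(group=group):  # bind the current group, not the loop's final value
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("header", "true")
            .option("singleVariantColumn", "signals")
            .load(f"s3://buck/{group}/*/*/*/*/*/Detail/*.csv")
        )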