Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables: multiple .csv files with different schemas

Frustrated_DE
New Contributor III

Hi all,

      I have a fairly straightforward task: I'm looking to ingest six .csv files, all with different names, schemas, and blob locations, into individual tables in one bronze schema. I have the files in my landing zone under different folders, and the code is consolidated in one notebook, with each cell pointing to the ingest code using cloudFiles at a different file location and loading into the appropriate table name.

The issue I am facing is that when I run the pipeline, the tables are created with the correct names but the schemas are all the same. Does anyone have any suggestions as to where it could be going wrong? It seems like the most straightforward of tasks, or am I missing some limitation? Any thoughts appreciated.

4 REPLIES

-werners-
Esteemed Contributor III

Have you enabled schema inference while reading the CSV files?
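
For reference, with Auto Loader column-type inference for CSV is opt-in via cloudFiles.inferColumnTypes; without it, every column lands as a string. A minimal sketch, with a placeholder path:

# Minimal sketch: Auto Loader CSV read with type inference enabled.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", True)  # default is false: all columns read as strings
      .option("header", True)
      .load("/Volumes/bronze_dev/landing_zone/addresses"))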

Frustrated_DE
New Contributor III

I have inferred the schema, werners, yes.

-werners-
Esteemed Contributor III

It could be that all the data is being read, instead of only a single subfolder.

Can you share some code, perhaps?
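
For instance, if each stream's load path points at the landing-zone root rather than its own subfolder, every table ingests the same files, and schema inference then produces the same columns for all of them. Hypothetical paths:

# Hypothetical paths: the first picks up every source's files, the second only one.
spark.readStream.format("cloudFiles").option("cloudFiles.format", "csv") \
    .load("/Volumes/bronze_dev/landing_zone/")            # root: reads all subfolders
spark.readStream.format("cloudFiles").option("cloudFiles.format", "csv") \
    .load("/Volumes/bronze_dev/landing_zone/addresses/")  # scoped to one source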

Frustrated_DE
New Contributor III

The code follows a similar pattern to the below to load the different tables.

import dlt
import pyspark.sql.functions as F

landing_zone = '/Volumes/bronze_dev/landing_zone/'
source = 'addresses'

@dlt.table(
    comment="addresses snapshot",
    name="addresses",
    table_properties={"quality": "bronze"},  # table properties belong in the decorator
)
def addresses():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", True)
            .option("header", True)
            .option("quote", "\"")
            .load(f"{landing_zone}{source}")  # one subfolder per source
            .select(
                "*",
                F.current_timestamp().alias("processing_time"),
                F.col("_metadata.file_name").alias("source_file"),
            )
    )
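
As an aside, a common way to define several near-identical bronze tables without copy-pasting the cell six times is a small factory function called in a loop. A sketch under the assumption that each source sits in its own subfolder; the source names below are hypothetical:

import dlt
import pyspark.sql.functions as F

landing_zone = '/Volumes/bronze_dev/landing_zone/'
sources = ['addresses', 'customers', 'orders']  # hypothetical: one subfolder per source

def make_bronze_table(source):
    # Binding `source` as a function argument gives each table its own path,
    # avoiding the late-binding pitfall of closures defined in a loop.
    @dlt.table(
        comment=f"{source} snapshot",
        name=source,
        table_properties={"quality": "bronze"},
    )
    def bronze():
        return (
            spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "csv")
                .option("cloudFiles.inferColumnTypes", True)
                .option("header", True)
                .load(f"{landing_zone}{source}")  # scoped to this source's subfolder
                .select(
                    "*",
                    F.current_timestamp().alias("processing_time"),
                    F.col("_metadata.file_name").alias("source_file"),
                )
        )

for s in sources:
    make_bronze_table(s)

Each call registers one table with the pipeline, so the schema is inferred independently per source path.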

 
