Delta live tables multiple .csv diff schemas
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-01-2024 01:59 AM
Hi all,
I have a fairly straight-forward task whereby I am looking to ingest six .csv file all with different names, schema's and blob locations into individual tables on one bronze schema. I have the files in my landing zone under different folders but have the code consolidated in one notebook with each cell pointing to the ingest code using 'cloudfiles' at the different file locations and loading into an appropriate table name.
The issue I am facing is that when I run the pipeline the tables are created with the correct names but the schema's are all the same(?) Does anyone have any suggestions where It could be going wrong? It seems like the most straight-forward task or am I missing some limitation? Any thoughts appreciated.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-01-2024 03:03 AM
have you enabled schema inference while reading the csv files?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-01-2024 03:19 AM
I have inferred the schema werners, yes.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-01-2024 03:24 AM
it could be that all the data is read instead of only a single subfolder.
can you share some code perhaps?
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-01-2024 04:53 AM - edited 10-01-2024 04:55 AM
The code follows similar pattern below to load the different tables.
import dlt
import re
import pyspark.sql.functions as F
landing_zone = '/Volumes/bronze_dev/landing_zone/'
source = 'addresses'
@Dlt.table(
comment="addresses snapshot",
name="addresses"
)
def addresses(table_properties={"quality": "bronze"}):
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", True)
.option("header", True)
.option("quoted", True)
.option("quote", "\"")
.load(f"{landing_zone}{source}")
.select(
"*",
F.current_timestamp().alias("processing_time"),
F.col("_metadata.file_name").alias("source_file")
)
)