10-01-2024 01:59 AM
Hi all,
I have a fairly straightforward task: I'm looking to ingest six .csv files, all with different names, schemas, and blob locations, into individual tables on one bronze schema. I have the files in my landing zone under different folders, but the code is consolidated in one notebook, with each cell pointing the cloudFiles ingest code at a different file location and loading into an appropriate table name.
The issue I am facing is that when I run the pipeline, the tables are created with the correct names but the schemas are all the same. Does anyone have any suggestions as to where it could be going wrong? It seems like the most straightforward of tasks, or am I missing some limitation? Any thoughts appreciated.
10-01-2024 03:03 AM
Have you enabled schema inference while reading the CSV files?
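Outside of DLT, Auto Loader needs a schema location before it can infer a CSV schema. A minimal sketch of what that looks like (the schema path and folder names below are placeholders, not your actual setup):

# Minimal Auto Loader read with CSV schema inference enabled.
# The schemaLocation path is a placeholder; inside a DLT pipeline this
# location is managed automatically and can be omitted.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.inferColumnTypes", "true")  # infer typed columns instead of all strings
    .option("cloudFiles.schemaLocation", "/Volumes/bronze_dev/landing_zone/_schemas/addresses")
    .option("header", "true")
    .load("/Volumes/bronze_dev/landing_zone/addresses")
)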
10-01-2024 03:19 AM
I have inferred the schema, werners, yes.
10-01-2024 03:24 AM
It could be that all the data is being read instead of only a single subfolder.
Can you share some code, perhaps?
10-01-2024 04:53 AM - edited 10-01-2024 04:55 AM
The code for loading each of the different tables follows the pattern below.
import dlt
import pyspark.sql.functions as F

landing_zone = '/Volumes/bronze_dev/landing_zone/'
source = 'addresses'

# table_properties belongs in the @dlt.table decorator, not in the
# function signature, where DLT would silently ignore it.
@dlt.table(
    comment="addresses snapshot",
    name="addresses",
    table_properties={"quality": "bronze"}
)
def addresses():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", True)
        .option("header", True)
        .option("quote", "\"")  # '"' is already the default quote character
        .load(f"{landing_zone}{source}")
        .select(
            "*",
            F.current_timestamp().alias("processing_time"),
            F.col("_metadata.file_name").alias("source_file")
        )
    )
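One thing worth checking, since each cell redefines landing_zone and source at the top level: DLT only evaluates the decorated function bodies when it resolves the pipeline graph, after all cells have run, so every table function can end up capturing the last value of source and reading the same folder. That would explain identical schemas across all six tables. A common way around this is the documented DLT pattern of generating the tables from one parameterized function, so each definition binds its own path. A sketch, with the list of source folder names as placeholders for your six folders:

import dlt
import pyspark.sql.functions as F

landing_zone = '/Volumes/bronze_dev/landing_zone/'

def create_bronze_table(source):
    # Each call defines a separate DLT table bound to its own subfolder,
    # because 'source' is a function parameter rather than a shared global.
    @dlt.table(
        comment=f"{source} snapshot",
        name=source,
        table_properties={"quality": "bronze"}
    )
    def bronze_table():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", True)
            .option("header", True)
            .load(f"{landing_zone}{source}")
            .select(
                "*",
                F.current_timestamp().alias("processing_time"),
                F.col("_metadata.file_name").alias("source_file")
            )
        )

# 'addresses' plus your other five folder names (the rest are hypothetical here).
for source in ["addresses", "customers", "orders"]:
    create_bronze_table(source)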