Read from multiple sources in a single stream
02-25-2025 02:59 PM - edited 02-25-2025 03:00 PM
Hey all,
I am trying to read data from multiple S3 locations with a single-stream DLT pipeline and load it into a single target table. Here is the scenario.
S3 locations: below are my raw S3 locations. Only the directory name at the end changes; the base location stays the same. Data is continuously streamed into these locations as parquet files by a CDC tool.
- s3://workspace/data/A100_WH
- s3://workspace/data/A200_WH
- s3://workspace/data/A300_WH
The data in all of the above locations has the same schema. I am trying to read all of these locations and load them into a single target table named raw_WH using the DLT framework. The issue I notice is that when I run my DLT pipeline, it does not read data from all of the S3 locations, so I am trying to understand how to make sure the pipeline reads from every location without missing any.
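For reference, here is a quick batch sanity check I can run outside the pipeline (just a sketch; check_df and source_file are illustrative names) to confirm whether the wildcard actually expands to all three prefixes:

from pyspark.sql.functions import input_file_name

# Read the same wildcard path as a one-off batch and list the distinct files it picks up.
check_df = (
    spark.read.format("parquet")
    .load("s3://workspace/data/*_WH")
    .select(input_file_name().alias("source_file"))
    .distinct()
)
check_df.show(truncate=False)

Here is my current pipeline code: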
import dlt

source_table = 'WH'
# I am using a glob pattern in load_path so that all the directories (A100_WH, A200_WH, A300_WH) are read.
load_path = f"s3://workspace/data/*_{source_table}"
table_name = f"{source_table}"
auto_loader_settings = {
    "cloudFiles.format": "parquet",
    "cloudFiles.maxFilesPerTrigger": "10000",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.useIncrementalListing": "true",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.schemaLocation": f"{SCHEMA_PATH}/{source_schema}_{source_table}",
    "mergeSchema": "true"
}
# Start the load of the raw table for the job
@dlt.table(
    name=f"raw_{source_table}",
    path=f"{RAW_TABLE_PATH}/{source_schema}_{source_table}",
    table_properties={
        "mergeSchema": "true",
        "spark.databricks.delta.schema.autoMerge.enabled": "true"
    }
)
def readRawSource():
    raw_df = (
        spark.readStream.format("cloudFiles")
        .options(**auto_loader_settings)
        .load(load_path)
    )
    return raw_df
# With this, I expect my target table raw_WH to have the data appended from all the S3 locations without missing any of them.
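For context, a possible fallback (sketched below; it assumes dlt.append_flow and dlt.create_streaming_table are available on my runtime, the flow names are illustrative, and SCHEMA_PATH is the same variable used above) would be to declare the target once and add one explicit Auto Loader flow per location. I would prefer to keep the single glob-based stream if it can be made to read every location:

import dlt

# Possible fallback: enumerate the prefixes explicitly instead of relying on the glob.
source_paths = [
    "s3://workspace/data/A100_WH",
    "s3://workspace/data/A200_WH",
    "s3://workspace/data/A300_WH",
]

# Declare the target streaming table once.
dlt.create_streaming_table(name="raw_WH")

# Add one append flow per S3 location; all flows write into the same target.
for p in source_paths:
    suffix = p.rstrip("/").split("/")[-1]  # e.g. A100_WH

    @dlt.append_flow(target="raw_WH", name=f"load_{suffix}")
    def load_location(path=p, schema_dir=suffix):
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaLocation", f"{SCHEMA_PATH}/{schema_dir}")
            .load(path)
        )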
Labels:
- Delta Lake
- Spark
- Workflows

