Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read from multiple sources in a single stream

SrihariB
New Contributor

Hey all, 

I am trying to read data from multiple S3 locations using a single-stream DLT pipeline and load it into a single target. Here is the scenario.

S3 Locations: Below are my raw S3 locations; only the directory name at the end changes, while the base location stays the same. Data continuously streams into these locations as Parquet files via a CDC tool.

  1. s3://workspace/data/A100_WH
  2. s3://workspace/data/A200_WH
  3. s3://workspace/data/A300_WH

Data in all the above locations has the same schema. I am trying to read all of these locations and load them into a single target table named raw_WH using the DLT framework. The issue I notice is that when I run my DLT pipeline, it doesn't read data from all the S3 locations. So I'm trying to understand how to ensure that the pipeline reads from every S3 location without missing any.
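As a quick sanity check of the wildcard itself, outside Spark (a minimal sketch; `fnmatch` is used here only to illustrate the glob semantics, and the directory list is made up for the example), the `*_WH` pattern does match all three directory names:

```python
from fnmatch import fnmatch

# Directory names as they would appear under s3://workspace/data/
# (A100_OTHER is a hypothetical non-matching sibling for contrast)
dirs = ["A100_WH", "A200_WH", "A300_WH", "A100_OTHER"]

source_table = "WH"
pattern = f"*_{source_table}"  # same suffix wildcard as in load_path

matched = [d for d in dirs if fnmatch(d, pattern)]
print(matched)  # the three *_WH directories match; A100_OTHER does not
```

So the pattern itself is not obviously at fault; the question is whether Auto Loader's directory listing picks up all matching prefixes.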

 

import dlt

source_table = 'WH'

# A search pattern in load_path so that all the directories
# (A100_WH, A200_WH, A300_WH) are read.
load_path = f"s3://workspace/data/*_{source_table}"
table_name = f"{source_table}"

auto_loader_settings = {
    "cloudFiles.format": "parquet",
    "cloudFiles.maxFilesPerTrigger": "10000",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.useIncrementalListing": "true",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.schemaLocation": f"{SCHEMA_PATH}/{source_schema}_{source_table}",
    "mergeSchema": "true",
}

# Start the load of the raw table for the job
@dlt.table(
    name=f"raw_{source_table}",
    path=f"{RAW_TABLE_PATH}/{source_schema}_{source_table}",
    table_properties={
        "mergeSchema": "true",
        "spark.databricks.delta.schema.autoMerge.enabled": "true",
    },
)
def readRawSource():
    raw_df = (
        spark.readStream.format("cloudFiles")
        .options(**auto_loader_settings)
        .load(load_path)
    )
    return raw_df

# With this, I expect my target table raw_WH to have data appended from all the S3 locations without missing any source.
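If the wildcard keeps skipping directories, one possible workaround is to enumerate the source prefixes explicitly and run one flow per concrete path into the same target (e.g. with DLT's append-flow pattern, one Auto Loader readStream per path). A minimal sketch of the path construction; the prefix list and helper name here are assumptions for illustration, not part of the original pipeline:

```python
# Assumed base location and known source prefixes (hypothetical names)
BASE = "s3://workspace/data"
SOURCE_PREFIXES = ["A100", "A200", "A300"]
source_table = "WH"

def explicit_load_paths(base, prefixes, table):
    """Build one concrete S3 path per source directory,
    instead of relying on a single wildcard pattern."""
    return [f"{base}/{p}_{table}" for p in prefixes]

paths = explicit_load_paths(BASE, SOURCE_PREFIXES, source_table)
# In a DLT pipeline, each of these paths could then feed its own
# append flow into the shared raw_WH target, so no location can be
# silently skipped by the glob listing.
print(paths)
```

The trade-off is that the prefix list must be maintained by hand when new source directories appear, whereas the wildcard picks them up automatically.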

 

