Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read from multiple sources in a single stream

SrihariB
New Contributor

Hey all, 

I am trying to read data from multiple S3 locations in a single-stream DLT pipeline and load it into a single target. Here is the scenario.

S3 Locations: Below are my raw S3 locations, which differ only in the directory name at the end; the base location remains the same. Data is continuously streaming into these locations as Parquet files via a CDC tool.

  1. s3://workspace/data/A100_WH
  2. s3://workspace/data/A200_WH
  3. s3://workspace/data/A300_WH

Data in all the above locations has the same schema. I am trying to read all of these locations and load them into a single target table named raw_WH using the DLT framework. The issue I notice is that when I run my DLT pipeline, it does not read data from all of the S3 locations. So I am trying to understand how to ensure the DLT pipeline reads from all the S3 locations without missing any.

 

import dlt

source_table = 'WH'

# Using a search pattern in load_path so that all the directories
# (A100_WH, A200_WH, A300_WH) are read.
load_path = f"s3://workspace/data/*_{source_table}"
table_name = f"{source_table}"

auto_loader_settings = {
    "cloudFiles.format": "parquet",
    "cloudFiles.maxFilesPerTrigger": "10000",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.useIncrementalListing": "true",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    # SCHEMA_PATH and source_schema are defined elsewhere in the pipeline
    "cloudFiles.schemaLocation": f"{SCHEMA_PATH}/{source_schema}_{source_table}",
    "mergeSchema": "true",
}

# Start the load of the raw table for the job
@dlt.table(
    name=f"raw_{source_table}",
    path=f"{RAW_TABLE_PATH}/{source_schema}_{source_table}",  # RAW_TABLE_PATH defined elsewhere
    table_properties={
        "mergeSchema": "true",
        "spark.databricks.delta.schema.autoMerge.enabled": "true",
    },
)
def readRawSource():
    raw_df = (
        spark.readStream.format("cloudFiles")
        .options(**auto_loader_settings)
        .load(load_path)
    )
    return raw_df

# With this, I expect my target table raw_WH to have the data appended from all the S3 locations without missing any sources.

 

1 REPLY

mark_ott
Databricks Employee

You are using Databricks Autoloader (cloudFiles) within a Delta Live Tables (DLT) pipeline to ingest streaming Parquet data from multiple S3 directories with a wildcard pattern, and you want to ensure that all matching directories’ data is included in a single target Delta table. However, you are noticing that not all directories are being picked up as expected.

Here’s what you should know and steps to troubleshoot or improve your DLT ingestion:

Key Points about Autoloader Patterns

  • The pattern s3://workspace/data/*_{source_table} should match the example directories you provided (i.e., A100_WH, A200_WH, A300_WH), as each name ends with _WH.

  • Autoloader discovers and watches files present in all matching directories, but its directory scan is subject to S3’s eventual consistency.

  • The cloudFiles source expects all files in the specified pattern to have the same schema (which you’ve confirmed).

Common Issues and Recommendations

1. Eventual Consistency in S3

  • S3 may delay reflecting newly uploaded files. This can cause files or directories to be missed temporarily. Usually, Autoloader will pick these up in subsequent listings, but for high-frequency CDC loads and/or new directories this is the most common cause; one mitigation is sketched below.
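A minimal sketch of that mitigation: recent Databricks Runtimes support a cloudFiles.backfillInterval option that triggers periodic asynchronous backfill listings so files missed by incremental discovery are eventually processed. It is primarily documented for file-notification mode, so treat it as an assumption to verify against your runtime and discovery mode:

auto_loader_settings = {
    "cloudFiles.format": "parquet",
    # ... keep your existing options ...
    # Periodic asynchronous backfill listing; files missed by incremental
    # discovery should eventually be picked up on a backfill pass.
    "cloudFiles.backfillInterval": "1 day",
}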

2. Pattern and Directory Matching

  • Double-check that the wildcard pattern indeed matches all intended directories.

    • *_{source_table} matches any folder ending with _WH. If you have, for example, A100_WH/, A101_OTHER/, only the former is picked up. List your S3 bucket contents to verify the actual directory names and that they are all picked up by the pattern (e.g., via AWS CLI: aws s3 ls s3://workspace/data/).
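For example, a quick way to confirm which directories the pattern would actually match, using only dbutils.fs.ls and Python's fnmatch (a sketch; run it in a notebook attached to the same workspace and credentials):

import fnmatch

base_path = "s3://workspace/data/"
pattern = "*_WH"

# List the immediate children of the base path and keep only the
# directories whose names match the wildcard pattern.
matched = [
    f.path
    for f in dbutils.fs.ls(base_path)
    if f.isDir() and fnmatch.fnmatch(f.name.rstrip("/"), pattern)
]
print(matched)  # expect all three *_WH directories to appear here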

3. Autoloader File Discovery Settings

  • cloudFiles.useIncrementalListing improves performance for large directory counts, but is only available in Databricks Runtime 9.1+ and may have limitations on new folder discovery if the pattern is not optimal.

  • The cloudFiles.maxFilesPerTrigger (set to 10,000) affects throughput but not discovery.

4. Schema Location and Schema Evolution

  • Ensure your cloudFiles.schemaLocation is not on S3 but in a DBFS/ADLS location with stable write permissions. S3 can be unreliable for this purpose due to eventual consistency.

5. Permissions

  • The IAM role assumed by Databricks must have List and Read permissions for s3://workspace/data/ and all sub-folders.

6. File Placement

  • Autoloader recursively discovers new files, but only if they land in subfolders that match the search pattern. If a new directory that matches the wildcard is created after the stream starts, it may not be picked up in all discovery modes; one alternative is sketched below.
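If the single-glob approach keeps proving fragile for your layout, one alternative worth evaluating is to give each known location its own flow into one streaming table. This is only a sketch: it assumes the dlt.create_streaming_table and @dlt.append_flow APIs are available on your pipeline's channel, and it trades automatic discovery of brand-new directories for explicit, verifiable coverage of the directories you list.

import dlt

source_table = "WH"
locations = [
    "s3://workspace/data/A100_WH",
    "s3://workspace/data/A200_WH",
    "s3://workspace/data/A300_WH",
]

# One target streaming table that every flow appends into.
dlt.create_streaming_table(name=f"raw_{source_table}")

# One Auto Loader flow per source directory, so each location is
# ingested and monitored independently.
for path in locations:
    suffix = path.rstrip("/").split("/")[-1]

    @dlt.append_flow(target=f"raw_{source_table}", name=f"ingest_{suffix}")
    def ingest(path=path):
        # Reuses the auto_loader_settings dict from the question; if you keep
        # cloudFiles.schemaLocation in it, give each flow its own path.
        return (
            spark.readStream.format("cloudFiles")
            .options(**auto_loader_settings)
            .load(path)
        )

A newly created directory would still require adding its path and updating the pipeline, but nothing can be silently skipped.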

Best Practices

- Use Recursive Wildcard:

For deeper or more nested folder structures, the double star wildcard ** ensures all subdirectories are included:

load_path = f"s3://workspace/data/**_{source_table}"

This catches workspace/data/A100_WH/, workspace/data/2025/A100_WH/, etc.

- Restart Pipeline Periodically

For S3’s eventual consistency issues, restarting or scheduling regular refreshes ensures new directories/folders are picked up in the listing.

- Monitor Autoloader Metrics

Review the logs and metrics in the Databricks UI, including “files discovered” and “files processed,” to spot whether certain directories are being skipped or are lagging behind; a query-based check is sketched below.
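As a complement to the UI, recent runtimes expose a cloud_files_state table-valued function that lists the files an Auto Loader stream has discovered. The sketch below assumes DBR 11+ and that you can locate the stream's checkpoint directory (for a DLT pipeline it sits under the pipeline's storage location, so the exact path may take some digging); column names may vary by runtime, so verify them against your output:

# Replace with the actual checkpoint directory of the Auto Loader stream.
checkpoint_path = "<pipeline-storage-location>/checkpoints/raw_WH"

discovered = spark.sql(f"SELECT * FROM cloud_files_state('{checkpoint_path}')")

# Group the discovered file paths by their source directory to see whether
# every *_WH location is represented.
(discovered
    .selectExpr("substring_index(path, '/', 5) AS source_dir")
    .groupBy("source_dir")
    .count()
    .show(truncate=False))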

- Upgrade to Latest Databricks Runtime

Recent Runtime versions improve new directory detection and offer better S3 support for incremental discovery.

Example Revised Load Path

load_path = "s3://workspace/data/*_WH"
# Or, for multi-level folder structures:
# load_path = "s3://workspace/data/**_WH"

Troubleshooting Checklist

  • Use the dbutils.fs.ls("s3://workspace/data/") command and verify that all directories exist and match the pattern.

  • Check your pipeline logs for warnings or skipped files/folders.

  • Ensure the IAM role has s3:ListBucket and s3:GetObject permissions for the bucket and all prefixes involved.

  • Test your pattern with a Spark static read to see detected files:

    spark.read.format("parquet").load(load_path).show()
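  • Building on that static read, a sketch that groups row counts by source directory (input_file_name and substring_index are standard Spark SQL functions) makes it obvious if any of the three locations contributes no rows:

    from pyspark.sql import functions as F

    static_df = spark.read.format("parquet").load(load_path)

    # Derive the source directory from each row's file path and count rows
    # per directory; a location that is not being read simply will not appear.
    (static_df
        .withColumn("source_dir", F.substring_index(F.input_file_name(), "/", 5))
        .groupBy("source_dir")
        .count()
        .show(truncate=False))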



Summary

The wildcard pattern is generally correct, but S3 eventual consistency, Autoloader settings, and new directory detection are the likely issues. Consider using a recursive wildcard, validating your folder structure and permissions, and keeping your Databricks Runtime up to date for best results.