
Autoloader configuration for multiple tables from the same directory

ChrisLawford_n1
New Contributor II
I would like a recommendation on how to structure the ingestion of a large number of tables. I am currently using autoloader in directory listing mode.
I have concerns about future performance, and I have a requirement to ensure that data is processed as a complete batch.

The files dropped in storage are in the following path:
/year/month/day/hh:mm:ss/table_name/tablename_other_text.snappy.parquet

The files are dropped in batches, with every table in a batch written to the same /year/month/day/hh:mm:ss folder, which corresponds to the batch timestamp for the data.
For Example:

```
.
├── 2024/
│ ├── Aug/
│ │ ├── 19/
│ │ │ ├── 07:05:40/
│ │ │ │ ├── table_1_name/
│ │ │ │ │ ├── table_1_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_1_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_1_name_other_text_0_0_2.snappy.parquet
│ │ │ │ ├── table_2_name/
│ │ │ │ │ ├── table_2_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_2_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_2_name_other_text_0_0_2.snappy.parquet
│ │ │ │ └── ...
│ │ │ ├── 10:07:22/
│ │ │ │ ├── table_1_name/
│ │ │ │ │ ├── table_1_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_1_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_1_name_other_text_0_0_2.snappy.parquet
│ │ │ │ ├── table_2_name/
│ │ │ │ │ ├── table_2_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_2_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_2_name_other_text_0_0_2.snappy.parquet
│ │ │ │ └── ...
│ │ │ └── ...
│ │ └── ...
│ └── ...
└── ...
```

The schemas of the different tables are unique.

The external service that uploads these parquet files also publishes to an Event Grid namespace topic once it has uploaded all the files for a particular batch timestamp.
The structure of this event is:

```
{
  "Id": "ee35cf85-eac5-4dc2-b78e-4412bc4918ac",
  "EventType": "Banana.Storage.BlobCreated",
  "Subject": "banana-tree-fox-files",
  "Data": {
    "BatchDate": "2024-08-19T07:05:40",
    "TableNames": #HASH,
    "FileDirPath": "2024/Aug/19/07:05:40",
    "TableNamesArr": [
      "table_1_name",
      "table_2_name",
      "table_3_name"
    ],
    "FileEntries": [
      {
        "Filename": "table_1_name_0_0_0.snappy.parquet",
        "FileSize": "16246",
        "IsSkipped": false,
        "HasError": false
      },
      {
        "Filename": "table_1_name_0_1_0.snappy.parquet",
        "FileSize": "19532",
        "IsSkipped": false,
        "HasError": false
      },
      {
        "Filename": "table_2_name_0_0_0.snappy.parquet",
        "FileSize": "19732",
        "IsSkipped": false,
        "HasError": false
      }
    ]
  }
}
```

Currently I am using multiple instances of autoloader to read the different tables, with wildcards in the path so that each instance only pulls in the parquet files for an individual table. This works, but it feels terribly inefficient.
```python
from pyspark.sql.functions import col

# autoloader_config, source_path, fileformat and parse_file_path (a UDF that
# extracts the batch timestamp from the file path) are defined elsewhere.
df = (
    spark.readStream.format("cloudFiles")
    .options(**autoloader_config)
    .load(source_path + table_name.upper() + "/" + table_name.upper() + "*." + fileformat)
    .selectExpr(
        "*",
        "_metadata.file_path AS SOURCE_FILE_PATH",
        "_metadata.file_modification_time AS LOAD_DATETIME",
    )
    .withColumn("BATCH_ID", parse_file_path(col("SOURCE_FILE_PATH")))
)
```

The downsides of this method:
1. Each instance of autoloader does a directory listing over thousands of files that it doesn't care about.
1. When autoloader runs, it can pick up a partially written batch if a new timestamped folder has been created and parquet files have been written for some of the tables but not all.

Options:
- Have an intermediary service that is triggered by the Kafka message, copies the files corresponding to the completed batch over to another blob storage account, and puts them in a different folder structure, probably /table_name/{timestamp}.parquet, so that each autoloader only has to look at the directory that corresponds to its table.
- Use autoloader in notification mode and create a separate queue for autoloader to look at, containing only the blob-created events for the files listed in the Kafka message (see the sketch after this list).
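
For what it's worth, the second option could look roughly like the sketch below: autoloader in file notification mode consuming from a queue that you populate yourself. The queue name, secret scope/key, schema location and paths are placeholders for illustration, not a tested configuration.

```python
# Sketch only: queue name, secret scope/key and paths are placeholders.
queue_connection_string = dbutils.secrets.get("my-scope", "queue-connection-string")

notification_config = {
    "cloudFiles.format": "parquet",
    "cloudFiles.useNotifications": "true",
    # Existing Azure Queue Storage queue that only receives blob-created events
    # for files belonging to completed batches.
    "cloudFiles.queueName": "completed-batch-events",
    "cloudFiles.connectionString": queue_connection_string,
    "cloudFiles.schemaLocation": f"/mnt/checkpoints/{table_name}/_schema",
}

df = (
    spark.readStream.format("cloudFiles")
    .options(**notification_config)
    # Same landing path as before; the queue controls which files are picked up.
    .load(source_path)
    .selectExpr(
        "*",
        "_metadata.file_path AS SOURCE_FILE_PATH",
        "_metadata.file_modification_time AS LOAD_DATETIME",
    )
)
```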

I would like to know if you have any recommendations, or if there is a much simpler solution using aspects of autoloader or Databricks that I am unaware of.

-werners-
Esteemed Contributor III

Your pattern with the date folders is the classic way of processing data in data lakes.
But with autoloader you can get rid of them.
Create a folder per table, e.g. /bronze/files/table1, and read that folder using autoloader.
Your filenames can be anything, as long as they can be sorted lexicographically. I use <epoch>_filename...
Autoloader keeps track of what has been processed and what has not.
So like that you have one autoloader per table.
Does that make sense?
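
For illustration, a minimal per-table reader along those lines might look like the sketch below; the /bronze/files and /bronze/_schemas locations are just example paths, not a prescribed layout.

```python
def read_table_stream(table_name: str):
    # One autoloader stream per table, each watching only its own folder.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"/bronze/_schemas/{table_name}")
        .load(f"/bronze/files/{table_name}/")
    )

df_table1 = read_table_stream("table1")
```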

Hey,
Thanks for the swift reply. I don't have control over the way the source data is loaded into the blob store, so as I understand your answer, it would be to go with the first option I listed:
Have an intermediary service that is triggered by the Kafka message, copies the files corresponding to the completed batch over to another blob storage account, and puts them in a different folder structure, probably /table_name/{timestamp}.parquet, so that each autoloader only has to look at the directory that corresponds to its table.

I have to copy the data across to a new storage container, both for the efficiency gains for each autoloader and for control over when autoloader runs, knowing that the container will only ever contain full batches of data.
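
For illustration, the intermediary copy step driven by the batch-complete event could look roughly like the sketch below, using the Azure SDK; the container names, connection strings and per-table target layout are assumptions.

```python
from azure.storage.blob import BlobServiceClient

# Placeholder clients; in practice the connection strings would come from a secret store.
src_service = BlobServiceClient.from_connection_string(SRC_CONNECTION_STRING)
dst_service = BlobServiceClient.from_connection_string(DST_CONNECTION_STRING)


def copy_completed_batch(event: dict) -> None:
    """Copy every file of a completed batch into a per-table folder layout."""
    data = event["Data"]
    batch_ts = data["BatchDate"].replace("-", "").replace(":", "")   # e.g. 20240819T070540
    src_container = src_service.get_container_client("landing")      # assumed source container
    for table in data["TableNamesArr"]:
        prefix = f'{data["FileDirPath"]}/{table}/'
        for blob in src_container.list_blobs(name_starts_with=prefix):
            filename = blob.name.rsplit("/", 1)[-1]
            dst_blob = dst_service.get_blob_client(
                container="bronze",                                   # assumed destination container
                blob=f"{table}/{batch_ts}_{filename}",
            )
            # Requires the source blob to be reachable from the destination account, e.g. via SAS.
            dst_blob.start_copy_from_url(f"{src_container.url}/{blob.name}")
```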

You are right that autoloader will keep track of what data it has picked up, but it is also important for me to know which batches it has picked up, so that I know the downstream tables that join and aggregate many of these source tables have processed all of the incoming batches.
Currently I am planning to do this using the "BATCH_ID" column, which is the timestamp at which the batch was uploaded by the source system. Basically, after running autoloader on the tables, I would join all those dataframes together and get a distinct list of the "BATCH_ID" column, which I can then record in another table to essentially say that all these source tables have been processed up to the highest "BATCH_ID".
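
For illustration, that bookkeeping step could be written as a foreachBatch sink along the lines of the sketch below; the audit table name and the PROCESSED_AT column are assumptions.

```python
from pyspark.sql import functions as F


def record_processed_batches(batch_df, batch_id):
    # Append the distinct source batch timestamps seen in this micro-batch
    # to a small audit table, so downstream jobs know which batches are complete.
    (
        batch_df.select("BATCH_ID").distinct()
        .withColumn("PROCESSED_AT", F.current_timestamp())
        .write.mode("append")
        .saveAsTable("bronze.ingestion_batch_log")  # hypothetical audit table
    )
```

The function would be attached to each table's stream with .writeStream.foreachBatch(record_processed_batches), so the audit table records, per run, which BATCH_ID values have landed.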

Is that what you would do? Or is there a way of getting that information out of autoloader?

-werners-
Esteemed Contributor III

There is an easier way to see what has been processed:
SELECT * FROM cloud_files_state('path/to/checkpoint');
https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/production.html 
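
For example, one way to turn that state into the list of batch folders a given stream has already ingested (the checkpoint path below is just an example):

```python
from pyspark.sql import functions as F

# File-level state recorded by autoloader for one table's stream.
state = spark.sql(
    "SELECT path FROM cloud_files_state('/mnt/checkpoints/table_1_name')"
)

# Extract the year/month/day/hh:mm:ss batch folder from each ingested file path.
processed_batches = (
    state
    .withColumn(
        "BATCH_ID",
        F.regexp_extract("path", r"(\d{4}/[A-Za-z]{3}/\d{2}/\d{2}:\d{2}:\d{2})", 1),
    )
    .select("BATCH_ID")
    .distinct()
)
```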
