I would like a recommendation on how to structure the ingestion of a large number of tables. I am currently using Auto Loader in directory listing mode.
I have concerns about future performance, and I have a requirement to ensure that each batch of data is processed as a complete unit.
The files dropped in storage are in the following path:
/year/month/day/hh:mm:ss/table_name/tablename_other_text.snappy.parquet
The files are dropped in batches, with every table in a batch written under the same /year/month/day/hh:mm:ss folder, which corresponds to the batch timestamp for the data.
For example:
```
.
├── 2024/
│   ├── Aug/
│   │   ├── 19/
│   │   │   ├── 07:05:40/
│   │   │   │   ├── table_1_name/
│   │   │   │   │   ├── table_1_name_other_text_0_0_0.snappy.parquet
│   │   │   │   │   ├── table_1_name_other_text_0_0_1.snappy.parquet
│   │   │   │   │   └── table_1_name_other_text_0_0_2.snappy.parquet
│   │   │   │   ├── table_2_name/
│   │   │   │   │   ├── table_2_name_other_text_0_0_0.snappy.parquet
│   │   │   │   │   ├── table_2_name_other_text_0_0_1.snappy.parquet
│   │   │   │   │   └── table_2_name_other_text_0_0_2.snappy.parquet
│   │   │   │   └── ...
│   │   │   ├── 10:07:22/
│   │   │   │   ├── table_1_name/
│   │   │   │   │   ├── table_1_name_other_text_0_0_0.snappy.parquet
│   │   │   │   │   ├── table_1_name_other_text_0_0_1.snappy.parquet
│   │   │   │   │   └── table_1_name_other_text_0_0_2.snappy.parquet
│   │   │   │   ├── table_2_name/
│   │   │   │   │   ├── table_2_name_other_text_0_0_0.snappy.parquet
│   │   │   │   │   ├── table_2_name_other_text_0_0_1.snappy.parquet
│   │   │   │   │   └── table_2_name_other_text_0_0_2.snappy.parquet
│   │   │   │   └── ...
│   │   │   └── ...
│   │   └── ...
│   └── ...
└── ...
```
The schemas of the different tables are all unique.
The external service that uploads these parquet files also publishes an event to an Event Grid namespace topic once it has uploaded all the files for a particular batch timestamp.
The structure of this event is:
```
{
  "Id": "ee35cf85-eac5-4dc2-b78e-4412bc4918ac",
  "EventType": "Banana.Storage.BlobCreated",
  "Subject": "banana-tree-fox-files",
  "Data": {
    "BatchDate": "2024-08-19T07:05:40",
    "TableNames": #HASH,
    "FileDirPath": "2024/Aug/19/07:05:40",
    "TableNamesArr": [
      "table_1_name",
      "table_2_name",
      "table_3_name"
    ],
    "FileEntries": [
      {
        "Filename": "table_1_name_0_0_0.snappy.parquet",
        "FileSize": "16246",
        "IsSkipped": false,
        "HasError": false
      },
      {
        "Filename": "table_1_name_0_1_0.snappy.parquet",
        "FileSize": "19532",
        "IsSkipped": false,
        "HasError": false
      },
      {
        "Filename": "table_2_name_0_0_0.snappy.parquet",
        "FileSize": "19732",
        "IsSkipped": false,
        "HasError": false
      }
    ]
  }
}
```
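For reference, this is roughly how I picture turning one of these events into per-table source paths for a batch. It is only a sketch: the function name and the storage root are placeholders, not something I have built.
``` python
import json

def batch_table_paths(event_body: str, storage_root: str) -> dict[str, str]:
    """Map each table named in a batch-complete event to its folder for that batch."""
    data = json.loads(event_body)["Data"]
    batch_dir = data["FileDirPath"]  # e.g. "2024/Aug/19/07:05:40"
    return {
        table: f"{storage_root}/{batch_dir}/{table}/"
        for table in data["TableNamesArr"]
    }

# batch_table_paths(msg, "abfss://landing@myaccount.dfs.core.windows.net")
# -> {"table_1_name": ".../2024/Aug/19/07:05:40/table_1_name/", ...}
```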
Currently I am running a separate instance of Auto Loader per table, using a wildcard path to filter down to only the parquet files for that table. This is working, but it feels terribly inefficient.
``` python
from pyspark.sql.functions import col

# autoloader_config, source_path, fileformat, table_name and parse_file_path
# are defined elsewhere in the job.
df = (
    spark.readStream.format("cloudFiles")
    .options(**autoloader_config)
    .load(source_path + table_name.upper() + "/" + table_name.upper() + "*." + fileformat)
    .selectExpr(
        "*",
        "_metadata.file_path AS SOURCE_FILE_PATH",
        "_metadata.file_modification_time AS LOAD_DATETIME",
    )
    .withColumn("BATCH_ID", parse_file_path(col("SOURCE_FILE_PATH")))
)
```
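For reference, parse_file_path just pulls the timestamped batch folder out of the file path to use as a BATCH_ID; a simplified version would look something like this:
``` python
from pyspark.sql.functions import regexp_extract

# Simplified: extract the year/Mon/day/hh:mm:ss segment from the full blob path.
def parse_file_path(path_col):
    return regexp_extract(path_col, r"(\d{4}/[A-Za-z]+/\d{1,2}/\d{2}:\d{2}:\d{2})", 1)
```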
The downsides of this method:
1. Each instance of Auto Loader does a directory listing over thousands of files that it does not care about.
2. When Auto Loader runs it can pick up a partially written batch if a new timestamped folder has been created and parquet files have been written for some of the tables but not all.
Options I am considering:
- Have an intermediary service, triggered by the batch-complete event, that copies the files for a completed batch to another blob storage account with a different folder structure, probably /table_name/{timestamp}.parquet, so that each Auto Loader stream only has to look at the directory for its own table.
- Use Auto Loader in file notification mode, with a separate queue for it to read that only contains the blob-created events for the files listed in the batch-complete event (see the sketch below).
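For the second option, I am imagining a config roughly like the one below. This is only a sketch: it assumes the intermediary can write blob-created events in the format Auto Loader expects onto a queue I control (the queue name and connection string are placeholders), and that pointing cloudFiles.queueName at an existing queue is enough to stop Auto Loader provisioning its own Event Grid subscription.
``` python
# Sketch only: "filtered-batch-events" and queue_connection_string are placeholders.
autoloader_notification_config = {
    "cloudFiles.format": "parquet",
    "cloudFiles.useNotifications": "true",             # file notification mode
    "cloudFiles.queueName": "filtered-batch-events",   # queue fed by the intermediary
    "cloudFiles.connectionString": queue_connection_string,
}

df = (
    spark.readStream.format("cloudFiles")
    .options(**autoloader_notification_config)
    .load(source_path)  # base path is still required for schema inference
)
```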
I would like to know if you have any recommendations, or whether there is a much simpler solution using features of Auto Loader or Databricks that I am unaware of.