Data Engineering

Re-ingest Autoloader files with foreachBatch

PaoloF
New Contributor II

Hi all,
I'm using Autoloader to ingest files; each file contains changed data from a table, and I merge it into a Delta table. It works fine.
But if I want to re-ingest all the files (for example, after deleting the checkpoint location), I need to re-ingest them in order, following the original sequence; otherwise the merge operation produces wrong results.
To achieve this I followed this strategy:
- add a column (_metadata.file_path) in the readStream
- the trigger is always availableNow=True
- the foreachBatch function contains this logic: extract the distinct file paths from the batch DataFrame, then iterate over them in order and apply the merge operation
It seems that extracting the distinct file paths always returns a NoneType, and it then fails when the iteration starts.

Any idea how I can solve this problem?

Thanks

```python
from pyspark.sql import functions as F

def process_batch(microBatchOutputDF, batchId):
    if not microBatchOutputDF.isEmpty():
        # df_with_file = microBatchOutputDF.filter(F.col("_source_file_path")).isNotNull()
        # get distinct files in this batch
        files = microBatchOutputDF.select(F.col("_source_file_path")).distinct().collect()
        if len(files) > 0:
            for file in files.sort():
                filename = file["_source_file_path"]
                print(f"Processing file: {filename}")
                # filter records for this specific file
                file_df = microBatchOutputDF.filter(f"(_source_file_path='{filename}')")
                process_single_file(file_df, filename, batchId)
```
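
For reference, a minimal sketch of the wiring described above; `source_directory`, `checkpoint_location`, the target table name, and the `id` key column are placeholders, not taken from this post:

```python
from delta.tables import DeltaTable

# Auto Loader stream with the source file path captured as a column (placeholder paths).
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_location)
    .load(source_directory)
    .selectExpr("*", "_metadata.file_path as _source_file_path")
)

def process_single_file(file_df, filename, batchId):
    # Hypothetical merge body: the target table name and the "id" key are assumptions.
    target = DeltaTable.forName(spark, "catalog.schema.target_table")
    (
        target.alias("t")
        .merge(file_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# AvailableNow trigger with the foreachBatch function defined above.
(
    df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .start()
)
```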

1 ACCEPTED SOLUTION


BigRoux
Databricks Employee

The issue with your Autoloader setup and foreachBatch approach is likely related to how you're handling the file path metadata. Here's something to try:

File Path Handling in Autoloader

When using Databricks Autoloader, the file path isn't automatically included as a column in your DataFrame. You need to explicitly capture it using one of these approaches:

1. Use the `includeFileName` option:
```python
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")  # or your specific format
    .option("cloudFiles.includeExistingFiles", "true")
    .option("cloudFiles.includeFileName", "true")  # This adds the file path as metadata
    .load(input_path)
)
```

2. Or explicitly add it using `input_file_name()`:
```python
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(input_path)
    .withColumn("_source_file_path", F.input_file_name())
)
```

Fixing the NoneType Issue

If `distinct().collect()` appears to return a NoneType, it likely means one of the following:
- The column doesn't exist
- The column contains null values
- The DataFrame is empty
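
Also worth noting about the snippet in the question: `list.sort()` sorts in place and returns `None`, so `for file in files.sort()` fails on its own; the rewrite below uses `sorted(files)`, which returns a new list. A minimal illustration:

```python
# list.sort() mutates the list and returns None; sorted() returns a new sorted list.
files = ["part-002.parquet", "part-001.parquet"]
print(files.sort())   # None -> "for f in files.sort()" raises TypeError
print(sorted(files))  # ['part-001.parquet', 'part-002.parquet']
```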

Here's a robust approach for your `foreachBatch` function:

```python
def process_batch(microBatchOutputDF, batchId):
    # First check if the batch has data
    if microBatchOutputDF.isEmpty():
        print(f"Batch {batchId} is empty")
        return

    # Debug: Print schema to verify column exists
    microBatchOutputDF.printSchema()

    # Get distinct file paths, handling potential nulls
    files_df = (
        microBatchOutputDF
        .select("_source_file_path")  # Or use "_metadata.file_path" if that's your column
        .dropna()
        .distinct()
    )

    # Convert to Python list safely
    files = [row["_source_file_path"] for row in files_df.collect()]

    if not files:
        print(f"No valid file paths found in batch {batchId}")
        return

    # Process files in sorted order
    for filename in sorted(files):
        print(f"Processing file: {filename}")
        file_df = microBatchOutputDF.filter(F.col("_source_file_path") == filename)
        process_single_file(file_df, filename, batchId)
```

Additional Considerations

- Schema Inference: For CSV/JSON files, consider using `.option("cloudFiles.schemaLocation", schema_location_path)` to maintain consistent schema across batches.

- Checkpointing: Ensure you have proper checkpointing with `.option("checkpointLocation", checkpoint_path)` in your stream definition.

- Error Handling: Add try/except blocks in your processing logic to handle file-specific failures gracefully (see the sketch after this list).

- Traceability: Consider writing the `_source_file_path` to your target table for data lineage tracking.

- Performance: If processing many files per batch, consider using a thread pool to parallelize the file-level processing.
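
As a concrete illustration of the error-handling point above, a minimal sketch; `process_single_file` and the column name match the code earlier in this reply, and the choice to re-raise is an assumption that depends on how strict the ordering requirement is:

```python
from pyspark.sql import functions as F

def process_files_in_order(microBatchOutputDF, files, batchId):
    # Hypothetical helper: wraps the per-file loop shown above with try/except.
    for filename in sorted(files):
        file_df = microBatchOutputDF.filter(F.col("_source_file_path") == filename)
        try:
            process_single_file(file_df, filename, batchId)
        except Exception as exc:
            print(f"Failed to process {filename} in batch {batchId}: {exc}")
            # Re-raise so the stream fails and retries the batch; silently skipping a
            # file would break the ordered-merge sequence described in the question.
            raise
```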


3 REPLIES


PaoloF
New Contributor II

Thank you very much, @BigRoux!!
Now it works as expected.

Only some comments:
- the option `.option("cloudFiles.includeFileName", "true")` doesn't seem to work; does this option exist?
- `input_file_name()` does not work in Unity Catalog
so I have kept my first solution:
```python
(
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("mergeSchema", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", checkpoint_location)
    .load(source_directory)
    .selectExpr("*", "_metadata.file_path as _source_file_path")
)
```

Probably I made some mistake in the function (I'm not a Python expert)...
I think the `dropna()` is the key in:

```python
files_df = (
    microBatchOutputDF
    .select("_source_file_path")  # Or use "_metadata.file_path" if that's your column
    .dropna()
    .distinct()
)
```

Thanks!!

 

BigRoux
Databricks Employee

Glad to help!
