Hi all,
I'm using Auto Loader to ingest files; each file contains changed data from a table, and I merge it into a Delta table. It works fine.
But if I want to re-ingest all the files (for example, after deleting the checkpoint location), I need to re-process the files in the original order; otherwise the merge operation produces wrong results.
To achieve this I followed this strategy:
- add a column (_metadata.file_path) in the readStream
- the trigger is always availableNow=True
- the foreachBatch function extracts the distinct file paths from the batch DataFrame and iterates over them in order, applying the merge operation (the function is below, followed by a rough sketch of how the stream is wired up)
It seems that extracting the distinct file paths always returns a NoneType, and then it fails when the iteration starts.
Any idea how I can solve this problem?
Thanks
```python
def process_batch(microBatchOutputDF, batchId):
    if not microBatchOutputDF.isEmpty():
        #df_with_file = microBatchOutputDF.filter(F.col("_source_file_path")).isNotNull()
        # get distinct files in this batch
        files = microBatchOutputDF.select(F.col("_source_file_path")).distinct().collect()
        if len(files) > 0:
            for file in files.sort():
                filename = file["_source_file_path"]
                print(f"Processing file: {filename}")
                # filter records for this specific file
                file_df = microBatchOutputDF.filter(f"(_source_file_path='{filename}')")
                process_single_file(file_df, filename, batchId)
```
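For context, this is roughly how the stream is wired up (a minimal sketch; the checkpoint path, source directory, and table names are placeholders, not my real values):
```python
# Read changed-data files with Auto Loader and keep the source file path.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_location)  # placeholder path
    .load(source_directory)                                    # placeholder path
    .selectExpr("*", "_metadata.file_path as _source_file_path")
)

# Apply the merge per file via foreachBatch; availableNow processes
# everything that is pending and then stops.
(
    df.writeStream
    .foreachBatch(process_batch)  # the function defined above
    .option("checkpointLocation", checkpoint_location)
    .trigger(availableNow=True)
    .start()
)
```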
Accepted Solutions
The issue with your Autoloader setup and foreachBatch approach is likely related to how you're handling the file path metadata. Here's something to try:
File Path Handling in Autoloader
When using Databricks Autoloader, the file path isn't automatically included as a column in your DataFrame. You need to explicitly capture it using one of these approaches:
1. Try the `cloudFiles.includeFileName` option (check whether your runtime supports it):
```python
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")  # or your specific format
    .option("cloudFiles.includeExistingFiles", "true")
    .option("cloudFiles.includeFileName", "true")  # intended to surface the file path as metadata
    .load(input_path)
)
```
2. Or explicitly add it using `input_file_name()`:
```python
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.includeExistingFiles", "true")
    .load(input_path)
    .withColumn("_source_file_path", F.input_file_name())
)
```
Fixing the NoneType Issue
If `distinct().collect()` itself seems to return `NoneType`, it usually means one of:
- The column doesn't exist
- The column contains only null values
- The DataFrame is empty
There is also a plain Python bug in your loop: `files.sort()` sorts the list in place and returns `None`, so `for file in files.sort():` ends up iterating over `None` and fails (see the short illustration below).
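A minimal plain-Python illustration of that pitfall (no Spark involved):
```python
files = ["f3.parquet", "f1.parquet", "f2.parquet"]

print(files.sort())   # None: list.sort() sorts in place and returns nothing
print(sorted(files))  # ['f1.parquet', 'f2.parquet', 'f3.parquet']: safe to iterate
```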
Here's a robust approach for your `foreachBatch` function:
```python
def process_batch(microBatchOutputDF, batchId):
    # First check if the batch has data
    if microBatchOutputDF.isEmpty():
        print(f"Batch {batchId} is empty")
        return

    # Debug: print the schema to verify the column exists
    microBatchOutputDF.printSchema()

    # Get distinct file paths, handling potential nulls
    files_df = (
        microBatchOutputDF
        .select("_source_file_path")  # or "_metadata.file_path" if that's your column
        .dropna()
        .distinct()
    )

    # Convert to a Python list safely
    files = [row["_source_file_path"] for row in files_df.collect()]
    if not files:
        print(f"No valid file paths found in batch {batchId}")
        return

    # Process files in sorted order
    for filename in sorted(files):
        print(f"Processing file: {filename}")
        file_df = microBatchOutputDF.filter(F.col("_source_file_path") == filename)
        process_single_file(file_df, filename, batchId)
```
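Since `process_single_file` isn't shown in your snippet, here's a minimal sketch of what it could look like, assuming a Delta target and a single-column merge key (the table name and the `id` column are made-up placeholders; use your own MERGE condition):
```python
from delta.tables import DeltaTable

def process_single_file(file_df, filename, batchId):
    # Hypothetical target table and key column -- replace with your own.
    target = DeltaTable.forName(spark, "my_catalog.my_schema.my_target_table")

    (
        target.alias("t")
        .merge(file_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    print(f"Merged {filename} (batch {batchId}) into the target table")
```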
Additional Considerations
- Schema Inference: For CSV/JSON files, consider using `.option("cloudFiles.schemaLocation", schema_location_path)` to maintain consistent schema across batches.
- Checkpointing: Ensure you have proper checkpointing with `.option("checkpointLocation", checkpoint_path)` in your stream definition.
- Error Handling: Add try/except blocks in your processing logic to handle file-specific failures gracefully (see the sketch after this list).
- Traceability: Consider writing the `_source_file_path` to your target table for data lineage tracking.
- Performance: If you process many files per batch and the order between files doesn't matter, a thread pool can parallelize the file-level processing; in your case the merges must stay in sequence, so keep the loop sequential.
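A minimal sketch of the error-handling idea inside the loop (whether to log and skip or re-raise is your call; since file order matters for the merge, re-raising and retrying the batch is the safer default):
```python
for filename in sorted(files):
    try:
        file_df = microBatchOutputDF.filter(F.col("_source_file_path") == filename)
        process_single_file(file_df, filename, batchId)
    except Exception as e:
        print(f"Failed to process {filename} in batch {batchId}: {e}")
        # Re-raise so the stream fails and the whole batch can be retried;
        # skipping a file would break the ordered merge sequence.
        raise
```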
Thank you very much, @BigRoux !!
Now it works as expected.
Just a couple of comments:
- the option `.option("cloudFiles.includeFileName", "true")` doesn't seem to work; does this option actually exist?
- `input_file_name()` doesn't work with Unity Catalog
so I kept my original approach:
```python
spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("mergeSchema", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", checkpoint_location)
    .load(source_directory)
    .selectExpr("*", "_metadata.file_path as _source_file_path")
```
I probably made some mistake in my function (I'm not a Python expert)...
I think the `dropna()` is the key in:
```python
files_df = (
    microBatchOutputDF
    .select("_source_file_path")  # or "_metadata.file_path" if that's your column
    .dropna()
    .distinct()
)
```
Thanks!!
Glad to help!

