cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

build autoloader pyspark job

seefoods
Contributor

Hello Guys, 

I have build an ETL in pyspark which use autolaoder. So i want to know what is best way to use autoader databricks? 
What is the best way to vaccum checkpoint files on /Volumes ? 

Hope to have your ideas about that. 


Cordially , 

1 ACCEPTED SOLUTION

Accepted Solutions

intuz
New Contributor II

Hi there,

Great to hear you're using Autoloader in PySpark for your ETL pipeline!

Here are some best practices:

Best way to use Autoloader in Databricks:

  • Use cloudFiles format: This gives you scalable and incremental file ingestion.

spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("dbfs:/mnt/yourpath")

  • Use schema evolution when files have changing structure: 

.option("cloudFiles.schemaEvolutionMode", "addNewColumns")

  • Set up checkpointing correctly:
    • Store checkpoints in DBFS or Volumes, e.g. /Volumes/your_catalog/your_schema/checkpoints/
    • Example: 

.writeStream \
.option("checkpointLocation", "/Volumes/my_catalog/my_schema/checkpoints/") \

Best way to vacuum checkpoint files:

Checkpoints shouldn't be manually deleted often — they're used to track file processing. But if you really need to clean up old files:

  1. Use Delta VACUUM on your output data: ```VACUUM '/Volumes/my_catalog/my_schema/output_table' RETAIN 168 HOURS;```
  2. For cleaning up /Volumes checkpoint folders (not recommended unless you're starting fresh), you can:
    • Stop the stream.
    • Delete old checkpoint folders carefully using ```%fs rm -r```.

Be careful — deleting checkpoint folders means the stream may reprocess old data.


Hope this helps!

View solution in original post

3 REPLIES 3

lingareddy_Alva
Honored Contributor II

Hi @seefoods 

Best Practices for Using Autoloader
1. Production Configuration
- Checkpoint Location: Avoid placing checkpoints in locations with cloud object lifecycle policies, as these can corrupt stream state.
- Use Unity Catalog Volumes: Since you're using /Volumes, ensure consistent access patterns and permissions
- Resource Sizing: Use clusters with auto-scaling (1-4 workers, 8 cores each) and drivers with 8-32 cores for optimal performance.

2. Code Structure Best Practices

# Example structure for production Autoloader
def create_autoloader_stream():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json") # or your format
.option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
.option("cloudFiles.useNotifications", "true") # for better performance
.option("cloudFiles.maxFilesPerTrigger", 1000) # control batch size
.option("cloudFiles.validateOptions", "true")
.load(source_path)
)

# Write with proper checkpointing
(autoloader_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_path)
.option("mergeSchema", "true") # handle schema evolution
.trigger(availableNow=True) # or processingTime="5 minutes"
.table("your_target_table")
)

3. Performance Optimization
1. Use cloudFiles.useNotifications=true for better performance with large datasets
2. Set appropriate maxFilesPerTrigger to control batch sizes
3. Consider availableNow=True trigger for micro-batch processing
4. Enable schema evolution with mergeSchema=true if needed

Checkpoint File Management on /Volumes
1. Understanding Checkpoint Structure
Autoloader checkpoints contain:
- Stream metadata (offsets, committed batches)
- Schema information
- File state tracking

2. Cleanup Strategies
Important: Never manually delete or modify checkpoint files while streams are running

3. Monitoring and Maintenance

4. Best Practices for /Volumes
- Organize by Environment: /Volumes/catalog/schema/volume/env/app/checkpoints/
- Use Descriptive Names: Include stream name, source, and version
- Set Up Monitoring: Regular health checks on checkpoint sizes
- Backup Critical Checkpoints: For mission-critical streams, consider periodic backups


The key is balancing performance with maintainability. Autoloader automatically handles file state management and
prevents duplication, but proper checkpoint management ensures your ETL remains efficient and recoverable.

 

LR

intuz
New Contributor II

Hi there,

Great to hear you're using Autoloader in PySpark for your ETL pipeline!

Here are some best practices:

Best way to use Autoloader in Databricks:

  • Use cloudFiles format: This gives you scalable and incremental file ingestion.

spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("dbfs:/mnt/yourpath")

  • Use schema evolution when files have changing structure: 

.option("cloudFiles.schemaEvolutionMode", "addNewColumns")

  • Set up checkpointing correctly:
    • Store checkpoints in DBFS or Volumes, e.g. /Volumes/your_catalog/your_schema/checkpoints/
    • Example: 

.writeStream \
.option("checkpointLocation", "/Volumes/my_catalog/my_schema/checkpoints/") \

Best way to vacuum checkpoint files:

Checkpoints shouldn't be manually deleted often — they're used to track file processing. But if you really need to clean up old files:

  1. Use Delta VACUUM on your output data: ```VACUUM '/Volumes/my_catalog/my_schema/output_table' RETAIN 168 HOURS;```
  2. For cleaning up /Volumes checkpoint folders (not recommended unless you're starting fresh), you can:
    • Stop the stream.
    • Delete old checkpoint folders carefully using ```%fs rm -r```.

Be careful — deleting checkpoint folders means the stream may reprocess old data.


Hope this helps!

seefoods
Contributor

Hello @intuz , 

Thanks for your reply. 

Cordially 

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now