mtajmouati
Contributor

Hello,

Try this:

 

from pyspark.sql import SparkSession

# Obtain the Spark session for this job (getOrCreate reuses an active one)
spark = (
    SparkSession.builder
    .appName("Auto Loader Schema Evolution")
    .getOrCreate()
)

# Locations used by the stream: input files and checkpoint directory
source_path = "s3://path"
checkpoint_path = "/path/to/checkpoint"

# Define batch processing function
def batch_operation(df, epoch_id):
    """Append one micro-batch to the Delta table, merging in new columns.

    Meant to be passed to ``DataStreamWriter.foreachBatch``.

    Args:
        df: DataFrame containing the rows of the current micro-batch.
        epoch_id: Micro-batch identifier handed in by the streaming engine;
            used as the Delta transaction version so a replayed batch is
            not appended a second time.
    """
    # Perform your batch operations here
    (
        df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        # foreachBatch is at-least-once: after a failure/retry the same
        # epoch_id can be delivered again. txnAppId + txnVersion let Delta
        # recognise and skip a batch it has already committed.
        # NOTE: txnAppId must be unique per stream writing to this table,
        # and must be changed if the checkpoint is deleted and restarted.
        .option("txnAppId", "auto_loader_schema_evolution")
        .option("txnVersion", epoch_id)
        .save("/path/to/delta/table")
    )

# Read stream with schema evolution (Auto Loader / cloudFiles source)
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Directory where Auto Loader stores the inferred schema between runs
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.includeExistingFiles", "false")
    .option("cloudFiles.inferColumnTypes", "true")
    .load(source_path)
)

# Hand each micro-batch to batch_operation. foreachBatch replaces the sink,
# so no format()/mergeSchema is set on the stream writer itself: the Delta
# write (including mergeSchema) is done inside batch_operation.
query = (
    df.writeStream
    .option("checkpointLocation", checkpoint_path)
    # Process everything currently available, then stop
    .trigger(availableNow=True)
    .foreachBatch(batch_operation)
    .start()
)

# Block until the availableNow run has finished (or failed)
query.awaitTermination()

  

Also, try setting a retry policy on the job task, for example:

{
  "tasks": [
    {
      "task_key": "example-task",
      "notebook_task": {
        "notebook_path": "/path/to/your/notebook"
      },
      "max_retries": 2,
      "min_retry_interval_millis": 60000,
      "retry_on_timeout": true
    }
  ]
}
Best regards,
Mehdi Tajmouati
 mehdi.tajmouati@wytasoft.com
 06 68 23 18 42
 www.wytasoft.com