07-02-2024 01:26 PM
Hello,
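Try this: read the files with Auto Loader and append each micro-batch to a Delta table with schema merging enabled.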
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Auto Loader Schema Evolution") \
    .getOrCreate()
# Source and checkpoint paths
source_path = "s3://path"
checkpoint_path = "/path/to/checkpoint"
# Define batch processing function (called once per micro-batch)
def batch_operation(df, epoch_id):
    # Perform your batch operations here
    # For example, append to a Delta table; mergeSchema adds any new columns to the table
    df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save("/path/to/delta/table")
# Read stream with schema evolution; Auto Loader tracks the inferred schema in schemaLocation
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.includeExistingFiles", "false") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load(source_path)
# Write stream: foreachBatch is the sink, so format/mergeSchema on the stream writer
# would be ignored; the schema merge happens inside batch_operation instead
query = df.writeStream \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .foreachBatch(batch_operation) \
    .start()
query.awaitTermination()
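For intuition about what `mergeSchema` does inside `batch_operation`, here is a simplified pure-Python model. It is an assumption for illustration only (`merge_schema` is a hypothetical helper, not part of Delta Lake or PySpark): new columns in a batch are appended to the table schema, existing columns are kept, and a type conflict raises an error.

```python
# Simplified, hypothetical model of Delta's mergeSchema on append.
# Not the Delta Lake implementation: a schema is just {column name: type name}.


def merge_schema(table_schema: dict, batch_schema: dict) -> dict:
    """Return the table schema after appending a batch with mergeSchema enabled."""
    merged = dict(table_schema)  # existing columns keep their order and type
    for column, dtype in batch_schema.items():
        if column not in merged:
            # New column: added at the end; rows written earlier read it as null.
            merged[column] = dtype
        elif merged[column] != dtype:
            # This model rejects every type change; Delta Lake itself permits a few safe ones.
            raise TypeError(f"type conflict on {column!r}: {merged[column]} vs {dtype}")
    return merged


table = {"id": "bigint", "event": "string"}
batch = {"id": "bigint", "event": "string", "country": "string"}
print(merge_schema(table, batch))
# {'id': 'bigint', 'event': 'string', 'country': 'string'}
```

In the real pipeline Delta Lake performs this merge itself when `mergeSchema` is `"true"`; without it, an append containing the extra column fails with a schema mismatch instead.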
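Also, set a retry policy on the job task. In its default schema evolution mode (`addNewColumns`), Auto Loader stops the stream when it detects new columns and uses the updated schema when the stream restarts, so automatic retries let the job recover without manual intervention: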
{
  "tasks": [
    {
      "task_key": "example-task",
      "notebook_task": {
        "notebook_path": "/path/to/your/notebook"
      },
      "max_retries": 2,
      "min_retry_interval_millis": 60000,
      "retry_on_timeout": true
    }
  ]
}
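The interaction between schema evolution and retries can be sketched with a toy simulation. Everything here is hypothetical and not a Databricks API: `run_with_retries` models a task with `max_retries` and `min_retry_interval_millis` as one initial attempt plus up to `max_retries` retries, and `SimulatedAutoLoaderStream` stands in for a stream that saves newly seen columns and then stops. The injectable `sleep` parameter is an assumption added so the sketch runs instantly.

```python
import time
from typing import Callable


def run_with_retries(task: Callable[[int], str], max_retries: int = 2,
                     min_retry_interval_millis: int = 60_000,
                     sleep: Callable[[float], None] = time.sleep) -> str:
    """Hypothetical retry policy: one initial attempt plus up to max_retries retries."""
    for attempt in range(max_retries + 1):
        try:
            return task(attempt)
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: the run fails with the last error
            # Simplification: wait the full minimum interval after each failure.
            sleep(min_retry_interval_millis / 1000)
    raise RuntimeError("unreachable")


class SimulatedAutoLoaderStream:
    """Toy stand-in for a stream in addNewColumns mode (not the real Auto Loader API)."""

    def __init__(self, known_columns, incoming_columns):
        self.known_columns = list(known_columns)  # schema saved at the schema location
        self.incoming_columns = list(incoming_columns)

    def __call__(self, attempt: int) -> str:
        new = [c for c in self.incoming_columns if c not in self.known_columns]
        if new:
            # Save the evolved schema first, then stop so a restart picks it up.
            self.known_columns.extend(new)
            raise RuntimeError(f"new columns detected: {new}")
        return f"attempt {attempt}: processed with columns {self.known_columns}"


waits = []
stream = SimulatedAutoLoaderStream(["id", "event"], ["id", "event", "country"])
print(run_with_retries(stream, sleep=waits.append))
# attempt 1: processed with columns ['id', 'event', 'country']
print(waits)  # [60.0]
```

With `max_retries` set to 2, a single schema change costs one failed attempt and leaves one retry in reserve for another change or a transient error.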
Best regards,
Mehdi Tajmouati
mehdi.tajmouati@wytasoft.com
06 68 23 18 42
www.wytasoft.com