07-01-2024 08:38 PM - edited 07-01-2024 08:38 PM
Hi,
The job is written in PySpark. A Databricks Auto Loader job with retries configured did not merge/update the schema.
spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.includeExistingFiles", "false") \
    .load(source_path) \
    .writeStream \
    .queryName("write stream query") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .foreachBatch(batch_operation) \
    .option("mergeSchema", True) \
    .start() \
    .awaitTermination()
Error:
Error while reading file s3://path
Caused by: UnknownFieldException: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE] Encountered unknown fields during parsing: field type, which can be fixed by an automatic retry: false
I tried running it a couple of times and set retries = 2 for both the job and the task.
Can you please help?
07-01-2024 11:17 PM
What happens if you enable rescue mode?
.option('cloudFiles.schemaEvolutionMode', 'rescue')
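For placement, a minimal sketch of where that option sits in the read stream from the original post (same paths and variables); note that in rescue mode the schema is not evolved, unexpected columns are captured in the _rescued_data column instead:
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    # rescue mode: the schema is not evolved; unexpected fields
    # are collected into the _rescued_data column instead
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(source_path))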
07-02-2024 06:43 AM - edited 07-02-2024 07:12 AM
The rescue option won't evolve the schema:
https://docs.databricks.com/en/ingestion/auto-loader/schema.html
My requirement is that the schema should evolve automatically.
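For reference, a minimal sketch assuming the default addNewColumns evolution mode is what is wanted; in this mode Auto Loader fails the stream when it encounters new columns, records them in the schema location, and applies them on the next start, which is why it is usually paired with job/task retries:
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    # default mode: on new columns the stream stops, the schema stored at
    # cloudFiles.schemaLocation is updated, and a restart picks it up
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load(source_path))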
07-01-2024 11:22 PM
Hi @ajithgaade ,
If you are using a MERGE statement inside the foreachBatch function batch_operation, then you have to use DBR 15.2 or above to evolve the schema:
https://docs.databricks.com/en/delta/update-schema.html#automatic-schema-evolution-for-delta-lake-me...
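A hedged sketch of what that could look like inside foreachBatch, assuming a hypothetical target table target_table and join key id (both placeholders, not from this thread); the withSchemaEvolution() call on the merge builder is the DBR 15.2+ / Delta Lake 3.2+ API described in the linked page:
from delta.tables import DeltaTable

def batch_operation(df, epoch_id):
    # hypothetical target table and join key, for illustration only
    target = DeltaTable.forName(spark, "target_table")
    (target.alias("t")
        .merge(df.alias("s"), "t.id = s.id")
        .withSchemaEvolution()  # evolve the target schema during MERGE
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())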
07-02-2024 06:53 AM
Hi @Giri-Patcham
The batch operation doesn't have a merge statement; I am dropping and recreating the tables. I have tried clearing the checkpoint location many times with different options, and also tried DBR 15.3. No luck.
07-02-2024 08:10 AM
@ajithgaade can you share the sample code inside the batch function?
07-02-2024 09:17 AM
07-02-2024 10:18 AM
Can you try setting this conf?
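The specific conf did not come through in the thread; one that is often suggested for schema evolution with Delta MERGE (an assumption here, not necessarily the one meant) is:
# assumption: enable automatic schema evolution for Delta MERGE operations
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")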
07-02-2024 01:26 PM
Hello,
Try this:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Auto Loader Schema Evolution") \
    .getOrCreate()

# Source and checkpoint paths
source_path = "s3://path"
checkpoint_path = "/path/to/checkpoint"

# Define batch processing function
def batch_operation(df, epoch_id):
    # Perform your batch operations here
    # For example, write to Delta table with schema merge
    df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save("/path/to/delta/table")

# Read stream with schema evolution
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.includeExistingFiles", "false") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load(source_path)

# Write stream with schema merge
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .foreachBatch(batch_operation) \
    .option("mergeSchema", "true") \
    .start()

query.awaitTermination()
and try setting retry policies in the job configuration:
{
"tasks": [
{
"task_key": "example-task",
"notebook_task": {
"notebook_path": "/path/to/your/notebook"
},
"max_retries": 2,
"min_retry_interval_millis": 60000,
"retry_on_timeout": true
}
]
}