07-01-2024 08:38 PM - edited 07-01-2024 08:38 PM
Hi,
I have a Databricks Auto Loader job written in PySpark. Even with retries enabled, it didn't merge/update the schema:
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.includeExistingFiles", "false")
    .load(source_path)
    .writeStream
    .queryName("write stream query")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .foreachBatch(batch_operation)
    .option("mergeSchema", "true")
    .start()
    .awaitTermination())
Error:
Error while reading file s3://path
Caused by: UnknownFieldException: [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_FILE] Encountered unknown fields during parsing: field type, which can be fixed by an automatic retry: false
I tried running it a couple of times and set retries = 2 at both the job and task level.
Can you please help?
07-01-2024 11:17 PM
What happens if you enable rescue mode?
.option('cloudFiles.schemaEvolutionMode', 'rescue')
07-02-2024 06:43 AM - edited 07-02-2024 07:12 AM
The rescue option won't evolve the schema:
https://docs.databricks.com/en/ingestion/auto-loader/schema.html#:~:text=evolve%20data%20types.-,res...
My requirement is that the schema should evolve automatically.
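For reference, the evolution mode that does add new columns to the tracked schema is addNewColumns (the Auto Loader default); with it the stream stops with an UnknownFieldException when new columns arrive and picks them up after a restart. A minimal sketch, assuming the same source_path and checkpoint_path as above:

df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    # default mode: fail on new columns, update the tracked schema, resume on restart
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.includeExistingFiles", "false")
    .load(source_path))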
07-01-2024 11:22 PM
Hi @ajithgaade ,
If you are using a merge statement inside the foreachBatch function (batch_operation), then you need DBR 15.2 or above to evolve the schema (a sketch follows the link below):
https://docs.databricks.com/en/delta/update-schema.html#automatic-schema-evolution-for-delta-lake-me...
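A minimal sketch of that pattern, assuming a hypothetical target table target_table keyed on id, and DBR 15.2+ (Delta Lake 3.2+) for withSchemaEvolution():

from delta.tables import DeltaTable

def batch_operation(df, epoch_id):
    # Merge the micro-batch into the target and let the merge evolve the
    # target schema; the table name and join key are placeholders.
    (DeltaTable.forName(spark, "target_table").alias("t")
        .merge(df.alias("s"), "t.id = s.id")
        .withSchemaEvolution()
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())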
07-02-2024 06:53 AM
Hi @Giri-Patcham
The batch operation doesn't have a merge statement; I am dropping and recreating tables. I tried clearing the checkpoint location many times with different options, and also tried DBR 15.3. No luck.
07-02-2024 08:10 AM
@ajithgaade Can you share the sample code inside the batch function?
07-02-2024 09:17 AM
07-02-2024 10:18 AM
Can you try setting this conf?
07-02-2024 01:26 PM
Hello,
Try this:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Auto Loader Schema Evolution") \
    .getOrCreate()
# Source and checkpoint paths
source_path = "s3://path"
checkpoint_path = "/path/to/checkpoint"
# Define batch processing function
def batch_operation(df, epoch_id):
    # Perform your batch operations here
    # For example, write to Delta table with schema merge
    df.write \
      .format("delta") \
      .mode("append") \
      .option("mergeSchema", "true") \
      .save("/path/to/delta/table")
# Read stream with schema evolution
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", checkpoint_path) \
    .option("cloudFiles.includeExistingFiles", "false") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load(source_path)
# Write stream with schema merge
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .trigger(availableNow=True) \
    .foreachBatch(batch_operation) \
    .option("mergeSchema", "true") \
    .start()
query.awaitTermination()
and try setting retry policies on the job task, for example (a sketch of applying them via the Jobs API follows the JSON):
{
  "tasks": [
    {
      "task_key": "example-task",
      "notebook_task": {
        "notebook_path": "/path/to/your/notebook"
      },
      "max_retries": 2,
      "min_retry_interval_millis": 60000,
      "retry_on_timeout": true
    }
  ]
} 
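One way to apply those task settings is the Jobs API 2.1 update endpoint; a rough sketch, assuming a placeholder workspace host, a job_id of 123, and a personal access token in the DATABRICKS_TOKEN environment variable:

import os
import requests

# Placeholders: workspace host, job_id, task_key and notebook path must match your job.
host = "https://<workspace-host>"
payload = {
    "job_id": 123,
    "new_settings": {
        "tasks": [
            {
                "task_key": "example-task",
                "notebook_task": {"notebook_path": "/path/to/your/notebook"},
                "max_retries": 2,
                "min_retry_interval_millis": 60000,
                "retry_on_timeout": True,
            }
        ]
    },
}
resp = requests.post(
    f"{host}/api/2.1/jobs/update",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json=payload,
)
resp.raise_for_status()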