Exceptions are Not Getting Handled In Autoloader Write Stream

Sambit_S
New Contributor III

I have the logic below implemented using Databricks Auto Loader.

 
import json
import traceback

# Auto Loader write stream: calls the forEachBatch function to write into the
# respective data type's catalog table, and uses a checkpoint to keep track of processed files.
try:
    # Observe raw data: call the matching observe function to capture stream metrics and temporarily store them in a DBFS file location
    if telemetry_type == "device_std":
        parsed_df = observeDeviceRawData(parsed_df)
    else:
        parsed_df = observeAppRawData(parsed_df)

    # Rename proto columns whose names collide with MData columns by adding a '_pl' suffix
    parsed_df = parsed_df.filter("proto_message IS NOT NULL")
    proto_columns = [x.lower() for x in parsed_df.selectExpr("proto_message.*").schema.names]
    MData_columns = [x.lower() for x in parsed_df.selectExpr(*bronze_transform_common_fields).schema.names]
    for column in proto_columns:
        if column in MData_columns:
            parsed_df = parsed_df.withColumn('proto_message', parsed_df['proto_message'].withField(column + '_pl', parsed_df[f'proto_message.{column}']))
            parsed_df = parsed_df.withColumn('proto_message', parsed_df['proto_message'].dropFields(column))

    parsed_df = parsed_df.selectExpr("proto_message.*", *bronze_transform_common_fields)

    if telemetry_type == "appusage":
        bronze_df = parsed_df
    else:
        bronze_df = flattenRawData(parsed_df)

    bronze_query = (
        bronze_df.writeStream.foreachBatch(forEachBatch)
        .queryName(f"{bronze_target_table}_stream")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
except Exception as e:
    print("***************Exception***************************")
    print(traceback.format_exc())
    print("***************Exception***************************")
    notebook_info = json.loads(
        dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
        )
    jobId = notebook_info["tags"]["jobId"]
    dbutils.notebook.exit({"jobId":jobId,"TableName":message_name,"Status":"NOT OK"})
 
I am getting a datatype mismatch exception within the try block. However, the stream stops, and instead of executing the code in the except block, it skips all the remaining steps.
[Attached screenshot: Sambit_S_0-1717689309381.png]

I do not want to stop the process; rather, I want to handle the exception and update the status.

Has anyone faced a similar issue, or could someone help me understand why this is happening?
1 REPLY

raphaelblg
Honored Contributor

Hello @Sambit_S ,

In your scenario, there is a merge failure. Your query won't be able to progress because the problematic batch can't be committed to the sink.

Even if you handle the exception in a try/except block, Auto Loader cannot update the checkpoint and commit the batch while the merge keeps failing.
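
As for why the except block appears to be skipped: this is an inference from the posted code, not something confirmed in the thread. `start()` returns as soon as the query is launched, so an error raised later inside `forEachBatch` does not surface in the thread that ran the `try` block. It is re-raised only when something waits on the query, typically `awaitTermination()`. A minimal sketch of that pattern follows; `wait_and_report` and the `on_failure` callback are hypothetical names, not part of the original notebook.

```python
import traceback


def wait_and_report(query, on_failure):
    """Block until the streaming query stops and return "OK" or "NOT OK".

    `query` is expected to be the StreamingQuery returned by .start().
    `on_failure` is a hypothetical callback, for example one that calls
    dbutils.notebook.exit(...) with the failure status.
    """
    try:
        # start() has already returned; a failure inside foreachBatch is
        # re-raised here, in the thread that waits on the query.
        query.awaitTermination()
        return "OK"
    except Exception as exc:
        print(traceback.format_exc())
        on_failure(exc)
        return "NOT OK"
```

In the notebook this could be called as `wait_and_report(bronze_query, report_failure)` right after the query is started, where `report_failure` is whatever function builds the status payload. Catching the exception this way only lets the notebook report the status; the failing batch is still uncommitted and will be retried on the next run.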

You need to verify and understand why the merge is failing on the respective fields. Some cases can be addressed by Delta Lake Automatic Schema Evolution.
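
One way to start that verification is to compare the column types of the incoming batch with those of the target table. The sketch below is only an illustration: `find_type_mismatches` is a hypothetical helper, and it assumes both inputs are lists of `(name, type_string)` pairs, which is the shape that `DataFrame.dtypes` returns.

```python
def find_type_mismatches(source_dtypes, target_dtypes):
    """Return {column: (source_type, target_type)} for columns whose types differ.

    Both arguments are lists of (name, type_string) pairs, the shape that
    DataFrame.dtypes returns. Column names are compared case-insensitively.
    """
    target = {name.lower(): dtype for name, dtype in target_dtypes}
    mismatches = {}
    for name, dtype in source_dtypes:
        key = name.lower()
        if key in target and target[key] != dtype:
            mismatches[key] = (dtype, target[key])
    return mismatches


# Example with made-up columns: 'ts' arrives as a string but the table expects a timestamp.
example = find_type_mismatches(
    [("id", "bigint"), ("ts", "string")],
    [("id", "bigint"), ("ts", "timestamp")],
)
print(example)  # {'ts': ('string', 'timestamp')}
```

Inside `forEachBatch`, the same helper could be fed the batch DataFrame's `dtypes` and `spark.table(bronze_target_table).dtypes`, and the result logged before the merge runs. Columns that exist only on one side are ignored here, since those are the cases schema evolution may be able to handle.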

 

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks