Exceptions are Not Getting Handled In Autoloader Write Stream

Sambit_S
New Contributor III

I have the following logic implemented using Databricks Autoloader:

 
## Autoloader write stream: it calls the forEachBatch function to write into the
## respective data-type catalog table, and uses a checkpoint to keep track of processed files.
import json
import traceback

try:
    ## Observe raw data: capture stream metrics and temporarily store them in a DBFS file location
    if telemetry_type == "device_std":
        parsed_df = observeDeviceRawData(parsed_df)
    else:
        parsed_df = observeAppRawData(parsed_df)

    # Rename proto columns whose names collide with MData column names
    parsed_df = parsed_df.filter("proto_message IS NOT NULL")
    proto_columns = [x.lower() for x in parsed_df.selectExpr("proto_message.*").schema.names]
    MData_columns = [x.lower() for x in parsed_df.selectExpr(*bronze_transform_common_fields).schema.names]
    for column in proto_columns:
        if column in MData_columns:
            # Copy the colliding field to "<name>_pl", then drop the original field
            parsed_df = parsed_df.withColumn('proto_message', parsed_df['proto_message'].withField(column + '_pl', parsed_df[f'proto_message.{column}']))
            parsed_df = parsed_df.withColumn('proto_message', parsed_df['proto_message'].dropFields(column))

    parsed_df = parsed_df.selectExpr("proto_message.*", *bronze_transform_common_fields)

    if telemetry_type == "appusage":
        bronze_df = parsed_df
    else:
        bronze_df = flattenRawData(parsed_df)

    bronze_query = (
        bronze_df.writeStream.foreachBatch(forEachBatch)
        .queryName(f"{bronze_target_table}_stream")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
except Exception:
    print("***************Exception***************************")
    print(traceback.format_exc())
    print("***************Exception***************************")
    notebook_info = json.loads(
        dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
    )
    jobId = notebook_info["tags"]["jobId"]
    # dbutils.notebook.exit expects a string, so serialize the status payload as JSON
    dbutils.notebook.exit(json.dumps({"jobId": jobId, "TableName": message_name, "Status": "NOT OK"}))
 
I am getting a datatype-mismatch exception within the try block, but the stream stops and, instead of executing the code in the except block, all the remaining steps are skipped.
[Screenshot of the exception attached: Sambit_S_0-1717689309381.png]

I do not want to stop the process; rather, I want to handle the exception and update the status.

Has anyone faced a similar issue, or could someone help me understand why this is happening?
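
For context, with trigger(availableNow=True) the start() call returns immediately and the micro-batches run on a separate stream thread, so a failure inside forEachBatch is not raised inside the surrounding try block. A minimal sketch (reusing the variables from the snippet above) of blocking on the query so the failure surfaces where the except block can catch it:

try:
    bronze_query = (
        bronze_df.writeStream.foreachBatch(forEachBatch)
        .queryName(f"{bronze_target_table}_stream")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
    # start() only launches the query; block here so a failed micro-batch
    # is re-raised on the driver as a StreamingQueryException
    bronze_query.awaitTermination()
except Exception:
    # the except block from the snippet above would now actually run
    ...

Note that this only changes where the exception is observed; it does not make the failed batch commit.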
1 REPLY

raphaelblg
Esteemed Contributor III

Hello @Sambit_S,

In your scenario, there is a merge failure. Your query won't be able to progress because the problematic batch can't be committed to the sink.

Even if you handle the exception in a try/except block, Auto Loader cannot update the checkpoint and commit the batch while such a merge failure persists.

You need to verify and understand why the merge is failing on the affected fields. Some cases can be addressed by Delta Lake automatic schema evolution.
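
For illustration, here is a minimal sketch of enabling automatic schema evolution for a MERGE inside a foreachBatch handler, assuming the sink is a Delta table; the table name and join key below are hypothetical, not taken from the original code:

from delta.tables import DeltaTable

def forEachBatch(batch_df, batch_id):
    spark = batch_df.sparkSession
    # Allow MERGE to add columns that exist in the source but not in the target
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    target = DeltaTable.forName(spark, "bronze.device_std")  # hypothetical table name
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.event_id = s.event_id")  # hypothetical join key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

Keep in mind that autoMerge covers new or missing columns; a genuine datatype conflict on an existing column typically still needs an explicit cast in the batch function.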

 

Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks
