I have implemented the logic below using Databricks Auto Loader.
# Auto Loader write stream: it calls the forEachBatch function to write into the respective datatype catalog table
# and uses a checkpoint to keep track of processed files.
try:
    # Observe raw data: call observe*RawData to capture stream metrics and temporarily store them in a DBFS file location
    if telemetry_type == "device_std":
        parsed_df = observeDeviceRawData(parsed_df)
    else:
        parsed_df = observeAppRawData(parsed_df)
    # Rename proto columns whose names clash with the MData columns
    parsed_df = parsed_df.filter("proto_message IS NOT NULL")
    proto_columns = [x.lower() for x in parsed_df.selectExpr("proto_message.*").schema.names]
    MData_columns = [x.lower() for x in parsed_df.selectExpr(*bronze_transform_common_fields).schema.names]
    for column in proto_columns:
        if column in MData_columns:
            parsed_df = parsed_df.withColumn('proto_message', parsed_df['proto_message'].withField(column + '_pl', parsed_df[f'proto_message.{column}']))
            parsed_df = parsed_df.withColumn('proto_message', parsed_df['proto_message'].dropFields(column))
    parsed_df = parsed_df.selectExpr("proto_message.*", *bronze_transform_common_fields)
    if telemetry_type == "appusage":
        bronze_df = parsed_df
    else:
        bronze_df = flattenRawData(parsed_df)
    bronze_query = (
        bronze_df.writeStream.foreachBatch(forEachBatch)
        .queryName(f"{bronze_target_table}_stream")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
    )
except Exception as e:
    print("***************Exception***************************")
    print(traceback.format_exc())
    print("***************Exception***************************")
    notebook_info = json.loads(
        dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
    )
    jobId = notebook_info["tags"]["jobId"]
    dbutils.notebook.exit({"jobId": jobId, "TableName": message_name, "Status": "NOT OK"})
I am getting a datatype mismatch exception from the code inside the try block. However, the stream stops and, instead of running the code in the except block, the notebook skips all the remaining steps.
I do not want the process to stop; I want to handle the exception and update the status.
Has anyone faced a similar issue, or could someone help me understand why this is happening?
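
One possible cause, offered as an assumption rather than a confirmed diagnosis: `writeStream.start()` returns as soon as the query is launched, and a failure inside the streaming query reaches the calling code only when something waits on the query, for example `awaitTermination()`. The plain-Python analogy below uses `concurrent.futures` in place of Spark to show the same pattern; `failing_batch` and `run` are hypothetical names made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_batch():
    # Hypothetical stand-in for a micro-batch that hits a datatype mismatch.
    raise TypeError("datatype mismatch")


def run(wait_for_result):
    """Return the status that a try/except around the launch would report."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        try:
            # submit() returns immediately, much like start() on a stream.
            future = pool.submit(failing_batch)
            if wait_for_result:
                # result() re-raises the worker's exception in this thread,
                # which is the role awaitTermination() plays for a stream.
                future.result()
            return "OK"
        except TypeError:
            return "NOT OK"


status_without_wait = run(wait_for_result=False)
status_with_wait = run(wait_for_result=True)
print(status_without_wait, status_with_wait)
```

If the same thing is happening here, calling `bronze_query.awaitTermination()` inside the try block, after `.start()`, should let the except block see the failure and report the "NOT OK" status.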