Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Azure Application Insights logging not working after upgrading cluster to Databricks Runtime 14.x

abaghel
New Contributor II

I have a basic setup that reads a stream from a Delta table and writes it into another Delta table, using Python logging to send logs to Application Insights. However, logs written inside the foreachBatch function are not being sent to Application Insights. In the code below, logging works in the _write_stream method because it runs outside foreachBatch, but it does not work in the _merge_streaming_data method because it's called inside foreachBatch. By "doesn't work" I mean the logs are never sent to or captured in Application Insights. Below is the sample code.


def get_logger():
    import logging
    from opencensus.ext.azure.log_exporter import AzureLogHandler

    logger = logging.getLogger(__name__)
    key = "*******"
    logger.addHandler(AzureLogHandler(connection_string=key))

    return logger
 

properties = {'custom_dimensions': {'test': 'test'}}

def _merge_streaming_data(batch_df, batch_id):
    spark = batch_df.sparkSession
    logger = get_logger()
    logger.warning('_merge_streaming_data1 amit', extra=properties)
    batch_df.write.mode("overwrite").saveAsTable("table_name2")

def _write_stream(main_df):
    logger = get_logger()
    logger.warning('_write_stream', extra=properties)
    write_df = (
        main_df.writeStream
        .foreachBatch(lambda batch_df, batch_id: _merge_streaming_data(batch_df, batch_id))
        .option("checkpointLocation", checkpoint)
        .trigger(availableNow=True)
        .queryName("Query : test")
        .start()
    )
    return write_df

checkpoint = '****'
raw_full_table_name = 'table_name'

source_df = spark.readStream.table(raw_full_table_name)

write_stream = _write_stream(source_df)

try:
    write_stream.awaitTermination()
    print(f"Completed Processing table: {raw_full_table_name}")
except Exception as e:
    print(e)
    raise

 

2 REPLIES

MuthuLakshmi
Databricks Employee

@abaghel There are behavior changes for foreachBatch in Databricks Runtime 14.0.

Please check: https://docs.databricks.com/en/structured-streaming/foreach.html#behavior-changes-for-foreachbatch-i...

abaghel
New Contributor II

@MuthuLakshmi Thank you for getting back to me. I have read the article and understand that "Any files, modules, or objects referenced in the function must be serializable and available on Spark." However, based on the code provided, can you help me identify where I might be encountering serialization issues? The code seems quite basic. Additionally, could you suggest some sample code for reference?
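
For reference, below is a minimal sketch of a fully self-contained foreachBatch callback, following the pattern the linked doc describes (everything imported and constructed inside the function). It also flushes the Azure handler before the callback returns, on the assumption that the exporter sends telemetry asynchronously and queued records could otherwise be dropped when the batch worker finishes. The helper name _get_batch_logger, the APPINSIGHTS_CONNECTION_STRING placeholder, and the duplicate-handler guard are illustrative choices, not a Databricks-verified fix for the 14.x behavior change.

import logging
from opencensus.ext.azure.log_exporter import AzureLogHandler

# Placeholder connection string, masked as in the original post.
APPINSIGHTS_CONNECTION_STRING = "*******"
properties = {'custom_dimensions': {'test': 'test'}}

def _get_batch_logger(name):
    # Build the logger inside the callback so nothing non-serializable
    # is captured from the driver scope.
    logger = logging.getLogger(name)
    # Avoid stacking a new AzureLogHandler on every micro-batch.
    if not any(isinstance(h, AzureLogHandler) for h in logger.handlers):
        logger.addHandler(AzureLogHandler(connection_string=APPINSIGHTS_CONNECTION_STRING))
    return logger

def _merge_streaming_data(batch_df, batch_id):
    logger = _get_batch_logger("foreach_batch_logger")
    logger.warning("processing batch %s", batch_id, extra=properties)
    batch_df.write.mode("overwrite").saveAsTable("table_name2")
    # Flush queued telemetry before returning, since the handler
    # exports records on a background thread.
    for handler in logger.handlers:
        handler.flush()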
