I have a basic setup that reads a stream from one Delta table and writes it into another Delta table. I use Python logging with the Azure handler to send logs to Application Insights. However, log records written inside the foreachBatch function are not being sent to Application Insights. In the code below, logging works in the _write_stream method, which runs outside foreachBatch, but not in the _merge_streaming_data method, which runs inside foreachBatch. By "does not work" I mean the log records are never sent to or captured by Application Insights. Below is the sample code.
def get_logger():
    import logging
    from opencensus.ext.azure.log_exporter import AzureLogHandler

    logger = logging.getLogger(__name__)
    key = "*******"  # Application Insights connection string (redacted)
    logger.addHandler(AzureLogHandler(connection_string=key))
    return logger


properties = {'custom_dimensions': {'test': 'test'}}
def _merge_streaming_data(batch_df, batch_id):
    spark = batch_df.sparkSession
    logger = get_logger()
    # This warning is never captured in Application Insights.
    logger.warning('_merge_streaming_data1 amit', extra=properties)
    batch_df.write.mode("overwrite").saveAsTable("table_name2")
def _write_stream(main_df):
    logger = get_logger()
    # This warning is captured in Application Insights as expected.
    logger.warning('_write_stream', extra=properties)
    write_df = (
        main_df.writeStream
        .foreachBatch(lambda batch_df, batch_id: _merge_streaming_data(batch_df, batch_id))
        .option("checkpointLocation", checkpoint)
        .trigger(availableNow=True)
        .queryName("Query : test")
        .start()
    )
    return write_df
checkpoint = '****'
raw_full_table_name = 'table_name'

source_df = spark.readStream.table(raw_full_table_name)
write_stream = _write_stream(source_df)

try:
    write_stream.awaitTermination()
    print(f"Completed Processing table: {raw_full_table_name}")
except Exception as e:
    print(e)
    raise
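
For reference, this is a rough sketch of the context the snippet assumes; the connection string format, checkpoint path, and session setup below are placeholders, not the real values (those are redacted above):

from pyspark.sql import SparkSession

# On Databricks the SparkSession (`spark`) is provided by the runtime; a standalone
# PySpark run would have to create it explicitly, roughly like this.
spark = SparkSession.builder.appName("delta-stream-logging").getOrCreate()

# AzureLogHandler takes a full Application Insights connection string, e.g.:
# "InstrumentationKey=<guid>;IngestionEndpoint=https://<region>.in.applicationinsights.azure.com/"
checkpoint = "/tmp/checkpoints/delta_stream_test"  # placeholder checkpoint location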