โ08-07-2024 10:03 PM
I'm trying to debug my pipeline in DLT and during runtime I need some log info and how do I do a print('something') during DLT run?
Monday
I have the same question. This will help the debug process.
Tuesday - last edited Tuesday
Hi @ruoyuqian , @kranthi2,
In Databricks Delta Live Tables (DLT), using print() statements for logging does not work as expected. This is because DLT runs as a managed pipeline, and the execution environment differs from regular Databricks notebooks. Output from print() statements is not captured and displayed in the same way, making it ineffective for debugging during pipeline runs.
To log information during a DLT pipeline run, you can use the logging library and configure it to log to the driver logs. Here is an example of how you can set up logging within a DLT pipeline to log to the driver logs:
import dlt
import logging
from pyspark.sql.functions import col
# Set up logging configuration
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
@dlt.table(
comment="This is the raw data from the sample source table."
)
def read_source_data():
# Log the start of reading data
logger.info("Reading data from the source table.")
# Read data from the source table
df = spark.table("sample_source")
# Log the schema and number of rows read
logger.info(f"Schema of the source table: {df.schema.simpleString()}")
logger.info(f"Number of rows read: {df.count()}")
return df
@dlt.table(
comment="This table contains transformed data."
)
def transform_data():
logger.info("Transforming data from the source table.")
# Read the raw data and apply a transformation
df = dlt.read("read_source_data").withColumn("value_doubled", col("value") * 2)
# Log transformation completion
logger.info(f"Transformation completed. Output schema: {df.schema.simpleString()}")
return df
After running the DLT pipeline navigate to driver log:
Download the log file:
You can search log messages by filtering by "INFO __main__:":
For more persistent or remote access to logs, you can configure the logger to write directly to a cloud storage location such as AWS S3, Azure Blob Storage, or Google Cloud Storage. This can be useful for capturing logs in a centralized location, especially when dealing with production pipelines.
You need to have a connection to the cloud storage, and then to add handler to the logger. The code would look like this:
import logging
from azure.storage.blob import BlobServiceClient
from io import StringIO
# Azure Storage configuration
storage_account_name = "my_storage_account"
container_name = "logs"
blob_name = "dlt-logs.log"
connection_string = "DefaultEndpointsProtocol=https;AccountName=my_storage_account;AccountKey=<your-storage-account-key>;EndpointSuffix=core.windows.net"
# Initialize BlobServiceClient
blob_client = BlobServiceClient.from_connection_string(connection_string).get_blob_client(container=container_name, blob=blob_name)
# Log handler
class AzureBlobHandler(logging.Handler):
def __init__(self, blob_client):
super().__init__()
self.blob_client = blob_client
def emit(self, record):
msg = self.format(record) + "\n"
# Upload the log message to Azure Blob Storage
self.blob_client.upload_blob(msg, overwrite=True)
# Configure the logger
logger = logging.getLogger("DLTLogger")
logger.setLevel(logging.INFO)
azure_blob_handler = AzureBlobHandler(blob_client)
azure_blob_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(azure_blob_handler)
# Example usage remains the same
logger.info("This is an info message logged to Azure Blob Storage.")
Tuesday - last edited Tuesday
Hi @ruoyuqian , @kranthi2,
Why print() Statements Wonโt Work in DLT:
In Databricks Delta Live Tables (DLT), you do not see print() statements, as what is visible are the events.
Alternative Solution: Using Log4j to log to Driver Log
To log information during a DLT pipeline run, you can use the logging library and configure it to log to the driver logs. Here is an example of how you can set up logging within a DLT pipeline to log to the driver logs:
import dlt
import logging
from pyspark.sql.functions import col
# Set up logging configuration
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)
@dlt.table(
comment="This is the raw data from the sample source table."
)
def read_source_data():
# Log the start of reading data
logger.info("Reading data from the source table.")
# Read data from the source table
df = spark.table("sample_source")
# Log the schema and number of rows read
logger.info(f"Schema of the source table: {df.schema.simpleString()}")
logger.info(f"Number of rows read: {df.count()}")
return df
@dlt.table(
comment="This table contains transformed data."
)
def transform_data():
logger.info("Transforming data from the source table.")
# Read the raw data and apply a transformation
df = dlt.read("read_source_data").withColumn("value_doubled", col("value") * 2)
# Log transformation completion
logger.info(f"Transformation completed. Output schema: {df.schema.simpleString()}")
return df
After running the DLT pipeline navigate to driver log and download the log file:
You can search log messages by filtering by "INFO __main__:":
Logging to Cloud Storage:
For more persistent or remote access to logs, you can configure the logger to write directly to a cloud storage location such as Azure Blob Storage.
You need to have a connection to the cloud storage, and then to add handler to the logger. The code would look like this:
import logging
from azure.storage.blob import BlobServiceClient
from io import StringIO
# Azure Storage configuration
storage_account_name = "my_storage_account"
container_name = "logs"
blob_name = "dlt-logs.log"
connection_string = "DefaultEndpointsProtocol=https;AccountName=my_storage_account;AccountKey=<your-storage-account-key>;EndpointSuffix=core.windows.net"
# BlobServiceClient with fewer steps
blob_client = BlobServiceClient.from_connection_string(connection_string).get_blob_client(container=container_name, blob=blob_name)
# Custom log handler
class AzureBlobHandler(logging.Handler):
def __init__(self, blob_client):
super().__init__()
self.blob_client = blob_client
def emit(self, record):
msg = self.format(record) + "\n"
# Directly upload the log message to Azure Blob Storage
self.blob_client.upload_blob(msg, overwrite=True)
# Configure the logger
logger = logging.getLogger("DLTLogger")
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group