Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to print out logs during DLT pipeline run

ruoyuqian
New Contributor II

I'm trying to debug my DLT pipeline and need some log output at runtime. How can I do something like print('something') during a DLT run?


kranthi2
New Contributor III

I have the same question. This will help the debug process. 

filipniziol
New Contributor II

Hi @ruoyuqian , @kranthi2,

Why print() Statements Won't Work in DLT:

In Databricks Delta Live Tables (DLT), print() statements do not work for logging the way they do in a notebook. DLT runs your code as a managed pipeline rather than as interactive notebook cells, and what the pipeline UI shows are pipeline events, not cell output. As a result, print() output is not displayed alongside the run, which makes it ineffective for debugging during pipeline runs.

Alternative Solution: Using Log4j to log to Driver Log

To log information during a DLT pipeline run, you can get a Log4j logger through the Spark JVM gateway (spark._jvm) and write to the driver logs. Here is an example of how to set this up within a DLT pipeline:

 

import dlt
from pyspark.sql.functions import col

# Get a Log4j logger from the driver JVM.
# `spark` is the session provided by the pipeline runtime.
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

@dlt.table(
    comment="This is the raw data from the sample source table."
)
def read_source_data():
    # Log the start of reading data
    logger.info("Reading data from the source table.")
    
    # Read data from the source table
    df = spark.table("sample_source")
    
    # Log the schema. Avoid actions such as df.count() here: DLT dataset
    # functions should only define transformations and return a DataFrame.
    logger.info(f"Schema of the source table: {df.schema.simpleString()}")
    
    return df

@dlt.table(
    comment="This table contains transformed data."
)
def transform_data():
    logger.info("Transforming data from the source table.")
    
    # Read the raw data and apply a transformation
    df = dlt.read("read_source_data").withColumn("value_doubled", col("value") * 2)
    
    # Log transformation completion
    logger.info(f"Transformation completed. Output schema: {df.schema.simpleString()}")
    
    return df
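
If the rest of the pipeline code already uses Python's standard logging module, one option is to forward those records to the Log4j logger instead of calling it directly. The sketch below is an illustration under assumptions, not part of the setup above: Log4jForwardingHandler and FakeLog4jLogger are hypothetical names, and the handler only assumes that its target exposes debug, info, warn, and error methods that accept a string. A stand-in object is used so the example runs outside Databricks.

```python
import logging


class Log4jForwardingHandler(logging.Handler):
    """Hypothetical handler that forwards Python log records to a Log4j-style logger."""

    def __init__(self, target_logger):
        super().__init__()
        # Assumption: target_logger exposes debug/info/warn/error(str)
        self.target_logger = target_logger

    def emit(self, record):
        try:
            msg = self.format(record)
            if record.levelno >= logging.ERROR:
                self.target_logger.error(msg)
            elif record.levelno >= logging.WARNING:
                self.target_logger.warn(msg)
            elif record.levelno >= logging.INFO:
                self.target_logger.info(msg)
            else:
                self.target_logger.debug(msg)
        except Exception:
            self.handleError(record)


class FakeLog4jLogger:
    """Stand-in for the JVM logger so the sketch runs without Spark."""

    def __init__(self):
        self.calls = []

    def debug(self, msg):
        self.calls.append(("DEBUG", msg))

    def info(self, msg):
        self.calls.append(("INFO", msg))

    def warn(self, msg):
        self.calls.append(("WARN", msg))

    def error(self, msg):
        self.calls.append(("ERROR", msg))


fake = FakeLog4jLogger()
py_logger = logging.getLogger("dlt_bridge_demo")
py_logger.setLevel(logging.DEBUG)
py_logger.propagate = False  # keep the demo output out of the root logger
py_logger.addHandler(Log4jForwardingHandler(fake))

py_logger.info("Reading data from the source table.")
py_logger.warning("Column 'value' contains nulls.")
```

In a pipeline notebook, the stand-in would be replaced by the logger returned from log4jLogger.LogManager.getLogger(__name__).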

 

After running the DLT pipeline, navigate to the driver log and download the log file.

You can find your messages in the downloaded file by filtering on "INFO __main__:".
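
Once the file is downloaded, the same filter can be applied with a few lines of Python. This is only a sketch: filter_log_lines and filter_log_file are hypothetical helper names, and the sample lines are made up for illustration, so the real driver log layout may differ.

```python
from pathlib import Path


def filter_log_lines(lines, marker="INFO __main__:"):
    """Return the lines that contain the marker, without trailing newlines."""
    return [line.rstrip("\n") for line in lines if marker in line]


def filter_log_file(path, marker="INFO __main__:"):
    """Read a downloaded driver log file and keep only the matching lines."""
    with Path(path).open(encoding="utf-8", errors="replace") as handle:
        return filter_log_lines(handle, marker)


# Made-up sample lines for illustration; real driver logs will look different
sample = [
    "24/09/10 21:50:01 INFO __main__: Reading data from the source table.\n",
    "24/09/10 21:50:02 WARN SomeSparkClass: unrelated message\n",
    "24/09/10 21:50:03 INFO __main__: Transforming data from the source table.\n",
]
matches = filter_log_lines(sample)
for line in matches:
    print(line)
```

For a real file, filter_log_file("path/to/downloaded-driver-log.txt") would return the matching lines; the path here is a placeholder.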

Logging to Cloud Storage:

For more persistent or remote access to logs, you can configure the logger to write directly to a cloud storage location such as AWS S3, Azure Blob Storage, or Google Cloud Storage. This can be useful for capturing logs in a centralized location, especially when dealing with production pipelines.

You need a connection to the cloud storage, and then you add a custom handler to the logger. For Azure Blob Storage, the code would look like this:

 

import logging
from azure.storage.blob import BlobServiceClient

# Azure Storage configuration (placeholder values).
# Storage account names may contain only lowercase letters and digits.
storage_account_name = "mystorageaccount"
container_name = "logs"
blob_name = "dlt-logs.log"
# In practice, read the key from a secret store rather than hard-coding it
connection_string = (
    "DefaultEndpointsProtocol=https;"
    f"AccountName={storage_account_name};"
    "AccountKey=<your-storage-account-key>;"
    "EndpointSuffix=core.windows.net"
)

# Initialize a client for the log blob
blob_client = BlobServiceClient.from_connection_string(connection_string).get_blob_client(container=container_name, blob=blob_name)

# Log handler that appends each record to an append blob
class AzureBlobHandler(logging.Handler):
    def __init__(self, blob_client):
        super().__init__()
        self.blob_client = blob_client
        # Create the append blob once. upload_blob(msg, overwrite=True) would
        # replace the whole blob on every record and keep only the last message.
        if not self.blob_client.exists():
            self.blob_client.create_append_blob()

    def emit(self, record):
        try:
            msg = self.format(record) + "\n"
            # Append the log message to the blob in Azure Blob Storage
            self.blob_client.append_block(msg.encode("utf-8"))
        except Exception:
            self.handleError(record)

# Configure the logger
logger = logging.getLogger("DLTLogger")
logger.setLevel(logging.INFO)
azure_blob_handler = AzureBlobHandler(blob_client)
azure_blob_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(azure_blob_handler)

# Example usage remains the same
logger.info("This is an info message logged to Azure Blob Storage.")
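
A handler that makes one storage request per log record can be slow for chatty pipelines. One possible variation, sketched here as an assumption rather than as part of the answer above, is to buffer records and send them in batches using the standard library's logging.handlers.BufferingHandler. BufferedAppendHandler and append_fn are hypothetical names; append_fn stands for any callable that accepts a bytes payload (a blob client's append method would be one candidate), and a plain list is used below so the example runs without Azure.

```python
import logging
import logging.handlers


class BufferedAppendHandler(logging.handlers.BufferingHandler):
    """Hypothetical handler that sends buffered records in a single call."""

    def __init__(self, capacity, append_fn):
        super().__init__(capacity)
        # Assumption: append_fn takes one bytes payload
        self.append_fn = append_fn

    def flush(self):
        self.acquire()
        try:
            if self.buffer:
                payload = "".join(self.format(r) + "\n" for r in self.buffer)
                self.append_fn(payload.encode("utf-8"))
                self.buffer.clear()
        finally:
            self.release()


uploaded = []  # stand-in for the remote log blob
handler = BufferedAppendHandler(capacity=3, append_fn=uploaded.append)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

demo_logger = logging.getLogger("dlt_buffer_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output out of the root logger
demo_logger.addHandler(handler)

for i in range(4):
    demo_logger.info("message %d", i)

handler.flush()  # send whatever is still buffered
```

The trade-off is that records still sitting in the buffer are lost if the process dies before a flush, so a small capacity or an explicit flush at the end of each dataset function may be preferable.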

 

 
