Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to print out logs during DLT pipeline run

ruoyuqian
New Contributor II

I'm trying to debug my DLT pipeline and need some log output at runtime. How do I do a print('something') during a DLT run?

7 REPLIES

kranthi2
New Contributor III

I have the same question. This would help the debugging process.

filipniziol
Esteemed Contributor

Hi @ruoyuqian , @kranthi2,

Why print() Statements Won’t Work in DLT:

In Databricks Delta Live Tables (DLT), using print() statements for logging does not work as expected. This is because DLT runs as a managed pipeline, and the execution environment differs from regular Databricks notebooks. Output from print() statements is not captured and displayed in the same way, making it ineffective for debugging during pipeline runs.

Alternative Solution: Using Log4j to log to Driver Log

To log information during a DLT pipeline run, you can use the JVM's Log4j logger (via `spark._jvm`) and write to the driver logs. Here is an example of how you can set up logging within a DLT pipeline to log to the driver logs:

 

import dlt
from pyspark.sql.functions import col

# Get a Log4j logger from the JVM; messages land in the driver logs
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

@dlt.table(
    comment="This is the raw data from the sample source table."
)
def read_source_data():
    # Log the start of reading data
    logger.info("Reading data from the source table.")
    
    # Read data from the source table
    df = spark.table("sample_source")
    
    # Log the schema and row count (note: count() triggers an extra Spark job)
    logger.info(f"Schema of the source table: {df.schema.simpleString()}")
    logger.info(f"Number of rows read: {df.count()}")
    
    return df

@dlt.table(
    comment="This table contains transformed data."
)
def transform_data():
    logger.info("Transforming data from the source table.")
    
    # Read the raw data and apply a transformation
    df = dlt.read("read_source_data").withColumn("value_doubled", col("value") * 2)
    
    # Log transformation completion
    logger.info(f"Transformation completed. Output schema: {df.schema.simpleString()}")
    
    return df

 

After running the DLT pipeline, navigate to the driver log:

[Screenshot: filipniziol_0-1726005018859.png]

Download the log file:

[Screenshot: filipniziol_1-1726005083772.png]

You can find your log messages by filtering on "INFO __main__:":

[Screenshot: filipniziol_2-1726005297744.png]
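Once downloaded, the same filtering can be done from a shell. The snippet below simulates a downloaded driver log so it is self-contained; the file name `log4j-active.log` and the log line format are assumptions, use whatever the download actually gives you:

```shell
# Simulate a downloaded driver log (file name and format are assumptions)
cat > log4j-active.log <<'EOF'
24/09/10 12:00:01 INFO __main__: Reading data from the source table.
24/09/10 12:00:02 INFO SparkContext: Running Spark version 3.5.0
24/09/10 12:00:03 INFO __main__: Number of rows read: 42
EOF

# Keep only the lines produced by our logger
grep "INFO __main__" log4j-active.log
```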

Logging to Cloud Storage:

For more persistent or remote access to logs, you can configure the logger to write directly to a cloud storage location such as AWS S3, Azure Blob Storage, or Google Cloud Storage. This can be useful for capturing logs in a centralized location, especially when dealing with production pipelines.

You need a connection to the cloud storage, and then you add a handler to the logger. The code could look like this:

 

import logging
from azure.storage.blob import BlobServiceClient

# Azure Storage configuration
container_name = "logs"
blob_name = "dlt-logs.log"
connection_string = "DefaultEndpointsProtocol=https;AccountName=my_storage_account;AccountKey=<your-storage-account-key>;EndpointSuffix=core.windows.net"

# Initialize a BlobClient for the target log blob
blob_client = BlobServiceClient.from_connection_string(connection_string).get_blob_client(container=container_name, blob=blob_name)

# Custom log handler that appends each record to an append blob.
# Note: upload_blob(msg, overwrite=True) would replace the blob on every
# record, keeping only the last message; an append blob avoids that.
class AzureBlobHandler(logging.Handler):
    def __init__(self, blob_client):
        super().__init__()
        self.blob_client = blob_client
        if not self.blob_client.exists():
            self.blob_client.create_append_blob()

    def emit(self, record):
        msg = self.format(record) + "\n"
        # Append the formatted log record to the blob
        self.blob_client.append_block(msg.encode("utf-8"))

# Configure the logger
logger = logging.getLogger("DLTLogger")
logger.setLevel(logging.INFO)
azure_blob_handler = AzureBlobHandler(blob_client)
azure_blob_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(azure_blob_handler)

# Example usage
logger.info("This is an info message logged to Azure Blob Storage.")
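Before wiring a handler to Blob Storage, the same `logging.Handler` pattern can be exercised locally with an in-memory sink. This is a standard-library-only sketch, with a plain list standing in for the blob client:

```python
import logging

# Minimal in-memory handler with the same shape as the blob handler above
class ListHandler(logging.Handler):
    def __init__(self, sink):
        super().__init__()
        self.sink = sink  # plain list standing in for the blob client

    def emit(self, record):
        self.sink.append(self.format(record))

records = []
logger = logging.getLogger("DLTLoggerLocal")
logger.setLevel(logging.INFO)
handler = ListHandler(records)
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
logger.addHandler(handler)

logger.info("hello from the pipeline")
print(records)  # → ['INFO - hello from the pipeline']
```

Once the formatting and filtering behave as expected, only `emit()` needs to change to talk to real storage.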

 

 


iooj
New Contributor II

>> LogManager.getLogger() does not seem to work in a DLT notebook:

DLTError: [PY4J_BLOCKED_API] You are using a Python API that is not supported in the current environment. Please check Databricks documentation for alternatives. An error occurred while calling z:org.apache.log4j.LogManager.getLogger

_DatabricksUser
New Contributor III

Can confirm what @iooj found. It appears for me as well. Using serverless DLT version dlt:16.1.8-delta-pipelines-photon-dlt-release-dp-2025.20-rc0-commit-fcedf0a-image-8aadc5c . This did work on non-serverless for an older version of DLT. Perhaps Databricks has another way? I'll post here if I find something from support.

User16871418122
Contributor III

We can try emitting logs to stdout/stderr: 

The sample code below worked on a UC DLT cluster - dlt:16.4.0-delta-pipelines-photon-dlt-release-dp-2025.20-rc0-commit-fcedf0a-image-be34de2

import dlt
import logging
import sys
from pyspark.sql.functions import col
from utilities import utils  # user-defined helper module (provides distance_km)

# Configure Python logging to stdout
logger = logging.getLogger("DLTLogger")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout) # Change to sys.stderr for stderr
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

# Avoid duplicate handlers if rerun in notebook
if not logger.handlers:
  logger.addHandler(handler)

@dlt.table
def sample_trips_dlt_logging_test():
  logger.info("dlt_logging## Reading sample trips data from Delta table.")
  df = spark.read.table("samples.nyctaxi.trips")

  logger.info(f"dlt_logging## Schema of the sample trips data: {df.schema.simpleString()}")
  logger.info(f"dlt_logging## Number of rows read: {df.count()}")

  df = df.withColumn("trip_distance_km", utils.distance_km(col("trip_distance")))
  logger.info("dlt_logging## Added trip_distance_km column to the sample trips data.")

  return df

 

Can confirm what @User16871418122 reported. That was what Databricks support recommended. There are two caveats with it:

1. Logs will be emitted twice: once during lazy validation and once during execution.
2. Logging will not necessarily continue on subsequent executions; per Databricks engineering this is likely the case for streaming tables. This means logging this way is only effective for debugging initial code, not for the long term.
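One way to soften caveat 1 is a `logging.Filter` that drops repeated messages, so the validation pass and the execution pass don't double-log. This is a local standard-library sketch, not something Databricks recommends; whether suppressing repeats is acceptable for your pipeline is for you to judge:

```python
import logging

class DedupFilter(logging.Filter):
    """Drop a record when the exact same message was already logged."""
    def __init__(self):
        super().__init__()
        self.seen = set()

    def filter(self, record):
        msg = record.getMessage()
        if msg in self.seen:
            return False  # suppress the repeat from the validation pass
        self.seen.add(msg)
        return True

class ListHandler(logging.Handler):
    """In-memory sink for the demo; in DLT you'd keep the stdout handler."""
    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())

lines = []
logger = logging.getLogger("DLTDedupDemo")
logger.setLevel(logging.INFO)
logger.addFilter(DedupFilter())
logger.addHandler(ListHandler(lines))

logger.info("Reading data")  # first pass: logged
logger.info("Reading data")  # second pass: suppressed
print(lines)  # → ['Reading data']
```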

The work-around to the above is to use event hooks (https://docs.databricks.com/aws/en/dlt/event-hooks). Read the docs, but from what I'm seeing they come with their own caveats that may be more impactful for debug logging:

1. Event hooks run asynchronously to the DLT pipeline execution. Databricks suggests including execution timestamps in the logs to help correlate pipeline events with log messages.
2. Event hooks only log while the DLT cluster is running. In other words, if the cluster shuts down before an event hook finishes, the hook is terminated prematurely. No work-around for this was provided.
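For reference, the docs register a hook with the `dlt.on_event_hook` decorator and pass each pipeline event to it as a dictionary. Since `dlt` only exists inside a pipeline, the sketch below keeps the formatting logic in a plain, locally runnable function; the event field names (`level`, `event_type`, `message`) are assumptions to verify against the event log schema in the docs linked above:

```python
from datetime import datetime, timezone

def format_event(event):
    """Format a pipeline event for debug logging, or return None to skip it.

    Includes a wall-clock timestamp, as Databricks suggests, to help
    correlate the asynchronous hook output with pipeline events.
    """
    if event.get("level") not in ("WARN", "ERROR"):
        return None
    stamp = datetime.now(timezone.utc).isoformat()
    return f"{stamp} hook## {event.get('event_type')}: {event.get('message')}"

# Inside a pipeline you would register it roughly like this
# (assumed API shape - check the event-hooks docs):
#
# import dlt
#
# @dlt.on_event_hook
# def on_event(event):
#     line = format_event(event)
#     if line:
#         print(line)
```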