08-07-2024 10:03 PM
I'm trying to debug my DLT pipeline and need some log output at runtime. How do I do something like print('something') during a DLT run?
09-09-2024 05:26 AM
I have the same question. This would help with debugging.
09-10-2024 03:04 PM - edited 09-10-2024 03:06 PM
Hi @ruoyuqian , @kranthi2,
In Databricks Delta Live Tables (DLT), using print() statements for logging does not work as expected. This is because DLT runs as a managed pipeline, and the execution environment differs from regular Databricks notebooks. Output from print() statements is not captured and displayed in the same way, making it ineffective for debugging during pipeline runs.
To log information during a DLT pipeline run, you can get a Log4j logger through the Spark JVM gateway and write to the driver logs. Here is an example of how you can set this up within a DLT pipeline:
import dlt
from pyspark.sql.functions import col

# Get a Log4j logger through the JVM gateway (`spark` is provided by the runtime)
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

@dlt.table(
    comment="This is the raw data from the sample source table."
)
def read_source_data():
    # Log the start of reading data
    logger.info("Reading data from the source table.")
    # Read data from the source table
    df = spark.table("sample_source")
    # Log the schema and number of rows read (count() triggers a Spark job)
    logger.info(f"Schema of the source table: {df.schema.simpleString()}")
    logger.info(f"Number of rows read: {df.count()}")
    return df

@dlt.table(
    comment="This table contains transformed data."
)
def transform_data():
    logger.info("Transforming data from the source table.")
    # Read the raw data and apply a transformation
    df = dlt.read("read_source_data").withColumn("value_doubled", col("value") * 2)
    # Log transformation completion
    logger.info(f"Transformation completed. Output schema: {df.schema.simpleString()}")
    return df
After running the DLT pipeline, open the driver log and download the log file.
You can then find your messages by searching for "INFO __main__:".
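Once the driver log has been downloaded, the search can also be scripted. Below is a minimal sketch, assuming the log was saved locally as plain text. The function name and the sample lines are made up for illustration and are not taken from a real driver log.

```python
def find_log_lines(lines, marker="INFO __main__:"):
    """Return the lines that contain the marker, with trailing newlines removed."""
    return [line.rstrip("\n") for line in lines if marker in line]


# Hypothetical sample of what a driver log might contain
sample = [
    "24/09/10 15:04:01 INFO __main__: Reading data from the source table.\n",
    "24/09/10 15:04:02 WARN SomeSparkClass: unrelated message\n",
    "24/09/10 15:04:03 INFO __main__: Number of rows read: 100\n",
]

for line in find_log_lines(sample):
    print(line)
```

For a real file you would pass an open file handle instead of the list, since iterating over a file yields its lines one by one.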
For more persistent or remote access to logs, you can configure the logger to write directly to a cloud storage location such as AWS S3, Azure Blob Storage, or Google Cloud Storage. This can be useful for capturing logs in a centralized location, especially when dealing with production pipelines.
You need a connection to the cloud storage, and then you add a handler to the logger. The code would look like this:
import logging
from azure.storage.blob import BlobServiceClient

# Azure Storage configuration (placeholders; replace with your own values)
storage_account_name = "my_storage_account"
container_name = "logs"
blob_name = "dlt-logs.log"
connection_string = (
    "DefaultEndpointsProtocol=https;"
    f"AccountName={storage_account_name};"
    "AccountKey=<your-storage-account-key>;"
    "EndpointSuffix=core.windows.net"
)

# Initialize a client for the target blob
blob_client = BlobServiceClient.from_connection_string(connection_string).get_blob_client(
    container=container_name, blob=blob_name
)

# Log handler that appends each record to an append blob
class AzureBlobHandler(logging.Handler):
    def __init__(self, blob_client):
        super().__init__()
        self.blob_client = blob_client
        # Create the append blob once. upload_blob(msg, overwrite=True) would
        # replace the whole blob on every record and keep only the last message.
        if not self.blob_client.exists():
            self.blob_client.create_append_blob()

    def emit(self, record):
        try:
            msg = self.format(record) + "\n"
            # Append the log message to the blob in Azure Blob Storage
            self.blob_client.append_block(msg.encode("utf-8"))
        except Exception:
            self.handleError(record)

# Configure the logger
logger = logging.getLogger("DLTLogger")
logger.setLevel(logging.INFO)
azure_blob_handler = AzureBlobHandler(blob_client)
azure_blob_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(azure_blob_handler)

# Example usage remains the same
logger.info("This is an info message logged to Azure Blob Storage.")
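A handler that makes one storage call per log record can get slow on a chatty pipeline. One possible variation is to buffer records and upload them in batches. The sketch below uses the standard library's logging.handlers.BufferingHandler. The class name BatchingUploadHandler and the upload callable are hypothetical, and a plain list stands in for cloud storage so the example runs anywhere.

```python
import logging
import logging.handlers


class BatchingUploadHandler(logging.handlers.BufferingHandler):
    """Buffer records and hand them to an upload callable in batches."""

    def __init__(self, capacity, upload):
        super().__init__(capacity)
        self.upload = upload  # hypothetical callable that takes one str

    def flush(self):
        # Called automatically once `capacity` records are buffered
        self.acquire()
        try:
            if self.buffer:
                text = "".join(self.format(r) + "\n" for r in self.buffer)
                self.upload(text)
                self.buffer.clear()
        finally:
            self.release()


uploaded = []  # stands in for cloud storage in this demo
batch_logger = logging.getLogger("BatchDemo")
batch_logger.setLevel(logging.INFO)
batch_logger.propagate = False
batch_handler = BatchingUploadHandler(capacity=2, upload=uploaded.append)
batch_handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
batch_logger.addHandler(batch_handler)

batch_logger.info("first")
batch_logger.info("second")  # reaching capacity triggers one upload of both lines
batch_logger.info("third")
batch_handler.flush()        # push whatever is still buffered
```

In a pipeline, the upload callable would be whatever function writes a string to your storage account. Records still in the buffer are lost if the process dies before a flush, so this trades durability for fewer storage calls.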
09-10-2024 03:37 PM - edited 09-10-2024 03:39 PM
Hi @ruoyuqian , @kranthi2,
Why print() Statements Won’t Work in DLT:
In Databricks Delta Live Tables (DLT), you do not see the output of print() statements, because what the pipeline shows are its events.
03-28-2025 08:51 AM
>> LogManager.getLogger() does not seem to work in a DLT notebook:
DLTError: [PY4J_BLOCKED_API] You are using a Python API that is not supported in the current environment. Please check Databricks documentation for alternatives. An error occurred while calling z:org.apache.log4j.LogManager.getLogger
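One way to keep a pipeline from failing on this error is to fall back to Python's own logging module when the Log4j call is blocked. This is only a sketch, assuming the blocked call surfaces as an ordinary Python exception at call time. The helper name get_pipeline_logger is made up, and where the fallback output ends up depends on the environment.

```python
import logging
import sys


def get_pipeline_logger(spark=None, name="DLTLogger"):
    """Return a Log4j logger when the JVM gateway is usable, else a Python logger."""
    try:
        # Raises where Py4J access is blocked (and also when spark is None)
        return spark._jvm.org.apache.log4j.LogManager.getLogger(name)
    except Exception:
        py_logger = logging.getLogger(name)
        py_logger.setLevel(logging.INFO)
        if not py_logger.handlers:  # avoid duplicate handlers on repeated calls
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            )
            py_logger.addHandler(handler)
        return py_logger


# Outside Databricks there is no `spark`, so this takes the fallback path
fallback_logger = get_pipeline_logger(spark=None)
fallback_logger.info("Logger initialised.")
```

Both kinds of logger expose info() and error(), so calling code that sticks to those two methods does not need to know which one it received.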
05-28-2025 08:21 AM
Can confirm what @iooj found. I get the same error, using serverless DLT version dlt:16.1.8-delta-pipelines-photon-dlt-release-dp-2025.20-rc0-commit-fcedf0a-image-8aadc5c. This did work on non-serverless with an older version of DLT. Perhaps Databricks has another way? I'll post here if I find something from support.
06-04-2025 10:14 AM
We can try emitting logs to stdout/stderr.
The sample code below worked on a UC DLT cluster (dlt:16.4.0-delta-pipelines-photon-dlt-release-dp-2025.20-rc0-commit-fcedf0a-image-be34de2):
import logging
import sys

import dlt
from pyspark.sql.functions import col

from utilities import utils  # the poster's own helper module

# Configure Python logging to stdout
logger = logging.getLogger("DLTLogger")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)  # Change to sys.stderr for stderr
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

# Avoid duplicate handlers if rerun in notebook
if not logger.handlers:
    logger.addHandler(handler)

@dlt.table
def sample_trips_dlt_logging_test():
    logger.info("dlt_logging## Reading sample trips data from Delta table.")
    df = spark.read.table("samples.nyctaxi.trips")
    logger.info(f"dlt_logging## Schema of the sample trips data: {df.schema.simpleString()}")
    logger.info(f"dlt_logging## Number of rows read: {df.count()}")
    df = df.withColumn("trip_distance_km", utils.distance_km(col("trip_distance")))
    logger.info("dlt_logging## Added trip_distance_km column to the sample trips data.")
    return df
Wednesday
Can confirm what @User16871418122 reported. That is what Databricks support recommended. There are two caveats:
1. Logs are emitted twice: once during lazy validation and once during execution.
2. Logging will not necessarily keep being emitted on subsequent executions. Per Databricks engineering, this is likely the case for streaming tables. This implies that logging this way is only effective for debugging initial code, not for the long term.
The workaround for the above is to use event hooks (https://docs.databricks.com/aws/en/dlt/event-hooks). Read the docs, but from what I'm seeing they come with their own caveats that may be more impactful for debug logging:
1. Event hooks run asynchronously to the DLT pipeline execution. Databricks suggests including execution timestamps in the logs to help correlate pipeline events with log output.
2. Event hooks only log while the DLT cluster is running. In other words, if the DLT cluster finishes before the event hooks finish, the event hooks are terminated prematurely. No workaround for this was provided.
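For anyone who wants to try event hooks, here is a rough sketch of a hook that logs each event together with a hook-side timestamp, following the correlation suggestion in caveat 1. The event field names used here (timestamp, event_type, message) are assumptions based on the event log schema. The helper format_event and the logger name are made up, and where the hook's log output ends up is untested. The hook is only registered when the dlt module exposes on_event_hook, so the helper can also be run outside a pipeline.

```python
import json
import logging
from datetime import datetime, timezone

hook_logger = logging.getLogger("DLTEventHook")  # hypothetical logger name


def format_event(event):
    """Turn an event dict into one JSON log line with a hook-side timestamp."""
    return json.dumps(
        {
            "hook_time": datetime.now(timezone.utc).isoformat(),
            "event_time": event.get("timestamp"),    # assumed field name
            "event_type": event.get("event_type"),   # assumed field name
            "message": event.get("message"),         # assumed field name
        }
    )


try:
    import dlt  # only available inside a pipeline
except ImportError:
    dlt = None

if dlt is not None and hasattr(dlt, "on_event_hook"):

    @dlt.on_event_hook
    def log_pipeline_event(event):
        # Runs asynchronously to the pipeline, so both timestamps are logged
        hook_logger.info(format_event(event))
```

Logging both the event's own timestamp and the time the hook ran makes it easier to line up hook output with pipeline events after the fact.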