Hi @ruoyuqian, @kranthi2,
Why print() Statements Won’t Work in DLT:
In Databricks Delta Live Tables (DLT), output from print() statements is not surfaced in the pipeline UI; what you see there are the pipeline events (the event log).
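If you want to inspect those events programmatically, you can query the pipeline's event log from a regular (non-DLT) notebook. A minimal sketch, assuming the event_log() table-valued function is available in your workspace; the pipeline ID is a placeholder:

# Run this from a regular notebook, not inside the DLT pipeline itself.
# '<pipeline-id>' is a placeholder for your pipeline's ID.
events_df = spark.sql(
    "SELECT timestamp, event_type, message "
    "FROM event_log('<pipeline-id>') "
    "ORDER BY timestamp DESC"
)
display(events_df)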
Alternative Solution: Using Log4j to Log to the Driver Logs
To log information during a DLT pipeline run, you can use Spark's Log4j logger (accessed through the JVM gateway) so that your messages are written to the driver logs. Here is an example of how you can set this up within a DLT pipeline:
import dlt
from pyspark.sql.functions import col

# Get a Log4j logger from the JVM so messages are written to the driver logs
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(__name__)

@dlt.table(
    comment="This is the raw data from the sample source table."
)
def read_source_data():
    # Log the start of reading data
    logger.info("Reading data from the source table.")
    # Read data from the source table
    df = spark.table("sample_source")
    # Log the schema and number of rows read (note: count() triggers a Spark job)
    logger.info(f"Schema of the source table: {df.schema.simpleString()}")
    logger.info(f"Number of rows read: {df.count()}")
    return df

@dlt.table(
    comment="This table contains transformed data."
)
def transform_data():
    logger.info("Transforming data from the source table.")
    # Read the raw data and apply a transformation
    df = dlt.read("read_source_data").withColumn("value_doubled", col("value") * 2)
    # Log transformation completion
    logger.info(f"Transformation completed. Output schema: {df.schema.simpleString()}")
    return df
After running the DLT pipeline, navigate to the driver logs (under the pipeline's compute/cluster details) and download the log file.
You can then find your messages by searching the file for "INFO __main__:".
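If you prefer Python's standard logging module over the JVM Log4j logger, a plain StreamHandler setup also works: the output goes to stderr, which is likewise captured under the driver logs. A minimal sketch (the logger name and format string here are just examples):

import logging

# Stand-alone setup with Python's standard logging module.
# StreamHandler writes to stderr, which shows up in the driver logs.
py_logger = logging.getLogger("dlt_pipeline")
py_logger.setLevel(logging.INFO)

if not py_logger.handlers:  # avoid adding duplicate handlers on repeated runs
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    py_logger.addHandler(handler)

py_logger.info("Pipeline logging initialized.")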
Logging to Cloud Storage:
For more persistent or remote access to logs, you can configure the logger to write directly to a cloud storage location such as Azure Blob Storage.
You need a connection to the storage account, and then you add a custom handler to a Python logger. The code would look like this:
import logging
from azure.storage.blob import BlobServiceClient

# Azure Storage configuration
storage_account_name = "my_storage_account"
container_name = "logs"
blob_name = "dlt-logs.log"
connection_string = "DefaultEndpointsProtocol=https;AccountName=my_storage_account;AccountKey=<your-storage-account-key>;EndpointSuffix=core.windows.net"

# Create a client for the target log blob
blob_client = BlobServiceClient.from_connection_string(connection_string).get_blob_client(container=container_name, blob=blob_name)

# Use an append blob so each record is appended instead of overwriting the file
if not blob_client.exists():
    blob_client.create_append_blob()

# Custom log handler that writes each record to the append blob
class AzureBlobHandler(logging.Handler):
    def __init__(self, blob_client):
        super().__init__()
        self.blob_client = blob_client

    def emit(self, record):
        msg = self.format(record) + "\n"
        # Append the formatted log message to the blob in Azure Blob Storage
        self.blob_client.append_block(msg)
# Configure the logger
logger = logging.getLogger("DLTLogger")