Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-07-2025 10:48 PM
I am updating the DLT pipeline configuration with the job ID, run ID, and run datetime of the job so that I can access these values inside the DLT pipeline. Below is the code I am using to do that.
# Databricks notebook source
import logging
import sys
from typing import Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import pipelines
def setup_logger() -> logging.Logger:
    """
    Creates a Logger that writes INFO-and-above records to stdout.

    The function is safe to call repeatedly (for example when a notebook cell
    is re-run): the stdout handler is attached only if the logger has no
    handler yet, so records are not emitted more than once.

    :returns: A logger instance.
    """
    logger = logging.getLogger(__name__)
    # logging.getLogger returns the same object for the same name, so adding a
    # handler unconditionally would stack one more handler per call.
    if not logger.handlers:
        stdout = logging.StreamHandler(stream=sys.stdout)
        stdout.setLevel(logging.INFO)
        logger.addHandler(stdout)
    logger.setLevel(logging.INFO)
    return logger
def update_dlt_pipeline_config(
    pipeline_id: str,
    job_id: str,
    run_id: str,
    run_date: str,
    run_as_service_principal: Optional[str] = None,
) -> None:
    """ This function updates the DLT pipeline configuration with the job parameters.
    To make them available in the pipeline task.

    The current pipeline is fetched, ``job_id``, ``run_id`` and ``run_datetime``
    are written into a copy of its configuration, and the pipeline is updated
    with that configuration plus the spec fields forwarded below.

    :param pipeline_id: The ID of the DLT pipeline to be updated.
    :type pipeline_id: str
    :param job_id: The job ID of the workflow job.
    :type job_id: str
    :param run_id: The run ID of the workflow job.
    :type run_id: str
    :param run_date: The run date of the workflow job; stored under the
        ``run_datetime`` configuration key.
    :type run_date: str
    :param run_as_service_principal: Optional service principal the pipeline
        should run as. When given it is sent as
        ``pipelines.RunAs(service_principal_name=...)``; when omitted no
        ``run_as`` argument is sent at all.
    :type run_as_service_principal: str, optional
    """
    # Same named logger that setup_logger() configures; resolved here so the
    # function does not depend on a module-level ``logger`` global.
    logger = logging.getLogger(__name__)

    w = WorkspaceClient()
    pipeline = w.pipelines.get(pipeline_id)
    print(pipeline)
    spec = pipeline.spec

    # Work on a copy, and tolerate a pipeline that has no configuration yet.
    configuration = dict(spec.configuration or {})
    configuration['job_id'] = job_id
    configuration['run_id'] = run_id
    # Use the parameter, not a same-named global from the calling script.
    configuration['run_datetime'] = run_date
    print(configuration)

    # NOTE(review): only the fields listed here are forwarded. Confirm against
    # the Pipelines API whether spec fields left out of an update (clusters,
    # notifications, channel, ...) are preserved or reset.
    update_args = {
        "pipeline_id": pipeline_id,
        "name": pipeline.name,
        "libraries": spec.libraries,
        "catalog": spec.catalog,
        "target": spec.target,
        "configuration": configuration,
        "development": spec.development,
        "edition": spec.edition,
        "serverless": spec.serverless,
    }
    if run_as_service_principal:
        # NOTE(review): assumes the installed databricks-sdk exposes
        # pipelines.RunAs and accepts ``run_as`` on update - confirm the SDK
        # version. A bare string is presumably what the API rejects as an
        # invalid argument.
        update_args["run_as"] = pipelines.RunAs(
            service_principal_name=run_as_service_principal
        )
    w.pipelines.update(**update_args)

    logger.info("Updated spec %s", w.pipelines.get(pipeline_id))
if __name__ == "__main__":
    # Entry point when the notebook runs as a job task: read the job
    # parameters from the notebook widgets, log them, then push them into the
    # DLT pipeline configuration.
    logger = setup_logger()

    env = dbutils.widgets.get("env")
    job_id = dbutils.widgets.get("job_id")
    run_id = dbutils.widgets.get("run_id")
    # The widget is named "run_date"; the value is kept as run_datetime.
    run_datetime = dbutils.widgets.get("run_date")
    pipeline_id = dbutils.widgets.get("pipeline_id")

    for message, value in (
        ("job_id: %s", job_id),
        ("run_id: %s", run_id),
        ("run_datetime: %s", run_datetime),
        ("pipeline_id: %s", pipeline_id),
    ):
        logger.info(message, value)

    update_dlt_pipeline_config(
        pipeline_id=pipeline_id,
        job_id=job_id,
        run_id=run_id,
        run_date=run_datetime,
    )
This code has suddenly stopped working, with an error saying that run_as cannot be set to null and that run_as must be set to <SPN> (the run_as used for this target). Why is this happening? If I pass run_as, I get an error that an invalid argument was passed to the update API.