Issue updating DLT pipeline configurations using the Databricks SDK

ganapati
New Contributor III

I am updating DLT pipeline configs with the job ID, run ID, and run_datetime of the job so that I can access these values inside the DLT pipeline. Below is the code I am using to do that.

# Databricks notebook source
import sys
import logging
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import pipelines


def setup_logger() -> logging.Logger:
    """
    Creates a Logger.

    :returns: A logger instance.
    """
    logger = logging.getLogger(__name__)
    stdout = logging.StreamHandler(stream=sys.stdout)
    stdout.setLevel(logging.INFO)

    logger.addHandler(stdout)
    logger.setLevel(logging.INFO)

    return logger


def update_dlt_pipeline_config(
    pipeline_id: str,
    job_id: str,
    run_id: str,
    run_date: str
) -> None:
    """ This function updates the DLT pipeline configuration with the job parameters.
        To make them available in the pipeline task.

    :param pipeline_id: The ID of the DLT pipeline to be updated.
    :type pipeline_id: str
    :param job_id: The job ID of the workflow job.
    :type job_id: str
    :param run_id: The run ID of the workflow job.
    :type run_id: str
    :param run_date: The run date of the workflow job.
    :type run_date: str
    """
    w = WorkspaceClient()
    pipeline = w.pipelines.get(pipeline_id)

    print(pipeline)
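    # Copy the existing configuration; spec.configuration may be None.
    configuration = dict(pipeline.spec.configuration or {})

    configuration['job_id'] = job_id
    configuration['run_id'] = run_id
    # Use the function parameter rather than the module-level run_datetime.
    configuration['run_datetime'] = run_date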
    print(configuration)

    w.pipelines.update(
        pipeline_id=pipeline_id,
        name=pipeline.name,
        libraries=pipeline.spec.libraries,
        catalog=pipeline.spec.catalog,
        target=pipeline.spec.target,
        configuration=configuration,
        development=pipeline.spec.development,
        edition=pipeline.spec.edition,
        serverless=pipeline.spec.serverless,
        run_as = <SPN>
    )
   
    logger.info("Updated spec %s", w.pipelines.get(pipeline_id))


if __name__ == "__main__":

    logger = setup_logger()


    env = dbutils.widgets.get("env")
    job_id = dbutils.widgets.get("job_id")
    run_id = dbutils.widgets.get("run_id")
    run_datetime = dbutils.widgets.get("run_date")
    pipeline_id =  dbutils.widgets.get("pipeline_id")

    logger.info("job_id: %s", job_id)
    logger.info("run_id: %s", run_id)
    logger.info("run_datetime: %s", run_datetime)
    logger.info("pipeline_id: %s", pipeline_id)

    update_dlt_pipeline_config(pipeline_id, job_id, run_id, run_datetime)
 
 
 
This code has suddenly stopped working with an error saying that run_as cannot be set to null and that I should set run_as to the <SPN> (the run_as used for this target). Why is this happening? If I pass run_as, I get an error that an invalid argument was passed to the update API.
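For reference, values written into a pipeline's configuration are normally read inside the pipeline source through spark.conf.get. Below is a minimal sketch of that read side, assuming the same three keys that the script above writes. The names read_run_context and RUN_CONTEXT_KEYS are hypothetical, and the getter is passed in so the function can be exercised without a Spark session.

```python
from typing import Callable, Dict

# Keys written into the pipeline configuration by the update script above.
RUN_CONTEXT_KEYS = ("job_id", "run_id", "run_datetime")


def read_run_context(conf_get: Callable[[str], str]) -> Dict[str, str]:
    """Look up each run-context key through the supplied getter."""
    return {key: conf_get(key) for key in RUN_CONTEXT_KEYS}


# Inside the pipeline source this would be called as (assumption: a
# `spark` session is available, as it is in Databricks notebooks):
#   run_context = read_run_context(spark.conf.get)
```

Passing the getter as an argument is only a convenience for testing; calling spark.conf.get("job_id") directly in the pipeline code works the same way.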
 
 

szymon_dybczak
Esteemed Contributor III

Hi @ganapati ,

Can you provide your SDK version? Also, when you provided the SPN, how did you pass it? As a string?

ganapati
New Contributor III

I am using databricks-sdk 0.65.0. Passing run_as inside w.pipelines.update is not working. By the way, this code without run_as in the update call had been working fine for a week.

 


 

szymon_dybczak
Esteemed Contributor III

Hi @ganapati ,

You're passing run_as the wrong way. This argument expects a RunAs object, not a string. Try creating a RunAs instance and passing that as the argument.


 


ganapati
New Contributor III

Would you know how to create a RunAs instance and pass it as an argument?

 

szymon_dybczak
Esteemed Contributor III

I guess it should look something like the code below. Just provide your service principal ID:

from databricks.sdk.service.pipelines import RunAs

# RunAs wraps the identity the pipeline should run as.
run_as_instance = RunAs(service_principal_name="your_service_principal_id")

w.pipelines.update(
    pipeline_id=pipeline_id,
    name=pipeline.name,
    libraries=pipeline.spec.libraries,
    catalog=pipeline.spec.catalog,
    target=pipeline.spec.target,
    configuration=configuration,
    development=pipeline.spec.development,
    edition=pipeline.spec.edition,
    serverless=pipeline.spec.serverless,
    run_as=run_as_instance
)
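Putting the pieces of this thread together, a minimal sketch could look like the one below. It assumes a recent databricks-sdk (0.65.0 is the version mentioned above) in which w.pipelines.update accepts a run_as argument of type RunAs. The helper names merge_configuration and update_pipeline_config and the service_principal_id parameter are illustrative only and are not part of the SDK.

```python
from typing import Dict, Optional


def merge_configuration(
    existing: Optional[Dict[str, str]],
    overrides: Dict[str, object],
) -> Dict[str, str]:
    """Return a copy of `existing` with `overrides` applied as strings."""
    # The pipeline spec may carry no configuration at all, so guard for None.
    merged = dict(existing or {})
    for key, value in overrides.items():
        # Pipeline configuration values are strings.
        merged[key] = str(value)
    return merged


def update_pipeline_config(
    pipeline_id: str,
    service_principal_id: str,
    overrides: Dict[str, object],
) -> None:
    """Re-submit the current spec with merged configuration and run_as."""
    # Imported lazily so merge_configuration stays usable without the SDK.
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.pipelines import RunAs

    w = WorkspaceClient()
    pipeline = w.pipelines.get(pipeline_id)
    spec = pipeline.spec

    w.pipelines.update(
        pipeline_id=pipeline_id,
        name=pipeline.name,
        libraries=spec.libraries,
        catalog=spec.catalog,
        target=spec.target,
        configuration=merge_configuration(spec.configuration, overrides),
        development=spec.development,
        edition=spec.edition,
        serverless=spec.serverless,
        # run_as must be a RunAs object, not a plain string.
        run_as=RunAs(service_principal_name=service_principal_id),
    )
```

A call such as update_pipeline_config(pipeline_id, "your_service_principal_id", {"job_id": job_id, "run_id": run_id, "run_datetime": run_datetime}) would then replace the body of the original function. Copying the configuration before modifying it avoids mutating the object returned by w.pipelines.get.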

 


ganapati
New Contributor III

Wonderful, thanks a lot. I will try this out.

szymon_dybczak
Esteemed Contributor III

Let us know if it works 🙂

szymon_dybczak
Esteemed Contributor III

Hi @ganapati ,

Did you have a chance to test it out?

ganapati
New Contributor III

Hi, I just tested it and it works! Thanks again for helping out.