
Issue updating DLT pipeline configurations using the Databricks SDK

ganapati
New Contributor III

I am updating the DLT pipeline configuration with the job ID, run ID, and run_datetime of the job so that I can access these values inside the DLT pipeline. Below is the code I am using to do that.

# Databricks notebook source
import sys
import logging
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import pipelines


def setup_logger() -> logging.Logger:
    """
    Creates a Logger.

    :returns: A logger instance.
    """
    logger = logging.getLogger(__name__)
    stdout = logging.StreamHandler(stream=sys.stdout)
    stdout.setLevel(logging.INFO)

    logger.addHandler(stdout)
    logger.setLevel(logging.INFO)

    return logger


def update_dlt_pipeline_config(
    pipeline_id: str,
    job_id: str,
    run_id: str,
    run_datetime: str
) -> None:
    """ This function updates the DLT pipeline configuration with the job parameters
        to make them available in the pipeline task.

    :param pipeline_id: The ID of the DLT pipeline to be updated.
    :type pipeline_id: str
    :param job_id: The job ID of the workflow job.
    :type job_id: str
    :param run_id: The run ID of the workflow job.
    :type run_id: str
    :param run_datetime: The run date and time of the workflow job.
    :type run_datetime: str
    """
    w = WorkspaceClient()
    pipeline = w.pipelines.get(pipeline_id)

    print(pipeline)
    configuration = pipeline.spec.configuration

    configuration['job_id'] = job_id
    configuration['run_id'] = run_id
    configuration['run_datetime'] = run_datetime
    print(configuration)

    w.pipelines.update(
        pipeline_id=pipeline_id,
        name=pipeline.name,
        libraries=pipeline.spec.libraries,
        catalog=pipeline.spec.catalog,
        target=pipeline.spec.target,
        configuration=configuration,
        development=pipeline.spec.development,
        edition=pipeline.spec.edition,
        serverless=pipeline.spec.serverless,
        run_as = <SPN>
    )
   
    logger.info("Updated spec %s", w.pipelines.get(pipeline_id))


if __name__ == "__main__":
    logger = setup_logger()

    env = dbutils.widgets.get("env")
    job_id = dbutils.widgets.get("job_id")
    run_id = dbutils.widgets.get("run_id")
    run_datetime = dbutils.widgets.get("run_date")
    pipeline_id = dbutils.widgets.get("pipeline_id")

    logger.info("job_id: %s", job_id)
    logger.info("run_id: %s", run_id)
    logger.info("run_datetime: %s", run_datetime)
    logger.info("pipeline_id: %s", pipeline_id)

    update_dlt_pipeline_config(pipeline_id, job_id, run_id, run_datetime)
 
 
 
This code has suddenly stopped working, with an error saying that run_as cannot be set to null and that I should set run_as to the <SPN> (the run_as used for this target). Why is this happening? If I pass run_as, I get an error that an invalid argument was passed to the update API.
 
 
2 ACCEPTED SOLUTIONS


szymon_dybczak
Esteemed Contributor III

Hi @ganapati ,

You're passing run_as the wrong way. This argument expects a RunAs object, not a string. Try creating a RunAs instance and passing that as the argument.


szymon_dybczak
Esteemed Contributor III

I guess it should look something like the code below. Just provide your service principal ID:

from databricks.sdk.service.pipelines import RunAs

# Wrap the service principal in a RunAs object instead of passing a plain string.
run_as_instance = RunAs(service_principal_name="your_service_principal_id")

w.pipelines.update(
    pipeline_id=pipeline_id,
    name=pipeline.name,
    libraries=pipeline.spec.libraries,
    catalog=pipeline.spec.catalog,
    target=pipeline.spec.target,
    configuration=configuration,
    development=pipeline.spec.development,
    edition=pipeline.spec.edition,
    serverless=pipeline.spec.serverless,
    run_as=run_as_instance,
)

 


9 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @ganapati ,

Can you provide your SDK version? Also, when you provided the SPN, how did you pass it? As a string?

ganapati
New Contributor III

I am using databricks-sdk 0.65.0. Passing run_as inside w.pipelines.update is not working. By the way, this code without run_as in the update call had been working just fine for a week.

 


 

ganapati
New Contributor III

Would you know how to create a RunAs instance and pass it as an argument?

 

ganapati
New Contributor III

Wonderful, thanks a lot. I will try this out.

szymon_dybczak
Esteemed Contributor III

Let us know if it works 🙂

szymon_dybczak
Esteemed Contributor III

Hi @ganapati ,

Did you have a chance to test it out?

ganapati
New Contributor III

Hi, I just tested it out and it works! Thanks again for helping out.
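
For anyone landing here later: the original goal was to read job_id, run_id, and run_datetime inside the DLT pipeline once the update call has written them into the configuration. Here is a minimal sketch of the reading side, assuming the pipeline configuration entries are exposed through spark.conf.get inside the pipeline. The helper read_job_context is a hypothetical name, and a plain dict stands in for the Spark conf so the snippet runs outside Databricks.

```python
from typing import Callable, Dict

# Keys written by the update step in the question above.
JOB_CONTEXT_KEYS = ("job_id", "run_id", "run_datetime")


def read_job_context(conf_get: Callable[[str, str], str]) -> Dict[str, str]:
    """Collect the job context keys using a `get(key, default)` style lookup."""
    # An empty-string default avoids an exception when a key was never set.
    return {key: conf_get(key, "") for key in JOB_CONTEXT_KEYS}


# Inside the pipeline notebook this would be called as:
#     job_context = read_job_context(spark.conf.get)
# Here a plain dict stands in for the Spark conf; the values are made up.
fake_conf = {"job_id": "123", "run_id": "456"}
job_context = read_job_context(fake_conf.get)
print(job_context)
```

Passing the lookup function in as an argument keeps the helper testable without a Spark session.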