Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Move whole workflow from Dev to Prod

KKo
Contributor III

I have a workflow created in Dev, and now I want to move the whole thing to Prod and schedule it. The workflow has multiple notebooks, dependent libraries, parameters, and so on. How can I move the whole thing to Prod instead of moving each notebook and recreating the workflow there? Or what other options are there? Thanks in advance!

5 REPLIES

Aviral-Bhardwaj
Esteemed Contributor III

The Databricks CLI will be helpful here if Prod is in a different shard. If you're not familiar with the CLI, you'll have to do it manually by downloading DBC files and importing them into the other shard.

daniel_sahal
Esteemed Contributor

@Kris Koirala​ 

You'll need CI/CD pipelines to do that programmatically. You can use Terraform, Azure ARM templates, Bicep, or any other tool that you use (or plan to use).

For example - Azure + Azure DevOps:

https://learn.microsoft.com/en-us/azure/databricks/dev-tools/ci-cd/ci-cd-azure-devops

SRK
Contributor III

Option 1:

You can use Terraform with Azure DevOps to automate the deployments:

https://www.databricks.com/blog/2022/12/5/databricks-workflows-through-terraform.html

Option 2:

You can use the Databricks Jobs CLI to automate the deployments:

Jobs CLI | Databricks on AWS

Hubert-Dudek
Esteemed Contributor III

Alternatively, you can click the three-dot menu in the workflow, choose "View JSON", and save the JSON. Then use that JSON in a REST API call to create a new workflow/job (usually some parts need to be removed first).
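As a rough illustration of the "some parts need to be removed" step, here is a minimal Python sketch that strips environment-specific fields from a saved job JSON before it is sent to the create endpoint. The helper name `to_create_payload` and the `READ_ONLY_KEYS` list are hypothetical; `job_id`, `creator_user_name`, `run_as_user_name` and the `settings` wrapper appear in the script later in this thread, while `created_time` is an assumption, so check the list against the JSON your workspace actually shows.

```python
import json

# Keys assumed to be environment-specific or read-only in the exported JSON.
# This list is an assumption; adjust it to what your workspace returns.
READ_ONLY_KEYS = ("job_id", "creator_user_name", "run_as_user_name", "created_time")


def to_create_payload(exported: dict) -> dict:
    """Turn an exported job definition into a request body for job creation."""
    # The export may wrap the definition in a "settings" object; unwrap it if so.
    settings = exported.get("settings", exported)
    # Drop the keys that should not be carried over to the new workspace.
    return {key: value for key, value in settings.items() if key not in READ_ONLY_KEYS}


if __name__ == "__main__":
    # Tiny made-up export, only to show the shape of the transformation.
    exported = {
        "job_id": 123,
        "creator_user_name": "dev.user@example.com",
        "settings": {"name": "nightly-load", "tasks": []},
    }
    print(json.dumps(to_create_payload(exported), indent=2))
```

The resulting dictionary is what would be posted as the JSON body of the create call; cluster IDs and other workspace-specific values inside the tasks would still need to be reviewed by hand.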

mkassa
New Contributor II

I ended up creating a Python script to do the export; the code is below.
It matches on job name: if a job with the same name already exists in the target, it is updated; otherwise it is created.

import requests

source_token = ''
source_instance = 'adb-000000000000000.00.azuredatabricks.net'
target_token = ''
target_instance = 'adb-000000000000000.00.azuredatabricks.net'

target_cluster_id = "0000-000000-x00x0xxx"
target_owner_email = 'produseremail'


# Headers for API requests
headers_source = {
    'Authorization': f'Bearer {source_token}',
    'Content-Type': 'application/json'
}

headers_target = {
    'Authorization': f'Bearer {target_token}',
    'Content-Type': 'application/json'
}

# Function to list jobs in the environment
def list_jobs(instance, headers):
    response = requests.get(f'https://{instance}/api/2.0/jobs/list', headers=headers)
    response.raise_for_status()
    return response.json().get('jobs', [])

# Function to get job configuration
def get_job_config(instance, headers, job_id):
    response = requests.get(f'https://{instance}/api/2.0/jobs/get?job_id={job_id}', headers=headers)
    response.raise_for_status()
    return response.json()

# Function to create job in the target environment
def create_job(instance, headers, job_config):
    response = requests.post(f'https://{instance}/api/2.0/jobs/create', headers=headers, json=job_config)
    response.raise_for_status()
    return response.json()

# Function to update job in the target environment
def update_job(instance, headers, job_id, job_config):
    # jobs/update expects the job ID and the changed settings in the request body
    payload = {'job_id': job_id, 'new_settings': job_config}
    response = requests.post(f'https://{instance}/api/2.0/jobs/update', headers=headers, json=payload)
    response.raise_for_status()
    return response.json()

# Function to filter jobs by name
def filter_jobs_by_name(jobs, name):
    return [job for job in jobs if job['settings']['name'] == name]

# Function to export a job
def export_job(job, target_jobs, target_cluster_id, display_job_config = False) -> str:
    error_message = ''
    # Set these before the try block so the except/finally branches can always use them
    job_name = job.get('settings', {}).get('name', '<unknown>')
    source_config = None
    job_config = None
    try:
        job_id = job['job_id']

        print(f"\nExporting job: {job_name}")

        # Get job configuration from the source environment
        source_config = get_job_config(source_instance, headers_source, job_id)

        # Check if the job already exists in the target environment
        target_job = filter_jobs_by_name(target_jobs, job_name)

        # Prepare the job configuration: drop job_id and the settings wrapper,
        # leaving source_config untouched so it can still be displayed later
        job_config = {key: value for key, value in source_config.items()
                      if key not in ('job_id', 'settings')}
        job_config['creator_user_name'] = target_owner_email
        job_config['run_as_user_name'] = target_owner_email

        job_settings = source_config.get('settings', {})

        # Copy settings contents (except tasks) to root level
        for key, value in job_settings.items():
            if key != 'tasks':
                job_config[key] = value

        # Copy the tasks and update the cluster ID in each copy
        tasks = [dict(task) for task in job_settings.get('tasks', [])]
        for task in tasks:
            if 'existing_cluster_id' in task:
                task['existing_cluster_id'] = target_cluster_id
        # Add tasks to the root level
        job_config['tasks'] = tasks

        if target_job:
            target_job_id = target_job[0]['job_id']
            update_job(target_instance, headers_target, target_job_id, job_config)
        else:
            create_job(target_instance, headers_target, job_config)
    except Exception as e:
        error_message = f"Job with name '{job_name}' failed to export. Error: {e}"
        print(error_message)
    finally:
        print(f"Finished processing: {job_name}")
        if display_job_config:
            print(f"\nOriginal job: {source_config}\n")
            print(f"Modified job: {job_config}\n")
    return error_message


# Function to export all jobs
def export_all_jobs(display_job_config = False):

    source_jobs = list_jobs(source_instance, headers_source)
    target_jobs = list_jobs(target_instance, headers_target)

    for job in source_jobs:
        export_job(job, target_jobs, target_cluster_id, display_job_config)


# Function to export jobs by name
def export_jobs_by_name(job_name):
    source_jobs = list_jobs(source_instance, headers_source)
    target_jobs = list_jobs(target_instance, headers_target)

    filtered_jobs = filter_jobs_by_name(source_jobs, job_name)

    if not filtered_jobs:
        print(f"No jobs found with name '{job_name}' in the source environment.")
        return

    for job in filtered_jobs:
        export_job(job, target_jobs, target_cluster_id)
                
    print(f"Jobs with name '{job_name}' have been processed.")


export_all_jobs()
#export_jobs_by_name("job-name")
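
Before running the export against Prod, it can help to preview which jobs would be created and which would be updated. The sketch below mirrors the script's name-matching rule as a dry run. `plan_export` is a hypothetical helper, not part of the script above, and it assumes the job lists have the same shape the script relies on (`job_id` plus `settings.name`).

```python
def plan_export(source_jobs, target_jobs):
    """Split source jobs into names to create and (name, target_job_id) pairs to update."""
    target_ids_by_name = {}
    for job in target_jobs:
        # Keep the first job seen for each name, like target_job[0] in the script
        target_ids_by_name.setdefault(job["settings"]["name"], job["job_id"])

    to_create, to_update = [], []
    for job in source_jobs:
        name = job["settings"]["name"]
        if name in target_ids_by_name:
            to_update.append((name, target_ids_by_name[name]))
        else:
            to_create.append(name)
    return to_create, to_update


if __name__ == "__main__":
    # Made-up job lists, shaped like the entries the script reads from jobs/list.
    source = [{"job_id": 1, "settings": {"name": "ingest"}},
              {"job_id": 2, "settings": {"name": "report"}}]
    target = [{"job_id": 10, "settings": {"name": "report"}}]
    to_create, to_update = plan_export(source, target)
    print("Would create:", to_create)
    print("Would update:", to_update)
```

In practice the two arguments would be the results of `list_jobs` for the source and target workspaces. Because matching is by name only, duplicate job names in the target resolve to whichever job is listed first.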

 
