01-21-2023 05:21 AM
I have a workflow created in Dev, and now I want to move the whole thing to Prod and schedule it. The workflow has multiple notebooks, dependent libraries, parameters, and so on. How can I move the whole thing to Prod instead of moving each notebook and recreating the workflow there? Or what other options are there? Thanks in advance!
01-21-2023 06:25 AM
The Databricks CLI will be helpful here if Prod is in a different shard. If you aren't familiar with the CLI, you'll have to do it manually by downloading DBC files and importing them into the other shard.
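If you'd rather script the DBC step than click through the UI, here is a minimal sketch using the Workspace REST API (`/api/2.0/workspace/export` and `/api/2.0/workspace/import`). The helper names, the instance/token arguments, and the folder path are placeholders I made up for illustration, and as far as I know a DBC import only works when the target path does not exist yet, so treat this as a starting point rather than a finished tool.

```python
import base64
import json
import urllib.parse
import urllib.request
from typing import Optional


def export_url(instance: str, path: str) -> str:
    """Build the Workspace API URL that exports `path` as a DBC archive."""
    query = urllib.parse.urlencode({"path": path, "format": "DBC"})
    return f"https://{instance}/api/2.0/workspace/export?{query}"


def import_body(path: str, dbc_bytes: bytes) -> dict:
    """Build the JSON body for POST /api/2.0/workspace/import."""
    return {
        "path": path,
        "format": "DBC",
        # The API expects the archive as a base64-encoded string.
        "content": base64.b64encode(dbc_bytes).decode("ascii"),
    }


def call_api(url: str, token: str, body: Optional[dict] = None) -> dict:
    """Hypothetical helper: GET when body is None, otherwise POST the body as JSON."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    request = urllib.request.Request(
        url,
        data=data,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        raw = response.read()
    return json.loads(raw) if raw else {}


def copy_folder(source_instance, source_token, target_instance, target_token, path):
    """Export one workspace folder from the source shard and import it into the target."""
    exported = call_api(export_url(source_instance, path), source_token)
    dbc_bytes = base64.b64decode(exported["content"])
    import_url = f"https://{target_instance}/api/2.0/workspace/import"
    return call_api(import_url, target_token, import_body(path, dbc_bytes))
```

Calling `copy_folder(...)` once per top-level folder keeps each archive small; this only moves notebooks, not the job definitions themselves.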
01-23-2023 01:36 AM
@Kris Koirala
You'll need CI/CD pipelines to do that programmatically. You can use Terraform, Azure ARM templates, Bicep, or any other tool that you use (or plan to use).
For example - Azure + Azure DevOps:
https://learn.microsoft.com/en-us/azure/databricks/dev-tools/ci-cd/ci-cd-azure-devops
01-23-2023 03:06 AM
Option 1:
You can use Terraform with Azure DevOps to automate the deployments:
https://www.databricks.com/blog/2022/12/5/databricks-workflows-through-terraform.html
Option 2:
You can use Databricks to automate the deployments:
01-23-2023 05:46 AM
Alternatively, you can click the three-dot menu in the workflow, choose "View JSON", and save the JSON. Then use it in a REST API call to create a new workflow/job from that JSON (usually some parts need to be removed first).
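As a rough sketch of that approach: assuming the saved JSON has the same shape as a `jobs/get` response (read-only fields such as `job_id` and `creator_user_name` at the top level, plus a nested `settings` object), the cleanup and the create call could look like this. The helper names and the list of stripped keys are my own guesses, not an official list, so check them against what your export actually contains.

```python
import json
import urllib.request

# Keys assumed to be read-only metadata in the saved JSON (illustrative list).
READ_ONLY_KEYS = {"job_id", "created_time", "creator_user_name", "run_as_user_name"}


def prepare_create_payload(exported: dict) -> dict:
    """Turn JSON saved from the UI into a body for POST /jobs/create."""
    # If the export wraps the definition in "settings", unwrap it;
    # otherwise assume the dict already holds the settings themselves.
    settings = exported.get("settings", exported)
    return {k: v for k, v in settings.items() if k not in READ_ONLY_KEYS}


def create_job_from_file(path: str, instance: str, token: str) -> dict:
    """Hypothetical helper: read the saved JSON and create the job in the target workspace."""
    with open(path, encoding="utf-8") as f:
        payload = prepare_create_payload(json.load(f))
    request = urllib.request.Request(
        f"https://{instance}/api/2.1/jobs/create",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Cluster IDs, notebook paths, and similar workspace-specific values inside the tasks will most likely still need to be edited by hand (or in code) before the job runs in the target workspace.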
05-27-2024 04:20 AM
I ended up creating a Python script to do the export; the code is below.
It matches on job name: if a job with the same name already exists in the target it is updated, otherwise it is created.
import requests
source_token = ''
source_instance = 'adb-000000000000000.00.azuredatabricks.net'
target_token = ''
target_instance = 'adb-000000000000000.00.azuredatabricks.net'
target_cluster_id = "0000-000000-x00x0xxx"
target_owner_email = 'produseremail'
# Headers for API requests
headers_source = {
    'Authorization': f'Bearer {source_token}',
    'Content-Type': 'application/json'
}
headers_target = {
    'Authorization': f'Bearer {target_token}',
    'Content-Type': 'application/json'
}
# Function to list jobs in the environment
def list_jobs(instance, headers):
    response = requests.get(f'https://{instance}/api/2.0/jobs/list', headers=headers)
    response.raise_for_status()
    return response.json().get('jobs', [])

# Function to get job configuration
def get_job_config(instance, headers, job_id):
    response = requests.get(f'https://{instance}/api/2.0/jobs/get?job_id={job_id}', headers=headers)
    response.raise_for_status()
    return response.json()

# Function to create job in the target environment
def create_job(instance, headers, job_config):
    response = requests.post(f'https://{instance}/api/2.0/jobs/create', headers=headers, json=job_config)
    response.raise_for_status()
    return response.json()
# Function to update job in the target environment
def update_job(instance, headers, job_id, job_config):
    # jobs/update expects the job ID and the new settings in the request body
    payload = {'job_id': job_id, 'new_settings': job_config}
    response = requests.post(f'https://{instance}/api/2.0/jobs/update', headers=headers, json=payload)
    response.raise_for_status()
    return response.json()
# Function to filter jobs by name
def filter_jobs_by_name(jobs, name):
    return [job for job in jobs if job['settings']['name'] == name]
# Function to export a job
def export_job(job, target_jobs, target_cluster_id, display_job_config = False) -> str:
    error_message = ''
    # Set these before the try block so the except/finally blocks can always use them
    job_name = job.get('settings', {}).get('name', '<unknown>')
    job_config = None
    new_config = None
    try:
        job_id = job['job_id']
        print(f"\nExporting job: {job_name}")
        # Get job configuration from the source environment
        job_config = get_job_config(source_instance, headers_source, job_id)
        # Check if the job already exists in the target environment
        target_job = filter_jobs_by_name(target_jobs, job_name)
        # Prepare the job configuration as a new dict so the original stays intact:
        # drop job_id and lift the contents of 'settings' to the root level
        job_settings = job_config.get('settings', {})
        new_config = {k: v for k, v in job_config.items() if k not in ('job_id', 'settings')}
        new_config['creator_user_name'] = target_owner_email
        new_config['run_as_user_name'] = target_owner_email
        new_config.update({k: v for k, v in job_settings.items() if k != 'tasks'})
        # Copy the tasks and update the cluster ID where a task uses an existing cluster
        tasks = [dict(task) for task in job_settings.get('tasks', [])]
        for task in tasks:
            if 'existing_cluster_id' in task:
                task['existing_cluster_id'] = target_cluster_id
        # Add tasks to the root level
        new_config['tasks'] = tasks
        if target_job:
            target_job_id = target_job[0]['job_id']
            update_job(target_instance, headers_target, target_job_id, new_config)
        else:
            create_job(target_instance, headers_target, new_config)
    except Exception as e:
        error_message = f"Job with name '{job_name}' failed to export. Error: {e}"
        print(error_message)
    finally:
        print(f"Finished processing: {job_name}")
        if display_job_config:
            print(f"\nOriginal job: {job_config}\n")
            print(f"Modified job: {new_config}\n")
    return error_message
# Function to export all jobs
def export_all_jobs(display_job_config = False):
    source_jobs = list_jobs(source_instance, headers_source)
    target_jobs = list_jobs(target_instance, headers_target)
    for job in source_jobs:
        export_job(job, target_jobs, target_cluster_id, display_job_config)

# Function to export jobs by name
def export_jobs_by_name(job_name):
    source_jobs = list_jobs(source_instance, headers_source)
    target_jobs = list_jobs(target_instance, headers_target)
    filtered_jobs = filter_jobs_by_name(source_jobs, job_name)
    if not filtered_jobs:
        print(f"No jobs found with name '{job_name}' in the source environment.")
        return
    for job in filtered_jobs:
        export_job(job, target_jobs, target_cluster_id)
    print(f"Jobs with name '{job_name}' have been processed.")

export_all_jobs()
#export_jobs_by_name("job-name")
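One caveat if you point the script at Jobs API 2.1 instead of 2.0: as far as I know, `jobs/list` there returns results in pages (with a `next_page_token` in the response), so a single call may not return every job. Here is a minimal sketch of following the pages; `iter_all_jobs` and `make_fetcher` are hypothetical helpers, not part of the script above, and the page size of 100 is an assumption about the allowed maximum.

```python
import json
import urllib.parse
import urllib.request
from typing import Callable, Iterator, Optional


def iter_all_jobs(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every job by following next_page_token until it runs out."""
    page_token = None
    while True:
        page = fetch_page(page_token)
        yield from page.get("jobs", [])
        page_token = page.get("next_page_token")
        if not page_token:
            break


def make_fetcher(instance: str, token: str) -> Callable[[Optional[str]], dict]:
    """Hypothetical helper: returns a function that fetches one page from jobs/list."""
    def fetch_page(page_token: Optional[str]) -> dict:
        params = {"limit": 100}
        if page_token:
            params["page_token"] = page_token
        url = f"https://{instance}/api/2.1/jobs/list?{urllib.parse.urlencode(params)}"
        request = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
        with urllib.request.urlopen(request) as response:
            return json.load(response)
    return fetch_page
```

With this, `list(iter_all_jobs(make_fetcher(source_instance, source_token)))` would stand in for `list_jobs(source_instance, headers_source)`.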