mkassa
New Contributor II

I ended up creating a Python script to do the export; the code is below.
It matches on job name: if a job with the same name already exists in the target, it is updated; otherwise it is created.

import requests

# Source workspace: jobs are read from this instance using this token.
source_instance = 'adb-000000000000000.00.azuredatabricks.net'
source_token = ''

# Target workspace: jobs are created or updated on this instance.
target_instance = 'adb-000000000000000.00.azuredatabricks.net'
target_token = ''

# Written into every exported task that has an 'existing_cluster_id'.
target_cluster_id = "0000-000000-x00x0xxx"
# Written into 'creator_user_name' and 'run_as_user_name' of exported jobs.
target_owner_email = 'produseremail'


def _bearer_headers(token):
    """Return JSON request headers that carry *token* as a Bearer credential."""
    return {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }


# Headers for API requests
headers_source = _bearer_headers(source_token)
headers_target = _bearer_headers(target_token)

# Function to list jobs in the environment
def list_jobs(instance, headers):
    """Fetch the job list from a workspace.

    Args:
        instance: Host name of the workspace (no scheme).
        headers: Request headers, including the authorization header.

    Returns:
        The value of the 'jobs' key of the JSON response, or an empty list
        when the response has no such key.

    Raises:
        Whatever ``raise_for_status`` raises for an error HTTP status.
    """
    url = f'https://{instance}/api/2.0/jobs/list'
    reply = requests.get(url, headers=headers)
    reply.raise_for_status()
    payload = reply.json()
    return payload.get('jobs', [])

# Function to get job configuration
def get_job_config(instance, headers, job_id):
    """Fetch the full definition of a single job.

    Args:
        instance: Host name of the workspace (no scheme).
        headers: Request headers, including the authorization header.
        job_id: Identifier of the job to fetch.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``raise_for_status`` raises for an error HTTP status.
    """
    url = f'https://{instance}/api/2.0/jobs/get?job_id={job_id}'
    reply = requests.get(url, headers=headers)
    reply.raise_for_status()
    return reply.json()

# Function to create job in the target environment
def create_job(instance, headers, job_config):
    """Create a new job from ``job_config``.

    Args:
        instance: Host name of the workspace (no scheme).
        headers: Request headers, including the authorization header.
        job_config: Job definition, sent as the JSON request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``raise_for_status`` raises for an error HTTP status.
    """
    url = f'https://{instance}/api/2.0/jobs/create'
    reply = requests.post(url, headers=headers, json=job_config)
    reply.raise_for_status()
    return reply.json()

# Function to update job in the target environment
def update_job(instance, headers, job_id, job_config):
    """Apply ``job_config`` to an existing job through the jobs/update endpoint.

    The documented jobs/update request body has the shape
    ``{"job_id": ..., "new_settings": {...}}``. The job id therefore goes in
    the body (not the query string) and the settings are nested under
    ``new_settings`` rather than sent at the root of the payload.

    Args:
        instance: Host name of the workspace (no scheme).
        headers: Request headers, including the authorization header.
        job_id: Identifier of the job to update.
        job_config: Job settings with the setting fields at the root level
            (the same shape that is passed to ``create_job``).

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever ``raise_for_status`` raises for an error HTTP status.
    """
    # Job-level metadata reported by jobs/get; these are not job settings,
    # so they are kept out of ``new_settings``.
    metadata_keys = ('job_id', 'created_time', 'creator_user_name', 'run_as_user_name')
    new_settings = {
        key: value for key, value in job_config.items()
        if key not in metadata_keys
    }
    payload = {'job_id': job_id, 'new_settings': new_settings}

    response = requests.post(f'https://{instance}/api/2.0/jobs/update', headers=headers, json=payload)
    response.raise_for_status()
    return response.json()

# Function to filter jobs by name
def filter_jobs_by_name(jobs, name):
    """Return the jobs whose ``settings.name`` equals ``name`` exactly.

    Args:
        jobs: Iterable of job dicts, each with a 'settings' dict holding 'name'.
        name: Job name to match.

    Returns:
        A new list with the matching jobs, in their original order.
    """
    matching = []
    for candidate in jobs:
        if candidate['settings']['name'] == name:
            matching.append(candidate)
    return matching

def _build_target_job_config(source_config, cluster_id, owner_email):
    """Return a new job payload derived from a fetched job definition.

    The input is left untouched. In the result, 'job_id' is dropped, the
    contents of 'settings' are lifted to the root level, the owner fields are
    set to ``owner_email`` and every task that has an 'existing_cluster_id'
    points at ``cluster_id``.
    """
    settings = source_config.get('settings', {})

    target_config = {
        key: value for key, value in source_config.items()
        if key not in ('job_id', 'settings')
    }
    target_config['creator_user_name'] = owner_email
    target_config['run_as_user_name'] = owner_email

    # Copy settings contents to root level (tasks are handled separately).
    for key, value in settings.items():
        if key != 'tasks':
            target_config[key] = value

    # Copy each task so the cluster override does not leak into the source.
    tasks = []
    for task in settings.get('tasks', []):
        task_copy = dict(task)
        if 'existing_cluster_id' in task_copy:
            task_copy['existing_cluster_id'] = cluster_id
        tasks.append(task_copy)
    target_config['tasks'] = tasks

    return target_config


# Function to export a job
def export_job(job, target_jobs, target_cluster_id, display_job_config = False) -> str:
    """Copy one source job to the target workspace, matching on job name.

    The job definition is fetched from the source workspace and rewritten for
    the target. If ``target_jobs`` contains a job with the same name, the
    first match is updated; otherwise a new job is created.

    Args:
        job: Source job entry; must provide 'job_id' and 'settings.name'.
        target_jobs: Jobs already present in the target workspace.
        target_cluster_id: Cluster id written into tasks that reference an
            existing cluster.
        display_job_config: When true, print the fetched and the rewritten
            job definitions (whichever were produced before any failure).

    Returns:
        An empty string on success, otherwise a message describing the error.
        Errors derived from ``Exception`` are reported this way instead of
        being raised.
    """
    error_message = ''
    # Bound up front so the reporting code below is safe even when the very
    # first lookup or request fails.
    job_name = '<unknown>'
    source_config = None
    target_config = None

    try:
        job_name = job['settings']['name']
        job_id = job['job_id']

        print(f"\nExporting job: {job_name}")

        # Get job configuration from the source environment
        source_config = get_job_config(source_instance, headers_source, job_id)

        # Check if the job already exists in the target environment
        matches = filter_jobs_by_name(target_jobs, job_name)

        # Prepare the job configuration without mutating the fetched one, so
        # the "Original job" output really is the original.
        target_config = _build_target_job_config(
            source_config, target_cluster_id, target_owner_email
        )

        if matches:
            update_job(target_instance, headers_target, matches[0]['job_id'], target_config)
        else:
            create_job(target_instance, headers_target, target_config)
    except Exception as e:
        error_message = f"Job with name '{job_name}' failed to export. Error: {e}"
        print(error_message)

    # Deliberately not in a ``finally`` with ``return``: that would swallow
    # KeyboardInterrupt / SystemExit as well.
    print(f"Finished processing: {job_name}")
    if display_job_config:
        if source_config is not None:
            print(f"\nOriginal job: {source_config}\n")
        if target_config is not None:
            print(f"Modified job: {target_config}\n")
    return error_message


# Function to export all jobs
def export_all_jobs(display_job_config = False):
    """Export every job listed in the source workspace to the target.

    Both job lists are fetched once up front; each source job is then handed
    to ``export_job``. The error strings returned by ``export_job`` are not
    collected.

    Args:
        display_job_config: Forwarded to ``export_job``.
    """
    jobs_in_source = list_jobs(source_instance, headers_source)
    jobs_in_target = list_jobs(target_instance, headers_target)

    for source_job in jobs_in_source:
        export_job(source_job, jobs_in_target, target_cluster_id, display_job_config)
    

# Function to export jobs by name
def export_jobs_by_name(job_name):
    """Export only the source jobs whose name equals ``job_name``.

    Prints a notice and does nothing else when the source workspace has no
    job with that name.

    Args:
        job_name: Exact job name to look for in the source workspace.
    """
    jobs_in_source = list_jobs(source_instance, headers_source)
    jobs_in_target = list_jobs(target_instance, headers_target)

    selected = filter_jobs_by_name(jobs_in_source, job_name)

    if selected:
        for source_job in selected:
            export_job(source_job, jobs_in_target, target_cluster_id)
        print(f"Jobs with name '{job_name}' have been processed.")
    else:
        print(f"No jobs found with name '{job_name}' in the source environment.")


# Script entry: runs as soon as this file is executed and exports every job
# found in the source workspace.
export_all_jobs()
# Alternative: export only the jobs whose name matches exactly.
#export_jobs_by_name("job-name")