cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Cloning a Git Repository in Databricks via the REST API Using an Azure Service Principal

rahuja
New Contributor III

Hello

I have written a Python script that uses the Databricks REST APIs. I am trying to clone or update an Azure DevOps repository inside Databricks using an Azure service principal. I am able to retrieve the credential_id for the service principal I am using, but every time I try to clone or update the repo I get an authentication error. I do not understand how to use that credential to authenticate when cloning the repo. Here's the script:

 

 

import getopt
import json
import sys
import requests

def create_git_credentials(host_name, token, devops_pat,
                           git_username="testuser",
                           git_provider="azureDevOpsServices"):
    """Store a Git personal access token for the calling Databricks identity.

    The original implementation issued a GET against the git-credentials
    endpoint, which only lists credentials and never stores the PAT carried
    in the payload.  This version lists the existing credentials first and
    then either updates the entry for ``git_provider`` (PATCH) or creates a
    new one (POST), so the PAT is actually registered.

    Args:
        host_name: Workspace base URL; it is concatenated directly with
            ``api/2.0/...`` so it must end with a slash.
        token: Bearer token used to authenticate against Databricks.
        devops_pat: Git provider personal access token to store.
        git_username: Username recorded with the credential.
        git_provider: Git provider identifier sent to the API.

    Returns:
        The ``credential_id`` of the created or updated credential.

    Raises:
        ValueError: If any of the HTTP calls returns a non-200 status.
    """
    url = f"{host_name}api/2.0/git-credentials"
    headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}
    payload = json.dumps({
        "git_username": git_username,
        "git_provider": git_provider,
        "personal_access_token": devops_pat,
    })

    # Look for a credential that already exists for this provider, because
    # creating a second one for the same provider may be rejected.
    listing = requests.request("GET", url, headers=headers)
    if listing.status_code != 200:
        raise ValueError(f"Error creating git credentials: {listing.text}")
    existing_id = next(
        (
            entry["credential_id"]
            for entry in listing.json().get("credentials", [])
            if str(entry.get("git_provider", "")).lower() == git_provider.lower()
        ),
        None,
    )

    if existing_id is not None:
        # Refresh the stored PAT in place and keep the same credential id.
        response = requests.request(
            "PATCH", f"{url}/{existing_id}", headers=headers, data=payload
        )
        if response.status_code != 200:
            raise ValueError(f"Error creating git credentials: {response.text}")
        return existing_id

    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code != 200:
        raise ValueError(f"Error creating git credentials: {response.text}")
    # Parse the body only after the status check, so an error page that is
    # not JSON surfaces as the ValueError above rather than a decode error.
    body = response.json()
    print('RESPONSE:', body)
    return body["credential_id"]

def checkIfRepoExists(host_name, token, repo_path_in_dbks):
    """Return the id of the Databricks repo at ``repo_path_in_dbks``, or 0.

    Pages through the repos listing until a repo whose ``path`` equals
    ``repo_path_in_dbks`` is found or the listing is exhausted.

    Args:
        host_name: Workspace base URL; it is concatenated directly with
            ``api/2.0/repos`` so it must end with a slash.
        token: Bearer token used to authenticate against Databricks.
        repo_path_in_dbks: Workspace path of the repo to look for.

    Returns:
        The ``id`` of the first matching repo, or ``0`` when none matches
        (callers compare against ``0`` to decide between update and clone).

    Raises:
        ValueError: If a listing request returns a non-200 status.
    """
    url = f"{host_name}api/2.0/repos"
    headers = {"Authorization": "Bearer " + token, "Content-Type": "application/json"}

    params = {}
    while True:
        response = requests.request("GET", url, headers=headers, params=params)
        if response.status_code != 200:
            raise ValueError(f"Failed to list repos: {response.text}")
        page = response.json()

        # A page may have no "repos" key (e.g. nothing to list), so treat a
        # missing key as an empty page instead of raising KeyError.
        for repo in page.get("repos", []):
            if repo.get("path") == repo_path_in_dbks:
                return repo["id"]

        # Continue only while the server hands back a continuation token;
        # the number of keys in the response says nothing about more pages.
        next_page_token = page.get("next_page_token")
        if not next_page_token:
            return 0
        params = {"next_page_token": next_page_token}

def createParentDirectoryIfNotExists(host_name, token, repo_path_in_dbks):
    """Create the parent workspace directory of ``repo_path_in_dbks`` if absent.

    The parent is everything before the last ``/`` of the repo path.  The
    directory is listed first; only when the listing reports
    ``RESOURCE_DOES_NOT_EXIST`` is a mkdirs request sent.

    Raises:
        ValueError: If the mkdirs request returns a non-200 status.
    """
    parent_dir, _, _ = repo_path_in_dbks.rpartition("/")
    auth_headers = {
        "Authorization": "Bearer " + token,
        "Content-Type": "application/json",
    }

    listing = requests.get(
        f"{host_name}api/2.0/workspace/list",
        headers=auth_headers,
        params={"path": parent_dir},
    )
    listing_body = json.loads(listing.text)

    parent_is_missing = (
        'error_code' in listing_body
        and listing_body['error_code'] == 'RESOURCE_DOES_NOT_EXIST'
    )
    if not parent_is_missing:
        # Any other outcome (success or a different error) is left alone.
        return

    mkdirs = requests.post(
        f"{host_name}api/2.0/workspace/mkdirs",
        headers=auth_headers,
        data=json.dumps({"path": parent_dir}),
    )
    if mkdirs.status_code != 200:
        raise ValueError(f"Failed to create parent directory {parent_dir}: {mkdirs.text}")

def updateRepo(host_name, token, repoId, branch):
    """Send a PATCH for repo ``repoId`` with ``branch`` as the request body.

    The payload and the request line are echoed to stdout before the call.

    Returns:
        The raw response text when the status is 200.

    Raises:
        ValueError: With the response text for any other status.
    """
    endpoint = f"{host_name}api/2.0/repos/" + str(repoId)
    body = json.dumps({"branch": branch})

    print(f"payload is\n{body}")
    print(f"PATCH-Request: {endpoint}\npayload:\n{body}")

    reply = requests.patch(
        endpoint,
        headers={
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json",
        },
        data=body,
    )
    if reply.status_code != 200:
        raise ValueError(reply.text)
    return reply.text

def cloneRepo(host_name, token, git_url, repo_path_in_dbks, branch):
    """POST a new Azure DevOps repo definition to the workspace repos endpoint.

    The payload carries the Git URL, the provider, the target workspace path
    and the branch.  The payload, the request line and the decoded response
    are echoed to stdout.

    Returns:
        The raw response text when the status is 200.

    Raises:
        ValueError: With the response text for any other status.
    """
    endpoint = f"{host_name}api/2.0/repos"
    repo_spec = {
        "url": git_url,
        "provider": "azureDevOpsServices",
        "path": repo_path_in_dbks,
        "branch": branch,
    }
    body = json.dumps(repo_spec)

    print(f"payload is\n{body}")
    print(f"POST-Request: {endpoint}\npayload:\n{body}")

    reply = requests.post(
        endpoint,
        headers={
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json",
        },
        data=body,
    )
    # The decoded body is printed before the status is inspected.
    print("RESPONSE:", reply.json())
    if reply.status_code != 200:
        raise ValueError(reply.text)
    return reply.text

def main():
    """Parse the command line, then update or clone the Databricks repo.

    All six options are required and each takes a value; both the short
    (``-h``) and long (``--host_name``) spellings are accepted.  When the
    repo already exists at ``repo_path_in_dbks`` it is switched to
    ``branch``; otherwise its parent directory is ensured and it is cloned.

    Exits with status 2 and prints the usage line when an option is unknown,
    lacks its value, or is missing altogether.
    """
    usage = (
        "create_update_dbks_repo.py"
        " -h <host_name>"
        " -t <token>"
        " -g <git_url>"
        " -b <branch>"
        " -p <repo_path_in_dbks>"
        " -d <devops_pat>"
    )
    short_to_name = {
        "h": "host_name",
        "t": "token",
        "g": "git_url",
        "b": "branch",
        "p": "repo_path_in_dbks",
        "d": "devops_pat",
    }
    flag_to_name = {}
    for short, name in short_to_name.items():
        flag_to_name["-" + short] = name
        flag_to_name["--" + name] = name

    try:
        # Each short flag needs a trailing ":" so getopt knows it takes a
        # value; without it "-h <host>" is parsed as a bare switch.
        opts, _ = getopt.getopt(
            sys.argv[1:],
            "".join(short + ":" for short in short_to_name),
            [name + "=" for name in short_to_name.values()],
        )
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)

    values = {flag_to_name[opt]: arg for opt, arg in opts}
    missing = [name for name in short_to_name.values() if name not in values]
    if missing:
        # Fail with a clear message instead of an UnboundLocalError later on.
        print("Missing required option(s): " + ", ".join(missing))
        print(usage)
        sys.exit(2)

    host_name = values["host_name"]
    token = values["token"]
    git_url = values["git_url"]
    branch = values["branch"]
    repo_path_in_dbks = values["repo_path_in_dbks"]
    devops_pat = values["devops_pat"]

    # The helpers build URLs as f"{host_name}api/2.0/...", so the base URL
    # must end with exactly one slash.
    if not host_name.endswith("/"):
        host_name += "/"

    print("-h is " + host_name)
    print("-g is " + git_url)
    print("-b is " + branch)
    print("-p is " + repo_path_in_dbks)
    # Never echo the PAT itself: build logs are routinely shared.
    print("-d is <redacted>")

    # Called for its side effect of registering the PAT; the id is not needed.
    create_git_credentials(host_name, token, devops_pat)
    repoName = git_url.rstrip("/").split("/")[-1]
    repoId = checkIfRepoExists(host_name, token, repo_path_in_dbks)
    if repoId != 0:
        updateRepo(host_name, token, repoId, branch)
        print("DBKS Repo: " + repoName + " updated successfully.")
    else:
        createParentDirectoryIfNotExists(host_name, token, repo_path_in_dbks)
        cloneRepo(host_name, token, git_url, repo_path_in_dbks, branch)
        print("DBKS Repo: " + repoName + " cloned successfully.")


if __name__ == "__main__":
    main()

 

 

 Is there a way to directly use the DevOps PAT to authenticate with hitting the get_credentials endpoint?

0 REPLIES 0
Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!