- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-13-2025 01:54 AM - edited 01-13-2025 01:56 AM
Hi , Use following oauth token instead. make changes for AWS
import requests
import json
import os
# Environment variables (replace with your actual values or use environment variables)
DATABRICKS_WORKSPACE_URL = os.getenv("DATABRICKS_WORKSPACE_URL")
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
TENANT_ID = "<<TENANTID>>"
SCOPE = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default"
# URLs
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
DATABRICKS_URL = f"{DATABRICKS_WORKSPACE_URL}/api/2.0/token/create"
# Get access token
payload = {
"client_id": CLIENT_ID,
"grant_type": "client_credentials",
"scope": SCOPE,
"client_secret": CLIENT_SECRET
}
response = requests.post(TOKEN_URL, data=payload)
response.raise_for_status() # Raise an error for bad status codes
access_token_val = response.json()
access_token = access_token_val.get("access_token")
if not access_token:
print("Failed to obtain access token")
exit(1)
print(f"Access Token: {access_token}")
# Create Databricks token
headers = {
"Authorization": f"Bearer {access_token}",
"X-Databricks-Azure-SP-Management-Token": access_token,
"Content-Type": "application/json"
}
data = {
"comment": "pipeline token"
}
api_response = requests.post(DATABRICKS_URL, headers=headers, json=data)
api_response.raise_for_status() # Raise an error for bad status codes
api_response_json = api_response.json()
DATABRICKS_NEW_TOKEN = api_response_json.get("token_value")
if not DATABRICKS_NEW_TOKEN:
print("Token could not be created")
exit(1)
else:
print("Successfully created a Databricks Token")
print(f"##vso[task.setvariable variable=DATABRICKS_TOKEN;isOutput=true]{DATABRICKS_NEW_TOKEN}")
print(f"##vso[task.setvariable variable=ACCESS_TOKEN;isOutput=true]{access_token}")