Data Governance
Join discussions on data governance practices, compliance, and security within the Databricks Community. Exchange strategies and insights to ensure data integrity and regulatory compliance.
How to build model lineage programmatically in a cohesive manner?

rashij
New Contributor II

If a model is registered in Unity Catalog, which APIs and SDKs can be used to build the model's entire lineage? I'm trying to figure out everything I need to query so that I don't miss any element of the model lineage.
A model can have the following elements upstream:
1. Tables / feature tables / Delta tables
2. Functions
3. Notebooks
4. Workflows/jobs

So far I've gathered these entry points for building the lineage:
1. The notebook can be identified from the tags present in the run info.
2. If a feature table is used and the model is logged (`log_model`) along with an artifact, then the feature_spec.yaml at least contains the feature tables and functions used. But if the artifact is not logged, I don't see a way to get even these details.
3. Table lineage can be retrieved via the lineage tracking API, but I'd need to go over every table. Is there a more efficient way to backtrack tables/functions from the model or notebook instead?
4. I couldn't find how to get lineage for functions/workflows at all.

4 REPLIES

lingareddy_Alva
Honored Contributor II

@rashij 

You're tackling an important challenge in ML governance. Building comprehensive lineage for a model registered in Unity Catalog requires piecing together information from multiple APIs. Let me outline a more complete approach covering the elements you identified.

Available APIs for Model Lineage Tracking

To build comprehensive model lineage, you'll need to use a combination of:

  1. Unity Catalog API - Core lineage information
  2. MLflow Tracking API - Model creation details
  3. Workspace API - Notebook and job information
  4. Delta Table History API - Table modification history
  5. DBFS API - For artifact inspection
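
Before working through each step, it helps to sketch the record these pieces feed into. Something like the structure below; the field names are just illustrative, not an official Databricks schema:

# Hypothetical container for the lineage assembled in steps 1-5 below.
# Field names are illustrative, not an official Databricks schema.
model_lineage = {
    "model": None,          # Unity Catalog registered model info (step 1)
    "model_version": None,  # MLflow model version (step 1)
    "notebook": None,       # source notebook from run tags (step 2)
    "tables": [],           # upstream tables from feature_spec / lineage API (step 3)
    "functions": [],        # UC functions from the feature spec (step 4)
    "job": None,            # job/run info if the model was trained by a job (step 5)
}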

 

1. Starting with the Model

 
from databricks.sdk import WorkspaceClient
from mlflow.tracking import MlflowClient
import mlflow

# Point MLflow at the Unity Catalog model registry
mlflow.set_registry_uri("databricks-uc")

# Initialize clients
ws_client = WorkspaceClient()
mlflow_client = MlflowClient()

# Get model info from Unity Catalog
model_info = ws_client.registered_models.get(
    full_name="catalog.schema.model_name"
)

# Get a specific version's details (Unity Catalog needs a concrete
# version number; "latest" is not a valid version string)
model_version = mlflow_client.get_model_version(
    name=model_info.full_name,
    version="1"
)

# Get the run that produced this model version
run = mlflow_client.get_run(model_version.run_id)
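
If you don't know the version number up front, one option (assuming MLflow 2.x against the Unity Catalog registry) is to search the model's versions and take the highest:

# Sketch: resolve the newest version by searching all versions of the model
versions = mlflow_client.search_model_versions(
    f"name = '{model_info.full_name}'"
)
latest = max(versions, key=lambda v: int(v.version))
model_version = mlflow_client.get_model_version(
    name=model_info.full_name, version=latest.version
)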

2. Extract Notebook Information

# Extract the notebook path from tags set by MLflow on Databricks
notebook_path = run.data.tags.get("mlflow.source.name")
notebook_id = run.data.tags.get("mlflow.databricks.notebookID")

# Fetch notebook metadata (object ID, type, language). The public SDK
# does not expose notebook revision history, so get_status is used here.
if notebook_path:
    notebook_info = ws_client.workspace.get_status(path=notebook_path)
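
The same tags also tell you whether the run came from a notebook at all; mlflow.source.type is a standard MLflow-on-Databricks tag:

# mlflow.source.type distinguishes NOTEBOOK, JOB, PROJECT, etc.
source_type = run.data.tags.get("mlflow.source.type")
if source_type == "NOTEBOOK":
    print(f"Model trained interactively in {notebook_path}")
elif source_type == "JOB":
    print("Model trained by a job; see step 5 below")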

 

3. Extract Table Lineage

# Look for the feature spec among the run's artifacts
feature_spec = None
for artifact in mlflow_client.list_artifacts(run_id=run.info.run_id):
    if artifact.path == "feature_spec.yaml":
        # download_and_parse_feature_spec is a placeholder helper;
        # a possible implementation is sketched after this block
        feature_spec = download_and_parse_feature_spec(run.info.run_id, artifact.path)

# For each table referenced in the feature_spec, query the lineage
# tracking REST API (the SDK has no dedicated wrapper for it, so the
# generic api_client is used here)
for table in feature_tables:  # table names parsed from feature_spec
    table_lineage = ws_client.api_client.do(
        "GET",
        "/api/2.0/lineage-tracking/table-lineage",
        body={"table_name": table, "include_entity_lineage": True},
    )
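
For the placeholder helper above, one possible implementation (assuming MLflow 2.x and PyYAML; the helper name is mine, not a library function):

import mlflow
import yaml

# Sketch: download a run artifact to a local path and parse it as YAML
def download_and_parse_feature_spec(run_id, artifact_path):
    local_path = mlflow.artifacts.download_artifacts(
        run_id=run_id, artifact_path=artifact_path
    )
    with open(local_path) as f:
        return yaml.safe_load(f)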

 

4. Extract Function Usage

from databricks.sdk.errors import NotFound

# For custom functions, query Unity Catalog functions
functions = []
for func_name in extracted_function_names:  # from feature_spec or notebook analysis
    try:
        func_info = ws_client.functions.get(name=func_name)
        functions.append(func_info)
    except NotFound:
        # Function no longer exists or is not visible to this principal
        pass
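
If you don't yet have function names extracted, you can at least enumerate candidates per schema (the catalog and schema names below are placeholders):

# List all UC functions in a schema as candidate upstream functions
for fn in ws_client.functions.list(
    catalog_name="my_catalog",  # placeholder
    schema_name="my_schema",    # placeholder
):
    print(fn.full_name)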

5. Extract Workflow/Job Information

# From run tags, extract the job ID if the model was trained by a job
job_id = run.data.tags.get("mlflow.databricks.jobID")
job_run_id = run.data.tags.get("mlflow.databricks.jobRunID")

if job_id:
    # Get job details (the SDK expects integer IDs)
    job_info = ws_client.jobs.get(job_id=int(job_id))

    # Get the specific run's details
    if job_run_id:
        job_run = ws_client.jobs.get_run(run_id=int(job_run_id))
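
From there, the job's task definitions point back at notebooks, which gives you another lineage edge. A sketch against the Jobs SDK objects, assuming the job still exists with its original settings:

# Walk the job's tasks to recover the notebooks it runs
for task in (job_info.settings.tasks or []):
    if task.notebook_task:
        print(f"Task {task.task_key} runs {task.notebook_task.notebook_path}")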
rashij
New Contributor II

Hey @lingareddy_Alva 

Thank you for that detailed information.
I was trying something similar and I'm stuck on downloading the artifact, which keeps failing for me. Do you see anything wrong with this path? It keeps returning "No such file exists" with PAT auth, even though I clearly have access to the file in the UI and can download it from there. Are more privileges needed on the token?

dbfs_file_path = "/dbfs/databricks/mlflow-tracking/<notebook_id>/<run_id>/artifacts/is_close_model_artifact/data/feature_store/feature_spec.yaml"

resp = ws_client.dbfs.read(dbfs_file_path)
print(f"file data... {resp.data}")

 

lingareddy_Alva
Honored Contributor II

Try this. Here's how to access your feature spec file:
from databricks.sdk import WorkspaceClient
# Initialize the client
ws_client = WorkspaceClient()

# Correct path format - remove the /dbfs prefix
dbfs_file_path = "databricks/mlflow-tracking/<notebook_id>/<run_id>/artifacts/is_close_model_artifact/data/feature_store/feature_spec.yaml"

try:
    # Read the file
    resp = ws_client.dbfs.read(dbfs_file_path)
    print(f"File data: {resp.data}")
except Exception as e:
    print(f"Error: {e}")
rashij
New Contributor II

Tried two ways, and each gives the error below:
1. path: "databricks/mlflow-tracking/<notebook_id>/<run_id>/artifacts/is_close_model_artifact/data/feature_store/feature_spec.yaml"
error: Path must be absolute

2. path: "/databricks/mlflow-tracking/<notebook_id>/<run_id>/artifacts/is_close_model_artifact/data/feature_store/feature_spec.yaml"
error: No operations allowed on this path
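
For what it's worth, the MLflow-native download API may sidestep the DBFS path issue entirely, since mlflow-tracking locations don't appear to be readable through dbfs.read. This is what I'd try next (assuming MLflow 2.x):

import mlflow

# Sketch: fetch the artifact through MLflow instead of the DBFS API
local_path = mlflow.artifacts.download_artifacts(
    run_id="<run_id>",
    artifact_path="is_close_model_artifact/data/feature_store/feature_spec.yaml",
)
print(f"Downloaded to {local_path}")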
