Data quality questions are everywhere in a data team's day. Is this table stale? Did any pipeline fail last night? Why does this dashboard look off? How did we fix this last time? The answers usually exist somewhere: in pipeline run history, in expectation logs (the data quality rules you define on Spark Declarative Pipelines), in monitoring tables, in old Slack threads or runbooks. They're just spread across different places, and pulling them together takes time and context-switching.
This use case can be implemented in two ways on Databricks.
1) Managed (Agent Bricks – Multi-Agent Supervisor)
Use when speed and simplicity matter.
Create a multi-agent system with the supervisor agent: https://docs.databricks.com/aws/en/generative-ai/agent-bricks/multi-agent-supervisor
2) Custom (Databricks Apps + OpenAI Agents SDK)
Use when control and customization are required.
This demo walks through the second approach: Custom (Databricks Apps + OpenAI Agents SDK). We will build something simple: a chat interface where anyone can ask a data quality question in plain language and get a grounded answer that pulls from all those places at once.
The demo builds a Databricks App called Data-pipeline monitoring: a chat interface backed by an AI agent that orchestrates three Databricks-managed MCP servers to answer questions about data quality. Three different sources of "truth" come together in one conversation:
| Source | What it answers | Backed by |
|---|---|---|
| Pipeline & monitoring metrics | "What's the current health?" "Which tables are stale?" | A Genie space over our monitoring tables |
| On-demand DQ checks | "Investigate this specific table." "Run a freshness check." | UC Functions |
| Remediation knowledge | "How do we usually fix stale gold tables?" | A Vector Search index over past incidents and runbooks |
The agent decides which to call based on the question.
The agent never talks to data directly: It only talks to MCP servers. Each MCP server enforces Unity Catalog permissions on its own.
Unity Catalog is the only access-control plane: Whatever the app's identity is allowed to see, the agent can use. Whatever it isn't, the agent can't.
The Model Context Protocol lets AI agents discover and invoke tools at runtime instead of having tools hardcoded into prompt templates.
Databricks supports three flavors of MCP servers: managed, external, and custom.
For our use case, managed MCP is the perfect fit. Every server we need is already available as a Databricks managed MCP server, every tool call automatically respects Unity Catalog permissions, and we get visibility into all of it through AI Gateway.
MCP on Databricks : https://docs.databricks.com/aws/en/generative-ai/mcp/
The following sections walk through the steps used to construct the demo:
In this demo we can use system tables such as system.access.audit, system.lakeflow.jobs, and system.data_quality_monitoring.table_results. Alternatively, you can create your own tables with refined data and selected columns. The demo needs data for pipeline_runs, expectation_results, and table_freshness, plus a table of historical incidents with their resolutions (this becomes the runbook knowledge base).
Once the data is ready, register SQL functions in Unity Catalog (UC). The UC Functions managed MCP server automatically exposes them at: https://<workspace-host>/api/2.0/mcp/functions/<catalog>/<schema>
Example of registering the function for the pipeline health summary:
CREATE OR REPLACE FUNCTION <catalog>.<schema>.get_pipeline_health_summary()
RETURNS STRING
COMMENT 'Returns a summary of pipeline health for the most recent day.
Returns JSON with totals by status and per-domain breakdown.'
RETURN ( ... )
The COMMENT is doing more work here than it looks: it becomes the tool description the LLM sees when deciding which tool to call. Similarly, create functions for a table freshness check, DLT expectations with a failure rate above 1%, tables flagged for volume anomalies, and a combined check that pulls freshness plus recent failures.
UC Functions : https://docs.databricks.com/aws/en/udf/unity-catalog
Use Databricks managed MCP servers : https://docs.databricks.com/aws/en/generative-ai/mcp/managed-mcp
We embed each dq_incidents row using Databricks' managed embedding model and create a Delta Sync index:
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()

# Replace the <...> placeholders with your own values.
vsc.create_delta_sync_index(
    endpoint_name="<name-of-the-endpoint>",
    source_table_name="<source_table>",
    index_name="<source_table_index>",
    primary_key="<id>",
    embedding_source_column="<content>",  # text column that gets embedded
    embedding_model_endpoint_name="databricks-gte-large-en",
    pipeline_type="TRIGGERED",  # sync on demand rather than continuously
)
Once the index is online, it's automatically available via the Vector Search managed MCP server at: https://<workspace-host>/api/2.0/mcp/vector-search/<catalog>/<schema>
Vector Search : https://docs.databricks.com/aws/en/vector-search/vector-search
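The index above embeds a single text column per incident, so each dq_incidents row needs one string that carries the whole story. The following is a minimal sketch of how that column could be assembled. The function name and the field names (table_name, symptom, root_cause, resolution) are hypothetical, since the schema of the incidents table is not shown here.

```python
def build_incident_content(incident: dict) -> str:
    """Flatten one incident row into a single text blob for embedding.

    The field names below are assumptions for illustration only;
    adapt them to the real columns of your incidents table.
    """
    parts = [
        ("Table", incident.get("table_name")),
        ("Symptom", incident.get("symptom")),
        ("Root cause", incident.get("root_cause")),
        ("Resolution", incident.get("resolution")),
    ]
    # Skip empty fields so the embedded text contains no "None" noise.
    return "\n".join(f"{label}: {value}" for label, value in parts if value)


row = {
    "table_name": "gold.sales_daily",
    "symptom": "stale for 30h",
    "root_cause": None,
    "resolution": "rerun upstream job",
}
content = build_incident_content(row)
```

Labelling each field keeps the resolution text next to the symptom it fixed, which tends to help retrieval when the question is phrased in terms of symptoms.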
Open Genie, click + New, and configure a space over the desired monitoring tables. Then use the instructions field to tell Genie how to think about your domain:
Domain knowledge:
- pipeline_runs: daily ETL runs. Statuses: SUCCESS, FAILED, STALE.
  Layers: bronze, silver, gold. Domains: sales, inventory, customer, finance.
- expectation_results: Lakeflow pipeline expectation pass/fail counts per day.
  A "failure rate" means failed_records / (failed + passed).
- table_freshness: hours since each table was updated, plus the SLA hours.
  A table is "stale" when hours_since_update > sla_hours.
- data_volume_metrics: daily row counts vs. predicted lower/upper range.
Style: plain business language. Lead with the headline number.
Group pipeline health by domain by default.
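The two definitions in these instructions are simple enough to write down in code, which can be handy for spot-checking the numbers Genie reports. This is only an illustrative sketch: the function names are hypothetical, and returning 0.0 when no records were evaluated is an assumption, because the instructions do not cover that case.

```python
def failure_rate(failed_records: int, passed_records: int) -> float:
    """failed / (failed + passed), as defined in the Genie instructions."""
    total = failed_records + passed_records
    if total == 0:
        # Assumption: treat "nothing evaluated" as a 0% failure rate.
        return 0.0
    return failed_records / total


def is_stale(hours_since_update: float, sla_hours: float) -> bool:
    """A table is stale when it was last updated longer ago than its SLA allows."""
    return hours_since_update > sla_hours


rate = failure_rate(failed_records=5, passed_records=95)
stale = is_stale(hours_since_update=25, sla_hours=24)
```

Note that a table exactly at its SLA is not stale under this definition, because the comparison is strict.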
Once the space works, copy its space ID from the URL. Genie spaces become MCP servers automatically at: https://<workspace-host>/api/2.0/mcp/genie/<SPACE_ID>
You can verify all three MCPs are visible at Workspace → AI Gateway → MCPs.
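The three endpoint patterns share one prefix, so they can be assembled from a handful of settings. The sketch below is a hypothetical helper (managed_mcp_urls is not part of any Databricks package) that only builds strings from the URL patterns given above; the host and identifiers in the usage lines are placeholders.

```python
def managed_mcp_urls(workspace_host: str, genie_space_id: str,
                     uc_catalog: str, uc_schema: str,
                     vs_catalog: str, vs_schema: str) -> dict:
    """Build the three managed MCP endpoint URLs for one workspace."""
    # Accept the host with or without scheme and trailing slash.
    host = workspace_host.strip().rstrip("/")
    if host.startswith("https://"):
        host = host[len("https://"):]
    base = f"https://{host}/api/2.0/mcp"
    return {
        "genie": f"{base}/genie/{genie_space_id}",
        "uc_functions": f"{base}/functions/{uc_catalog}/{uc_schema}",
        "vector_search": f"{base}/vector-search/{vs_catalog}/{vs_schema}",
    }


# Placeholder values for illustration only.
urls = managed_mcp_urls(
    "https://example.cloud.databricks.com/",
    genie_space_id="abc123",
    uc_catalog="main", uc_schema="dq",
    vs_catalog="main", vs_schema="dq_vs",
)
```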
Genie Spaces : https://docs.databricks.com/aws/en/genie/
This demo uses the agent-openai-agents-sdk template. The template ships with a built-in chat UI, an MLflow AgentServer, the OpenAI Agents SDK, and authentication. Install the template via + New → App → Agents → Agent - OpenAI Agents SDK.
AsyncDatabricksOpenAI from the databricks-openai package handles workspace authentication and routes calls to your foundation model serving endpoints.
OpenAI Agents SDK provides the conversation loop, tool orchestration, and session management.
McpServer from databricks_openai.agents connects to managed MCP servers with auth handled for you.
Connecting to the three MCP servers. Inside agent.py, build the three managed MCP servers:
from databricks.sdk import WorkspaceClient
from databricks_openai.agents import McpServer

from agent_server.utils import build_mcp_url

# GENIE_SPACE_ID, UC_CATALOG, UC_FUNC_SCHEMA, VS_CATALOG, and VS_SCHEMA are
# configuration constants assumed to be defined elsewhere in agent.py.

def _genie_mcp(ws: WorkspaceClient) -> McpServer:
    return McpServer(
        url=build_mcp_url(f"/api/2.0/mcp/genie/{GENIE_SPACE_ID}",
                          workspace_client=ws),
        name="genie",
        workspace_client=ws,
    )

def _uc_functions_mcp(ws: WorkspaceClient) -> McpServer:
    return McpServer(
        url=build_mcp_url(f"/api/2.0/mcp/functions/{UC_CATALOG}/{UC_FUNC_SCHEMA}",
                          workspace_client=ws),
        name="uc_functions",
        workspace_client=ws,
    )

def _vector_search_mcp(ws: WorkspaceClient) -> McpServer:
    return McpServer(
        url=build_mcp_url(f"/api/2.0/mcp/vector-search/{VS_CATALOG}/{VS_SCHEMA}",
                          workspace_client=ws),
        name="vector_search",
        workspace_client=ws,
    )
Wiring the agent: The OpenAI Agents SDK takes care of the tool-calling loop. We just declare an Agent with the model, instructions, and the list of MCP servers:
from agents import Agent, Runner

def create_agent(mcp_servers):
    return Agent(
        name="DQ Pulse",
        instructions=SYSTEM_PROMPT,
        model="databricks-claude-sonnet-4-5",
        mcp_servers=mcp_servers or [],
    )
Tools from all three MCPs are discovered automatically, the model picks which to call, results are routed back into the loop, and the final answer comes out the other end.
This also pays off later: if you add a UC function tomorrow, it shows up in the agent automatically. No code changes required.
The handlers. The template uses @invoke() for non-streaming requests and @stream() for streaming. Both follow the same shape: open the MCP connections, build the agent, run it:
from contextlib import AsyncExitStack

from mlflow.genai.agent_server import invoke, stream
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse

@invoke()
async def invoke_handler(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    ws = WorkspaceClient()
    messages = [i.model_dump() for i in request.input]
    async with AsyncExitStack() as stack:
        try:
            # Open all three MCP connections; the stack closes them on exit.
            servers = [
                await stack.enter_async_context(_genie_mcp(ws)),
                await stack.enter_async_context(_uc_functions_mcp(ws)),
                await stack.enter_async_context(_vector_search_mcp(ws)),
            ]
            agent = create_agent(mcp_servers=servers)
        except Exception:
            # If any server is unreachable, fall back to an agent without tools.
            agent = create_agent(mcp_servers=[])
        # Run while the connections are still open.
        result = await Runner.run(agent, messages)
    return ResponsesAgentResponse(
        output=[item.to_input_item() for item in result.new_items]
    )
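The handler above drops all tools if any one of the three connections fails. One possible variation, sketched here under assumptions, is to keep whichever servers did connect. This is not what the template does: fake_server, run_with_available_servers, and echo_servers are hypothetical names, and the stubs stand in for McpServer and Runner.run so that the sketch runs without a workspace.

```python
import asyncio
import logging
from contextlib import AsyncExitStack, asynccontextmanager

logger = logging.getLogger(__name__)


@asynccontextmanager
async def fake_server(name: str, fail: bool = False):
    """Hypothetical stand-in for an MCP server connection."""
    if fail:
        raise ConnectionError(f"cannot reach {name}")
    yield name


async def echo_servers(servers):
    """Stand-in for the agent run: reports which servers were connected."""
    return list(servers)


async def run_with_available_servers(factories, run):
    """Connect to each server independently and run with the ones that worked."""
    async with AsyncExitStack() as stack:
        servers = []
        for factory in factories:
            try:
                servers.append(await stack.enter_async_context(factory()))
            except Exception as exc:
                # One unreachable server should not take the others down.
                logger.warning("skipping MCP server: %s", exc)
        # Run inside the stack so the connections stay open.
        return await run(servers)


connected = asyncio.run(
    run_with_available_servers(
        [
            lambda: fake_server("genie"),
            lambda: fake_server("uc_functions", fail=True),
            lambda: fake_server("vector_search"),
        ],
        echo_servers,
    )
)
```

The trade-off is that the agent may answer with a partial toolset, so the system prompt should tell it to say so when a source is unavailable.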
Author an AI agent and deploy it on Databricks Apps: https://docs.databricks.com/aws/en/generative-ai/agent-framework/author-agent
The system prompt. Tool calling alone doesn't make a useful agent. The system prompt is what shapes how the agent investigates and answers:
You are DQ Pulse, a data quality assistant.
You have access to tools across three Databricks managed MCP servers:
- Genie Spaces — natural-language analytical questions about monitoring data
- UC Functions — deterministic DQ check functions
- Vector Search — incident reports and runbooks
Guidelines:
- Always investigate before answering. When asked about a problem,
  call tools to gather facts first.
- Use Genie for "what is happening" questions.
  Use UC Functions for "run a specific check."
  Use Vector Search for "how do we fix this."
- Lead with a one-line status (🟢 Healthy / 🟡 Attention / 🔴 Critical),
  then a 2–3 sentence summary in plain business language.
- When you suggest fixes, base them on retrieved runbook content,
  not invented advice.
- Never invent table names, incident IDs, or numbers.
That last rule matters most: it forces the agent to ground every claim in tool output, which makes the answers easier to trust.
System prompts like this one are typically the result of several rounds of trial and error. If you want to take iteration further than hand-tuning, MLflow Prompt Optimization can help. It exposes an mlflow.genai.optimize_prompts() API that uses the GEPA algorithm to iteratively improve prompts using evaluation metrics and training data.
MLflow Prompt Optimization (Beta): https://docs.databricks.com/aws/en/mlflow3/genai/prompt-version-mgmt/prompt-registry/automatically-o...
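Any metric-driven iteration on the prompt needs something to measure. As a minimal sketch, the status-line guideline from the system prompt can be turned into a plain format check. The name leads_with_status is hypothetical, and this function is not wired into MLflow here; it only shows the kind of rule-based signal an evaluation could start from.

```python
# The three status markers come from the system prompt's guidelines.
STATUS_MARKERS = ("🟢 Healthy", "🟡 Attention", "🔴 Critical")


def leads_with_status(answer: str) -> bool:
    """Return True when the first non-empty line opens with a status marker."""
    stripped = answer.strip()
    if not stripped:
        return False
    first_line = stripped.splitlines()[0]
    return first_line.startswith(STATUS_MARKERS)


ok = leads_with_status("🔴 Critical\nTwo gold tables are past their SLA.")
missing = leads_with_status("Two gold tables are past their SLA.")
```

A check like this says nothing about whether the answer is correct, so it is best paired with metrics that compare the answer against tool output.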
Databricks Apps : https://docs.databricks.com/aws/en/dev-tools/databricks-apps/
The databricks.yml file declares all the resources the agent needs. Update it to grant the app's service principal permission to access your Genie space, UC functions schema, Vector Search index, monitoring tables, and the foundation model serving endpoint. Granting permissions declaratively in databricks.yml means the app's service principal automatically gets what it needs at deploy time. Alternatively, grant EXECUTE, SELECT, USE INDEX, and Genie viewer access in the UI, under the app's Authorization section.
Note: Databricks Apps is moving. Today it lives under Compute → Apps, but it will soon be reachable from the 9-dot menu at the top-right of the workspace, alongside other surfaces like Lakebase and Lakewatch. If Compute → Apps is gone by the time you read this, look there instead; the deployment flow itself is unchanged.
1. CAN EXECUTE on UC_FUNCTIONS
2. SELECT on UC_TABLE
3. USE INDEX on VS_INDEX
4. Viewer access to the Genie space (share from the Genie UI)
5. Permission to query the model serving endpoint
This completes the demo: three MCP servers, one coherent answer, and no SQL anywhere in the response.