Louis_Frolio
Databricks Employee

Greetings @MarsWalker, here’s how to get true streaming from a served RAG chain on Databricks Model Serving.

 

What’s going on

  • The served endpoint error (“This endpoint does not support streaming”) is expected when the deployed entity doesn’t expose a stream-capable interface to Model Serving. A LangChain chain logged with the MLflow LangChain flavor will invoke fine synchronously, but the Serving layer won’t stream unless the model implements the stream contract Databricks supports (predict_stream / ResponsesAgent stream events).
  • Streaming is natively supported for OpenAI-compatible endpoints (external models) via the stream parameter, but that mechanism applies to chat/completions tasks on external providers; it doesn’t automatically enable streaming for custom LangChain chains you log as MLflow models.
  • Today, the recommended way to serve streamable custom GenAI logic (including LangChain) is to wrap your chain with the MLflow ResponsesAgent interface and implement predict_stream. That makes your endpoint stream over the Databricks OpenAI-compatible client and REST, and it integrates with tracing/evaluation.

The working recipe

1) Wrap your chain in a ResponsesAgent with predict_stream.
```python
# pip install -U mlflow databricks-agents databricks-langchain langchain

import uuid

import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)


class RAGChainAgent(ResponsesAgent):
    def __init__(self, chain):
        self.chain = chain

    def _messages(self, request: ResponsesAgentRequest):
        # The Responses schema carries the conversation in `input`;
        # convert the pydantic items to plain dicts for the chain.
        return [m.model_dump() for m in request.input]

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Non-streaming path: run and return one complete response.
        # Assumes the chain returns a string (e.g. it ends in StrOutputParser).
        answer = self.chain.invoke({"messages": self._messages(request)})
        item = self.create_text_output_item(text=answer, id=str(uuid.uuid4()))
        return ResponsesAgentResponse(output=[item])

    def predict_stream(self, request: ResponsesAgentRequest):
        # Streaming path: emit deltas, then a final "done" item
        item_id = str(uuid.uuid4())
        full_text = ""
        # chain.stream() yields chunks synchronously; an async-only chain
        # would need adapting via chain.astream().
        for chunk in self.chain.stream({"messages": self._messages(request)}):
            text_chunk = chunk if isinstance(chunk, str) else str(chunk)
            full_text += text_chunk
            yield ResponsesAgentStreamEvent(
                **self.create_text_delta(delta=text_chunk, item_id=item_id)
            )

        # Final completion event with the aggregated text
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=full_text, id=item_id),
        )


# Log the agent (`chain` is your existing LangChain runnable)
with mlflow.start_run():
    mlflow.pyfunc.log_model(artifact_path="rag_agent", python_model=RAGChainAgent(chain))
```
 
2) Serve the logged agent as a custom model (UI or API). Agent serving is supported as a custom model in Model Serving.
 
3) Query with streaming enabled (Databricks OpenAI client is the easiest):
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
client = w.serving_endpoints.get_open_ai_client()

# Responses API (Predict/ResponsesAgent)
stream = client.responses.create(
    model="<your-endpoint-name>",
    input=[{"role": "user", "content": "Ask me something from the knowledge base"}],
    stream=True,  # key to trigger predict_stream
)

for chunk in stream:
    # Handle stream chunks (delta events), aggregate, etc.
    print(chunk)
```
Alternatively, use chat.completions with stream=True if your wrapper exposes an OpenAI-compatible chat schema, or call predict_stream via the SDK directly; all of these map to the same streaming behavior on served agents.
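If you want the assembled answer rather than raw events, the loop in step 3 can feed a small aggregator. Here is a minimal sketch, assuming the events follow the OpenAI Responses streaming shape (a `type` of `response.output_text.delta` with the new text in `delta`). The `collect_text` helper and the stand-in events are mine for illustration; they are not part of any Databricks API.

```python
from types import SimpleNamespace


def collect_text(events):
    """Concatenate the text deltas from a Responses-style event stream."""
    parts = []
    for event in events:
        # Only text-delta events carry incremental text; other events
        # (such as the final "response.output_item.done") are skipped.
        if getattr(event, "type", None) == "response.output_text.delta":
            parts.append(event.delta)
    return "".join(parts)


# Stand-in events for illustration only; a real stream would come from
# client.responses.create(..., stream=True).
fake_stream = [
    SimpleNamespace(type="response.output_text.delta", delta="Hello, "),
    SimpleNamespace(type="response.output_text.delta", delta="world"),
    SimpleNamespace(type="response.output_item.done", item={"text": "Hello, world"}),
]

print(collect_text(fake_stream))
```

In a UI you would usually render each delta as it arrives and keep the joined string only for logging or post-processing.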
 

Notes and common pitfalls

  • The Query Endpoint panel in the Serving UI returns the full response for custom models in one piece; use the Databricks OpenAI client or REST/SDK to receive streamed chunks in real time.
  • External-model endpoints (OpenAI, Anthropic, Cohere, etc.) support “stream” for chat/completions requests out-of-the-box; that path is separate from custom MLflow/LangChain deployments and is governed by the provider’s streaming capability.
  • For LangChain specifically, Databricks documents the integration as experimental and recommends using MLflow/ResponsesAgent for production-grade agents, including streaming, tracing, and evaluation.
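If you take the REST route from the first note, the streamed body generally arrives as server-sent events. Below is a minimal sketch of parsing them, assuming each event is a `data: <json>` line and that a `data: [DONE]` line may mark the end; check this against what your endpoint actually returns. The `iter_sse_events` helper and the sample lines are hypothetical, not captured output.

```python
import json


def iter_sse_events(lines):
    """Yield decoded JSON payloads from server-sent-event lines."""
    for raw in lines:
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank keep-alive lines and non-data fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break  # assumed end-of-stream sentinel
        yield json.loads(payload)


# Stand-in lines for illustration; real ones would come from iterating
# over the lines of a streamed HTTP response.
sample = [
    b'data: {"type": "response.output_text.delta", "delta": "Hi"}',
    b"",
    b'data: {"type": "response.output_text.delta", "delta": " there"}',
    b"data: [DONE]",
]

events = list(iter_sse_events(sample))
print("".join(e["delta"] for e in events))
```

With the `requests` library you would pass `response.iter_lines()` from a `requests.post(..., stream=True)` call into this helper; the request body needs `"stream": true` alongside `input`, and the exact URL depends on your workspace and endpoint name.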

Why your current chain.py endpoint isn’t streaming

  • The MLflow LangChain flavor can execute your chain synchronously, but the Model Serving scoring server only streams when the logged model exposes a streamable interface (predict_stream or the ResponsesAgent streaming events). Your current logged chain doesn’t advertise that interface, so the endpoint rejects stream=True with “This endpoint does not support streaming.”
 

If you prefer to keep LangChain “as-is”

  • Keep all your LangChain logic, but add a thin agent wrapper (as shown above) that calls chain.stream/chain.astream and yields ResponsesAgentStreamEvent deltas. This approach preserves your code and makes the endpoint streamable in Databricks.
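One thing to watch in the thin wrapper: `chain.stream()` does not always yield plain strings. Depending on how the chain ends, chunks can be message objects with a `content` attribute or dicts keyed by output name. Here is a minimal sketch of normalizing them before emitting deltas. The `chunk_to_text` name and the dict keys it checks are assumptions to adjust for your chain.

```python
from types import SimpleNamespace


def chunk_to_text(chunk):
    """Best-effort conversion of a streamed chain chunk to display text."""
    if isinstance(chunk, str):
        return chunk
    # Message-style chunks expose their text on `.content`.
    content = getattr(chunk, "content", None)
    if isinstance(content, str):
        return content
    # Dict-style chunks: the keys below are assumptions; use whatever
    # output key your chain actually produces.
    if isinstance(chunk, dict):
        for key in ("answer", "output", "content"):
            value = chunk.get(key)
            if isinstance(value, str):
                return value
    # Anything else (e.g. a retrieved-context chunk) contributes no text.
    return ""


# Stand-in chunks for illustration only.
chunks = ["Hel", SimpleNamespace(content="lo"), {"answer": "!"}, {"context": ["doc"]}]
print("".join(chunk_to_text(c) for c in chunks))
```

Returning an empty string for unrecognized chunks avoids leaking `str(dict)` representations into the streamed answer; skip emitting a delta when the result is empty.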
 
Hope this helps, Louis.