Generative AI
Explore discussions on generative artificial intelligence techniques and applications within the Databricks Community. Share ideas, challenges, and breakthroughs in this cutting-edge field.

How to serve a RAG chain endpoint that supports streaming

MarsWalker
New Contributor II

Hello everyone,

I am trying to serve a sample RAG chain model that should support streaming output, but I could not find any documentation on how to enable streaming for a serving endpoint that hosts a LangChain model. Could you provide some hints on how to do that?

 

# RAG Chain
chain = (
    {
        "question": itemgetter("messages") | RunnableLambda(extract_user_query_string),
        "context": itemgetter("messages")
        | RunnableLambda(combine_all_messages_for_vector_search)
        | vector_search_as_retriever
        | RunnableLambda(format_context),
        "chat_history": itemgetter("messages") | RunnableLambda(extract_previous_messages)
    }
    | prompt
    | model
    | StrOutputParser()
)

# Tell MLflow logging where to find your chain.
mlflow.models.set_model(model=chain)

 

  • This RAG chain is logged via

 

# Log the model to MLflow
with mlflow.start_run(run_name="dbdemos_rag_quickstart"):
    logged_chain_info = mlflow.langchain.log_model(
        lc_model=os.path.join(os.getcwd(), 'chain.py'),  # Chain code file e.g., /path/to/the/chain.py 
        model_config='rag_chain_config.yaml',  # Chain configuration 
        artifact_path="chain",  # Required by MLflow
        input_example=model_config.get("input_example"),  # Save the chain's input schema. MLflow will execute the chain before logging and capture its output schema.
    )

 

  • And it works both asynchronously and synchronously, i.e.,

 

async for chunk in chain.astream(input_example):
    print(chunk, end="|", flush=True)
answer = chain.invoke(input_example)
print(answer)

 

both work.

  • Then the RAG chain model is served via the web interface, i.e., "Machine Learning" -> "Models" -> "(model name)" -> "Serve this model". The serving endpoint started up and is in Ready status.

 

When I test it with Query Endpoint:

  • Without `"stream": true` in the request, I get a Response that contains the whole answer at once. It works.
  • With `"stream": true` in the request, I get the following error Response:

 

{"error_code": "BAD_REQUEST", "message": "Encountered an unexpected error while parsing the input data. Error 'This endpoint does not support streaming.'", "stack_trace": "Traceback (most recent call last):\n  File \"/opt/conda/envs/mlflow-env/lib/python3.12/site-packages/mlflowserving/scoring_server/__init__.py\", line 594, in transformation\n    raise MlflowException(\"This endpoint does not support streaming.\")\nmlflow.exceptions.MlflowException: This endpoint does not support streaming.\n"}

 

 

Any insight or suggestions on how to make streaming work would be greatly appreciated! 

Thanks!

 

 

 

 

  

2 REPLIES

ciroskiviktor
New Contributor II

Hello, 
This seems like a very relevant question, and it is the second link to pop up in a Google search on the topic. Are there any additional resources that I could look into, or that we could link to this post?

Thank you, 
Viktor Ciroski 
https://www.linkedin.com/in/viktor-ciroski/

Louis_Frolio
Databricks Employee

Greetings @MarsWalker, here’s how to get true streaming from a served RAG chain on Databricks Model Serving.

 

What’s going on

  • The served endpoint error (“This endpoint does not support streaming”) is expected when the deployed entity doesn’t expose a stream-capable interface to Model Serving. A LangChain chain logged with the MLflow LangChain flavor will invoke fine synchronously, but the Serving layer won’t stream unless the model implements the stream contract Databricks supports (predict_stream / ResponsesAgent stream events).
  • Streaming is natively supported for OpenAI-compatible endpoints (external models) via the stream parameter, but that mechanism applies to chat/completions tasks on external providers; it doesn’t automatically enable streaming for custom LangChain chains you log as MLflow models.
  • Today, the recommended way to serve streamable custom GenAI logic (including LangChain) is to wrap your chain with the MLflow ResponsesAgent interface and implement predict_stream. That makes your endpoint stream over the Databricks OpenAI-compatible client and REST, and it integrates with tracing/evaluation.

The working recipe

1) Wrap your chain in a ResponsesAgent with predict_stream.
# pip install -U mlflow databricks-agents databricks-langchain langchain

import uuid

import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest, ResponsesAgentResponse, ResponsesAgentStreamEvent
)

class RAGChainAgent(ResponsesAgent):
    def __init__(self, chain):
        self.chain = chain

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Non-streaming path: run the chain once and return one complete response
        messages = [msg.model_dump() for msg in request.input]
        answer = self.chain.invoke({"messages": messages})
        item = self.create_text_output_item(text=answer, id=str(uuid.uuid4()))
        return ResponsesAgentResponse(output=[item])

    def predict_stream(self, request: ResponsesAgentRequest):
        # Streaming path: emit text deltas, then a final "output_item.done" event
        item_id = str(uuid.uuid4())
        full_text = ""
        messages = [msg.model_dump() for msg in request.input]
        # If your chain only supports async streaming, adapt this to chain.astream()
        for chunk in self.chain.stream({"messages": messages}):
            text_chunk = chunk if isinstance(chunk, str) else str(chunk)
            full_text += text_chunk
            yield ResponsesAgentStreamEvent(
                **self.create_text_delta(delta=text_chunk, item_id=item_id)
            )

        # Final completion event with the aggregated text
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=full_text, id=item_id),
        )

# Log the agent (Databricks recommends models-from-code: put the agent in its own
# agent.py with mlflow.models.set_model(RAGChainAgent(chain)), then pass that file
# path as python_model, mirroring how chain.py was logged)
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="rag_agent",
        python_model=RAGChainAgent(chain),
    )
 
2) Serve the logged agent as a custom model (UI or API). Agent serving is supported as a custom model in Model Serving.
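If you prefer to create the endpoint from code rather than the Serving UI, here is a minimal sketch with the Databricks Python SDK; the endpoint name, Unity Catalog model name, and version below are placeholders to replace with your own:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput

w = WorkspaceClient()

# Placeholder names -- substitute your registered model and preferred endpoint name
w.serving_endpoints.create(
    name="rag-agent-streaming",
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name="main.default.rag_agent",  # hypothetical UC-registered model
                entity_version="1",
                workload_size="Small",
                scale_to_zero_enabled=True,
            )
        ]
    ),
)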
 
3) Query with streaming enabled (the Databricks OpenAI client is the easiest):

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
client = w.serving_endpoints.get_open_ai_client()

# Responses API (ResponsesAgent predict / predict_stream)
stream = client.responses.create(
    model="<your-endpoint-name>",
    input=[{"role": "user", "content": "Ask me something from the knowledge base"}],
    stream=True,  # key to trigger predict_stream
)

for chunk in stream:
    # Handle stream chunks (delta events), aggregate, etc.
    print(chunk)
Alternatively, use chat.completions with stream=True if your wrapper exposes an OpenAI-compatible chat schema, or call predict_stream via the SDK directly; all of these map to the same streaming behavior on served agents.
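For a raw REST call without the OpenAI client, the endpoint streams server-sent events when the request body carries "stream": true. A rough sketch with the requests library, assuming your workspace URL and a personal access token are in the DATABRICKS_HOST and DATABRICKS_TOKEN environment variables and the endpoint name matches yours (the exact event payloads depend on the endpoint type):

import json
import os

import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]
endpoint = "rag-agent-streaming"        # placeholder endpoint name

resp = requests.post(
    f"{host}/serving-endpoints/{endpoint}/invocations",
    headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    json={
        "input": [{"role": "user", "content": "What is in the knowledge base?"}],
        "stream": True,
    },
    stream=True,  # keep the connection open and read chunks as they arrive
)
resp.raise_for_status()

# The body arrives as server-sent events: lines of the form "data: {...}"
for line in resp.iter_lines():
    if not line or not line.startswith(b"data:"):
        continue
    payload = line[len(b"data:"):].strip()
    if payload == b"[DONE]":  # some chat-style endpoints send a terminator
        break
    print(json.loads(payload))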
 

Notes and common pitfalls

  • The Query Endpoint panel in the Serving UI returns the full response for custom models; use the Databricks OpenAI client or REST/SDK to receive streamed chunks in real time.
  • External-model endpoints (OpenAI, Anthropic, Cohere, etc.) support “stream” for chat/completions requests out of the box; that path is separate from custom MLflow/LangChain deployments and is governed by the provider’s streaming capability (a minimal chat.completions example follows this list).
  • For LangChain specifically, Databricks documents the integration as experimental and recommends using MLflow/ResponsesAgent for production-grade agents, including streaming, tracing, and evaluation.
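Tying to the external-model note above, here is what streaming looks like for a chat-task endpoint through the same OpenAI-compatible client; the endpoint name is a placeholder for whichever external-model or pay-per-token endpoint you use:

from databricks.sdk import WorkspaceClient

client = WorkspaceClient().serving_endpoints.get_open_ai_client()

# stream=True on a chat-task endpoint yields incremental deltas
stream = client.chat.completions.create(
    model="databricks-meta-llama-3-3-70b-instruct",  # placeholder endpoint name
    messages=[{"role": "user", "content": "Summarize what a RAG chain does."}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content
    if delta:
        print(delta, end="", flush=True)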

Why your current chain.py endpoint isn’t streaming

  • The MLflow LangChain flavor can execute your chain synchronously, but the Model Serving scoring server only streams when the logged model exposes a streamable interface (predict_stream or the ResponsesAgent streaming events). Your current logged chain doesn’t advertise that interface, so the endpoint rejects stream=True with “This endpoint does not support streaming.”
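
A quick way to verify the streaming contract before redeploying, assuming you have the model URI of the logged agent handy (the runs:/ URI below is a placeholder), is to load it back with the pyfunc flavor and call predict_stream locally:

import mlflow

# Placeholder URI -- use logged_agent_info.model_uri or a models:/ URI instead
loaded = mlflow.pyfunc.load_model("runs:/<run_id>/rag_agent")

request = {"input": [{"role": "user", "content": "What is Apache Spark?"}]}

# predict() returns one complete response; predict_stream() yields events as they are produced
for event in loaded.predict_stream(request):
    print(event)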
 

If you prefer to keep LangChain “as-is”

  • Keep all your LangChain logic, but add a thin agent wrapper (as shown above) that calls chain.stream/chain.astream and yields ResponsesAgentStreamEvent deltas. This approach preserves your code and makes the endpoint streamable in Databricks.
 
Hope this helps, Louis.