<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How to serve a RAG chain endpoint that supports streaming in Generative AI</title>
    <link>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/132008#M1151</link>
    <description>&lt;P&gt;Hello,&lt;BR /&gt;This seems like a very relevant question, and it is the second link to pop up in a Google search on the topic. Are there any additional resources I could look into, or that we could link to from this post?&lt;BR /&gt;&lt;BR /&gt;Thank you,&lt;BR /&gt;Viktor Ciroski&lt;BR /&gt;&lt;A href="https://www.linkedin.com/in/viktor-ciroski/" target="_blank"&gt;https://www.linkedin.com/in/viktor-ciroski/&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 15 Sep 2025 15:13:06 GMT</pubDate>
    <dc:creator>ciroskiviktor</dc:creator>
    <dc:date>2025-09-15T15:13:06Z</dc:date>
    <item>
      <title>How to serve a RAG chain endpoint that supports streaming</title>
      <link>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/110385#M774</link>
      <description>&lt;P&gt;Hello everyone,&lt;/P&gt;&lt;P&gt;I am trying to serve a sample RAG chain model that should support streaming output, but I could not find any documentation on how to enable streaming for a serving endpoint for a LangChain model. Could you provide some hints on how to do that?&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;My sample RAG chain is simply the same as&amp;nbsp;&lt;A title="databricks-llm-rag-demo" href="https://notebooks.databricks.com/demos/llm-rag-chatbot/02-simple-app/02-Deploy-RAG-Chatbot-Model.html" target="_self"&gt;databricks-llm-rag-demo&lt;/A&gt;&amp;nbsp;(cells 6, 7, 8), in "chain.py"&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# RAG Chain
chain = (
    {
        "question": itemgetter("messages") | RunnableLambda(extract_user_query_string),
        "context": itemgetter("messages")
        | RunnableLambda(combine_all_messages_for_vector_search)
        | vector_search_as_retriever
        | RunnableLambda(format_context),
        "chat_history": itemgetter("messages") | RunnableLambda(extract_previous_messages)
    }
    | prompt
    | model
    | StrOutputParser()
)

# Tell MLflow logging where to find your chain.
mlflow.models.set_model(model=chain)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;This RAG chain is logged via&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Log the model to MLflow
with mlflow.start_run(run_name="dbdemos_rag_quickstart"):
    logged_chain_info = mlflow.langchain.log_model(
        lc_model=os.path.join(os.getcwd(), 'chain.py'),  # Chain code file e.g., /path/to/the/chain.py 
        model_config='rag_chain_config.yaml',  # Chain configuration 
        artifact_path="chain",  # Required by MLflow
        input_example=model_config.get("input_example"),  # Save the chain's input schema. MLflow will execute the chain before logging &amp;amp; capture its output schema.
    )&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;And it works in both async- and sync-way, i.e.,&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;async for chunk in chain.astream(input_example):
    print(chunk, end="|", flush=True)&lt;/LI-CODE&gt;&lt;LI-CODE lang="python"&gt;answer = chain.invoke(input_example)
print(answer)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;both work.&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Then the RAG chain model is served via the web interface, i.e., "Machine Learning" -&amp;gt; "Models" -&amp;gt; "(model name)" -&amp;gt; "Serve this model". The serving endpoint started up and is in Ready status.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;When I test it with Query Endpoint:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Without `"stream": true` in the request, I get a Response that contains the whole answer at once. It works.&lt;/LI&gt;&lt;LI&gt;With `"stream": true` in the request, I get the following error Response:&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;{"error_code": "BAD_REQUEST", "message": "Encountered an unexpected error while parsing the input data. Error 'This endpoint does not support streaming.'", "stack_trace": "Traceback (most recent call last):\n  File \"/opt/conda/envs/mlflow-env/lib/python3.12/site-packages/mlflowserving/scoring_server/__init__.py\", line 594, in transformation\n    raise MlflowException(\"This endpoint does not support streaming.\")\nmlflow.exceptions.MlflowException: This endpoint does not support streaming.\n"}&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Any insight or suggestions on how to make streaming work would be greatly appreciated!&lt;/P&gt;&lt;P&gt;Many thanks!&lt;/P&gt;
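&lt;P&gt;For reference, the streaming request looks roughly like this (workspace host, endpoint name, and token are placeholders):&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import requests

# Placeholders: workspace host, serving endpoint name, and access token
url = "https://&amp;lt;workspace-host&amp;gt;/serving-endpoints/&amp;lt;endpoint-name&amp;gt;/invocations"
headers = {"Authorization": "Bearer &amp;lt;token&amp;gt;"}
payload = {
    "messages": [{"role": "user", "content": "What is RAG?"}],
    "stream": True,  # omitting this field returns the whole answer at once
}

response = requests.post(url, headers=headers, json=payload)
print(response.text)&lt;/LI-CODE&gt;</description>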
      <pubDate>Mon, 17 Feb 2025 11:28:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/110385#M774</guid>
      <dc:creator>MarsWalker</dc:creator>
      <dc:date>2025-02-17T11:28:50Z</dc:date>
    </item>
    <item>
      <title>Re: How to serve a RAG chain endpoint that supports streaming</title>
      <link>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/132008#M1151</link>
      <description>&lt;P&gt;Hello,&lt;BR /&gt;This seems like a very relevant question, and it is the second link to pop up in a Google search on the topic. Are there any additional resources I could look into, or that we could link to from this post?&lt;BR /&gt;&lt;BR /&gt;Thank you,&lt;BR /&gt;Viktor Ciroski&lt;BR /&gt;&lt;A href="https://www.linkedin.com/in/viktor-ciroski/" target="_blank"&gt;https://www.linkedin.com/in/viktor-ciroski/&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 15 Sep 2025 15:13:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/132008#M1151</guid>
      <dc:creator>ciroskiviktor</dc:creator>
      <dc:date>2025-09-15T15:13:06Z</dc:date>
    </item>
    <item>
      <title>Re: How to serve a RAG chain endpoint that supports streaming</title>
      <link>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/138111#M1343</link>
      <description>&lt;P&gt;Greetings&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/149496"&gt;@MarsWalker&lt;/a&gt;,&amp;nbsp;here’s how to get true streaming from a served RAG chain on Databricks Model Serving.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3 class="paragraph"&gt;What’s going on&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;The served endpoint error (“This endpoint does not support streaming”) is expected when the deployed entity doesn’t expose a stream-capable interface to Model Serving. A LangChain chain logged with the MLflow LangChain flavor will invoke fine synchronously, but the Serving layer won’t stream unless the model implements the stream contract Databricks supports (predict_stream / ResponsesAgent stream events).&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;Streaming is natively supported for OpenAI-compatible endpoints (external models) via the stream parameter, but that mechanism applies to chat/completions tasks on external providers; it doesn’t automatically enable streaming for custom LangChain chains you log as MLflow models.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;Today, the recommended way to serve streamable custom GenAI logic (including LangChain) is to wrap your chain with the MLflow ResponsesAgent interface and implement predict_stream. That makes your endpoint stream over the Databricks OpenAI-compatible client and REST, and it integrates with tracing/evaluation.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;H3 class="paragraph"&gt;The working recipe&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;1) Wrap your chain in a ResponsesAgent with predict_stream.&lt;/DIV&gt;
&lt;PRE&gt;&lt;CODE class="markdown-code-python"&gt;# pip install -U mlflow databricks-agents databricks-langchain langchain

from uuid import uuid4

import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest, ResponsesAgentResponse, ResponsesAgentStreamEvent
)

class RAGChainAgent(ResponsesAgent):
    def __init__(self, chain):
        self.chain = chain

    def _to_chain_input(self, request: ResponsesAgentRequest) -&amp;gt; dict:
        # Map the Responses API input items to the {"messages": [...]} dict the chain expects
        return {"messages": [i.model_dump() for i in request.input]}

    def predict(self, request: ResponsesAgentRequest) -&amp;gt; ResponsesAgentResponse:
        # Non-streaming path: run the chain once and return one complete response
        answer = self.chain.invoke(self._to_chain_input(request))
        item = self.create_text_output_item(text=answer, id=str(uuid4()))
        return ResponsesAgentResponse(output=[item])

    def predict_stream(self, request: ResponsesAgentRequest):
        # Streaming path: emit text deltas as they arrive, then a final "done" item
        item_id = str(uuid4())
        full_text = ""
        # If your chain only supports async streaming, adapt this to chain.astream()
        for chunk in self.chain.stream(self._to_chain_input(request)):
            text_chunk = chunk if isinstance(chunk, str) else str(chunk)
            full_text += text_chunk
            yield ResponsesAgentStreamEvent(
                **self.create_text_delta(delta=text_chunk, item_id=item_id)
            )

        # Final completion event with the aggregated text
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(text=full_text, id=item_id),
        )

# Log the agent as a pyfunc model. For production, prefer models-from-code:
# define the agent in its own file, call mlflow.models.set_model(RAGChainAgent(chain))
# there, and pass that file path to log_model.
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="rag_agent", python_model=RAGChainAgent(chain)
    )&lt;/CODE&gt;&lt;/PRE&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;2) Serve the logged agent as a custom model (UI or API). Agent serving is supported as a custom model in Model Serving.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;3) Query with streaming enabled (Databricks OpenAI client is the easiest): ```python from databricks.sdk import WorkspaceClient w = WorkspaceClient()&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;client = w.serving_endpoints.get_open_ai_client()&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# responses API (Predict/ResponsesAgent) stream = client.responses.create( model="&amp;lt;your-endpoint-name&amp;gt;", input=[{"role": "user", "content": "Ask me something from the knowledge base"}], stream=True, # key to trigger predict_stream )&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;for chunk in stream: # Handle stream chunks (delta events), aggregate, etc. print(chunk) ```&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Alternatively, use chat.completions with stream=True if your wrapper exposes an OpenAI-compatible chat schema, or call predict_stream via the SDK directly; all of these map to the same streaming behavior on served agents.&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;H3 class="paragraph"&gt;Notes and common pitfalls&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;Query Endpoint panel in the Serving UI returns the full response for custom models; use the Databricks OpenAI client or REST/SDK to receive streamed chunks in real time.&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;External-model endpoints (OpenAI, Anthropic, Cohere, etc.) support “stream” for chat/completions requests out-of-the-box; that path is separate from custom MLflow/LangChain deployments and is governed by the provider’s streaming capability.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;For LangChain specifically, Databricks documents the integration as experimental and recommends using MLflow/ResponsesAgent for production-grade agents, including streaming, tracing, and evaluation.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;H3 class="paragraph"&gt;Why your current chain.py endpoint isn’t streaming&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;The MLflow LangChain flavor can execute your chain synchronously, but the Model Serving scoring server only streams when the logged model exposes a streamable interface (predict_stream or the ResponsesAgent streaming events). Your current logged chain doesn’t advertise that interface, so the endpoint rejects stream=True with “This endpoint does not support streaming.”&lt;/LI&gt;
&lt;/UL&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;H3 class="paragraph"&gt;If you prefer to keep LangChain “as-is”&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;Keep all your LangChain logic, but add a thin agent wrapper (as shown above) that calls chain.stream/chain.astream and yields ResponsesAgentStreamEvent deltas. This approach preserves your code and makes the endpoint streamable in Databricks.&lt;/LI&gt;
&lt;/UL&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Hope this helps, Louis.&lt;/DIV&gt;</description>
      <pubDate>Fri, 07 Nov 2025 13:01:40 GMT</pubDate>
      <guid>https://community.databricks.com/t5/generative-ai/how-to-serve-a-rag-chain-endpoint-that-supports-streaming/m-p/138111#M1343</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-11-07T13:01:40Z</dc:date>
    </item>
  </channel>
</rss>

