
How to serve a RAG chain endpoint that supports streaming

MarsWalker
New Contributor

Hello everyone,

I am trying to serve a sample RAG chain model that should support streaming output, but I could not find any documentation on how to enable streaming for a serving endpoint backed by a LangChain model. Could you provide some hints on how to do that?

 

# chain.py (excerpt) -- library imports shown for completeness; the helper
# functions, prompt, model, and vector_search_as_retriever are defined earlier
# in the same file.
from operator import itemgetter

from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

import mlflow

# RAG Chain
chain = (
    {
        "question": itemgetter("messages") | RunnableLambda(extract_user_query_string),
        "context": itemgetter("messages")
        | RunnableLambda(combine_all_messages_for_vector_search)
        | vector_search_as_retriever
        | RunnableLambda(format_context),
        "chat_history": itemgetter("messages") | RunnableLambda(extract_previous_messages),
    }
    | prompt
    | model
    | StrOutputParser()
)

# Tell MLflow logging where to find your chain.
mlflow.models.set_model(model=chain)
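
For reference, the chain consumes a messages-style input (see itemgetter("messages") above), so the input_example used when logging looks roughly like this; the values here are placeholders, the real example lives in rag_chain_config.yaml:

# Hypothetical shape of input_example (real values come from rag_chain_config.yaml).
# The chain pulls the "messages" list out of this dict via itemgetter("messages").
input_example = {
    "messages": [
        {"role": "user", "content": "What is Databricks Model Serving?"},
    ]
}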

 

  • This RAG chain is logged via

 

# Log the model to MLflow (driver notebook; model_config is defined earlier in the notebook)
import os

import mlflow

with mlflow.start_run(run_name="dbdemos_rag_quickstart"):
    logged_chain_info = mlflow.langchain.log_model(
        lc_model=os.path.join(os.getcwd(), 'chain.py'),  # Chain code file, e.g., /path/to/the/chain.py
        model_config='rag_chain_config.yaml',  # Chain configuration
        artifact_path="chain",  # Required by MLflow
        input_example=model_config.get("input_example"),  # Save the chain's input schema. MLflow will execute the chain before logging and capture its output schema.
    )
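
For completeness, here is roughly how I would sanity-check streaming against the logged artifact itself (assuming an MLflow version recent enough to expose predict_stream on pyfunc models; logged_chain_info and model_config come from the cells above):

# Rough sketch: load the logged chain back as a pyfunc model and try streaming from it.
# predict_stream (available in recent MLflow versions) yields output chunks as they are produced.
import mlflow

loaded_chain = mlflow.pyfunc.load_model(logged_chain_info.model_uri)

for chunk in loaded_chain.predict_stream(model_config.get("input_example")):
    print(chunk, end="|", flush=True)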

 

  • And the chain works both asynchronously and synchronously, i.e.,

 

# Async streaming: chunks arrive incrementally
async for chunk in chain.astream(input_example):
    print(chunk, end="|", flush=True)

# Sync invocation: the full answer comes back at once
answer = chain.invoke(input_example)
print(answer)

 

both work.

  • Then the RAG chain model is served via the web interface, i.e., "Machine Learning" -> "Models" -> "(model name)" -> "Serve this model". The serving endpoint starts up and reaches the Ready status.
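
(For reference, I believe the UI step above is roughly equivalent to creating the endpoint programmatically like this; the endpoint name, model name, and version below are placeholders:)

# Rough programmatic equivalent of "Serve this model"; names and versions are placeholders.
from mlflow.deployments import get_deploy_client

client = get_deploy_client("databricks")

client.create_endpoint(
    name="rag-chain-endpoint",
    config={
        "served_entities": [
            {
                "entity_name": "dbdemos_rag_chain",  # registered model name (placeholder)
                "entity_version": "1",               # registered model version (placeholder)
                "workload_size": "Small",
                "scale_to_zero_enabled": True,
            }
        ]
    },
)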

 

When I test it with Query Endpoint (a rough requests-based equivalent of the call is sketched below the error):

  • Without `"stream": true` in the request, I get a response that contains the whole answer at once. It works.
  • With `"stream": true` in the request, I get the following error response,

 

{"error_code": "BAD_REQUEST", "message": "Encountered an unexpected error while parsing the input data. Error 'This endpoint does not support streaming.'", "stack_trace": "Traceback (most recent call last):\n  File \"/opt/conda/envs/mlflow-env/lib/python3.12/site-packages/mlflowserving/scoring_server/__init__.py\", line 594, in transformation\n    raise MlflowException(\"This endpoint does not support streaming.\")\nmlflow.exceptions.MlflowException: This endpoint does not support streaming.\n"}
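
(What the Query Endpoint UI sends should be roughly equivalent to the requests call below; the host, token, and exact body shape are placeholders I filled in from the input example, not copied from the UI.)

# Rough requests-based equivalent of the Query Endpoint test above.
# Host, token, and payload shape are placeholders.
import os
import requests

url = "https://<workspace-host>/serving-endpoints/<endpoint-name>/invocations"
headers = {
    "Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}",
    "Content-Type": "application/json",
}
payload = {
    "messages": [{"role": "user", "content": "What is Databricks Model Serving?"}],
    "stream": True,  # removing this key returns the full answer in one response, as described above
}

# stream=True keeps the HTTP connection open so streamed chunks can be read as they arrive
with requests.post(url, headers=headers, json=payload, stream=True) as resp:
    print(resp.status_code)
    for line in resp.iter_lines():
        if line:
            print(line.decode("utf-8"))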

 

 

Any insight or suggestions on how to make streaming work would be greatly appreciated! 

Thanks!

 

 

 

 

  
