Generative AI
Explore discussions on generative artificial intelligence techniques and applications within the Databricks Community. Share ideas, challenges, and breakthroughs in this cutting-edge field.

How to serve a RAG chain endpoint that supports streaming

MarsWalker
New Contributor

Hello everyone,

I am trying to serve a sample RAG chain model that should support streaming output, but I could not find any documentation on how to enable streaming for a serving endpoint backed by a LangChain model. Could you provide some hints on how to do that?

 

# RAG chain: assemble the prompt inputs (question, retrieved context, chat history),
# then run prompt -> model -> string output.
chain = (
    {
        # The latest user message becomes the question.
        "question": itemgetter("messages") | RunnableLambda(extract_user_query_string),
        # All messages form the retrieval query; the hits are formatted as context.
        "context": itemgetter("messages")
        | RunnableLambda(combine_all_messages_for_vector_search)
        | vector_search_as_retriever
        | RunnableLambda(format_context),
        # Earlier messages are passed through as chat history.
        "chat_history": itemgetter("messages") | RunnableLambda(extract_previous_messages),
    }
    | prompt
    | model
    | StrOutputParser()
)

# Tell MLflow logging where to find your chain.
mlflow.models.set_model(model=chain)
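
For context, the helper functions referenced in the chain are simple message/document formatters. The sketches below show roughly what they look like (simplified; the exact implementations follow the dbdemos RAG quickstart and live elsewhere in chain.py):

# Simplified sketches of the helpers used in the chain above
# (exact implementations may differ).
def extract_user_query_string(chat_messages_array):
    # The last message in the conversation is the current user question.
    return chat_messages_array[-1]["content"]

def extract_previous_messages(chat_messages_array):
    # Everything before the last message is passed along as chat history.
    return "\n".join(f'{m["role"]}: {m["content"]}' for m in chat_messages_array[:-1])

def combine_all_messages_for_vector_search(chat_messages_array):
    # Concatenate all message contents into a single retrieval query.
    return " ".join(m["content"] for m in chat_messages_array)

def format_context(docs):
    # Join the retrieved chunks into one context string for the prompt.
    return "\n\n".join(d.page_content for d in docs)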

 

  • This RAG chain is logged via:

 

# Log the model to MLflow
with mlflow.start_run(run_name="dbdemos_rag_quickstart"):
    logged_chain_info = mlflow.langchain.log_model(
        lc_model=os.path.join(os.getcwd(), 'chain.py'),  # Chain code file, e.g. /path/to/the/chain.py
        model_config='rag_chain_config.yaml',  # Chain configuration
        artifact_path="chain",  # Required by MLflow
        input_example=model_config.get("input_example"),  # Save the chain's input schema. MLflow executes the chain before logging and captures its output schema.
    )
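
If it helps, a local sanity check along these lines (a sketch, assuming an MLflow version that ships pyfunc predict_stream) confirms whether the logged model itself streams:

# Load the logged chain back as a pyfunc model and stream from it locally.
# Note: predict_stream requires a recent MLflow 2.x release.
loaded_model = mlflow.pyfunc.load_model(logged_chain_info.model_uri)

for chunk in loaded_model.predict_stream(model_config.get("input_example")):
    print(chunk, end="|", flush=True)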

 

  • And it works both asynchronously and synchronously, i.e.,

 

# Async streaming:
async for chunk in chain.astream(input_example):
    print(chunk, end="|", flush=True)

# Synchronous invocation:
answer = chain.invoke(input_example)
print(answer)

 

both work.

  • Then the RAG chain model is served via the web interface, i.e., "Machine Learning" -> "Models" -> "(model name)" -> "Serve this model". The serving endpoint started up and is in the Ready status.

 

When I test it with Query Endpoint:

  • Without `"stream": true` in the request, I get a response that contains the whole answer at once. That works.
  • With `"stream": true` in the request, I get the following error response:

 

{"error_code": "BAD_REQUEST", "message": "Encountered an unexpected error while parsing the input data. Error 'This endpoint does not support streaming.'", "stack_trace": "Traceback (most recent call last):\n  File \"/opt/conda/envs/mlflow-env/lib/python3.12/site-packages/mlflowserving/scoring_server/__init__.py\", line 594, in transformation\n    raise MlflowException(\"This endpoint does not support streaming.\")\nmlflow.exceptions.MlflowException: This endpoint does not support streaming.\n"}

 

 

Any insight or suggestions on how to make streaming work would be greatly appreciated! 

Thanks!

 

 

 

 

  

