This guide demonstrates how to build a simple Retrieval-Augmented Generation (RAG) application using Databricks Vector Search, a vector database seamlessly integrated with Databricks' Foundation Model API (FMAPI) embedding models.
RAG is a widely adopted architecture for creating natural-language interfaces that allow users to interact with organizational data effectively. This tutorial will walk you through setting up a vector index, loading text data, querying the database, building a prompt for a large language model (LLM), and finally querying the LLM using the FMAPI. All these steps can be completed in just 30 minutes.
This is a beginner’s guide with hands-on instructions that you can follow in your own Databricks workspace.
To learn more about how Databricks Vector Search works, refer to the Databricks Vector Search documentation.
For more information on querying models via the Foundation Model APIs, see the Foundation Model APIs documentation.
First, we will install the necessary libraries and set up a temporary catalog/schema/table for this example.
In [ ]:
%pip install --upgrade databricks-vectorsearch databricks-genai-inference
dbutils.library.restartPython()
In [ ]:
CATALOG = "workspace"
DB = "vs_demo"
SOURCE_TABLE_NAME = "documents"
SOURCE_TABLE_FULLNAME = f"{CATALOG}.{DB}.{SOURCE_TABLE_NAME}"
A Databricks Vector Search Index is created from a Delta Table. The source Delta Table includes the data we ultimately want to index and search with the vector database. In this cell, we create the catalog, schema, and source table from which we will create the vector database.
In [ ]:
# Set up the schema and source table
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{DB}")
spark.sql(
    f"""CREATE TABLE IF NOT EXISTS {SOURCE_TABLE_FULLNAME} (
        id STRING,
        text STRING,
        date DATE,
        title STRING
    ) USING delta TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
    """
)
Next, we set up the vector database. There are three key steps:

1. Instantiate the Vector Search client
2. Create a Vector Search endpoint
3. Create a Delta Sync index that uses the bge-large-en embeddings model from the Foundation Model API

In [ ]:
from databricks.vector_search.client import VectorSearchClient

vsc = VectorSearchClient()
The cell below will check if the endpoint already exists and create it if it does not.
In [ ]:
VS_ENDPOINT_NAME = 'vs_endpoint'

endpoints = vsc.list_endpoints().get('endpoints')
if endpoints is None or VS_ENDPOINT_NAME not in [endpoint.get('name') for endpoint in endpoints]:
    print(f"Creating new Vector Search endpoint named {VS_ENDPOINT_NAME}")
    vsc.create_endpoint(VS_ENDPOINT_NAME)
else:
    print(f"Endpoint {VS_ENDPOINT_NAME} already exists.")

vsc.wait_for_endpoint(VS_ENDPOINT_NAME, 600)
Now we can create the index over the Delta table we created earlier.
In [ ]:
VS_INDEX_NAME = 'fm_api_examples_vs_index'
VS_INDEX_FULLNAME = f"{CATALOG}.{DB}.{VS_INDEX_NAME}"

if VS_INDEX_FULLNAME not in [index.get("name") for index in vsc.list_indexes(VS_ENDPOINT_NAME).get('vector_indexes', [])]:
    try:
        # Set up an index with managed embeddings
        print("Creating Vector Index...")
        i = vsc.create_delta_sync_index_and_wait(
            endpoint_name=VS_ENDPOINT_NAME,
            index_name=VS_INDEX_FULLNAME,
            source_table_name=SOURCE_TABLE_FULLNAME,
            pipeline_type="TRIGGERED",
            primary_key="id",
            embedding_source_column="text",
            embedding_model_endpoint_name="databricks-bge-large-en"
        )
    except Exception as e:
        if "INTERNAL_ERROR" in str(e):
            # Check whether the index was created despite the error
            if VS_INDEX_FULLNAME in [index.get("name") for index in vsc.list_indexes(VS_ENDPOINT_NAME).get('vector_indexes', [])]:
                print(f"Index {VS_INDEX_FULLNAME} has been created.")
            else:
                raise e
        else:
            raise e
else:
    print(f"Index {VS_INDEX_FULLNAME} already exists.")
We used pipeline_type="TRIGGERED". This requires us to use the index's sync() method to manually sync the source Delta table with the index. We could, alternatively, use pipeline_type="CONTINUOUS", which automatically keeps the index in sync with the source table with only seconds of latency. This approach is more costly, though, because a compute cluster must be provisioned for the continuous sync streaming pipeline.

We specified embedding_model_endpoint_name="databricks-bge-large-en". We can use any embedding model available via Model Serving; this is the name of the pay-per-token Foundation Model API version of bge-large-en. By passing an embedding_source_column and an embedding_model_endpoint_name, we configure the index to automatically use the model to generate embeddings for the texts in the text column of the source table; we do not need to generate embeddings manually.

If, however, we did want to manage embeddings manually, we could include the following arguments instead:

embedding_vector_column="<embedding_column>",
embedding_dimension=<embedding_dimension>

In the latter approach, we include a column for embeddings in the source Delta table, and embeddings are not computed automatically from the text column.
Now we set up some example texts to index.
In [ ]:
# Some example texts
from datetime import datetime
smarter_overview = {"text":"""
S.M.A.R.T.E.R. Initiative: Strategic Management for Achieving Results through Efficiency and Resources
Introduction
The S.M.A.R.T.E.R. Initiative, standing for "Strategic Management for Achieving Results through Efficiency and Resources," is a groundbreaking project aimed at revolutionizing the way our organization operates. In today's rapidly changing business landscape, achieving success demands a strategic approach that leverages resources effectively while optimizing efficiency. The S.M.A.R.T.E.R. Initiative is designed to do just that.
Background
As markets evolve and competition intensifies, organizations must adapt to stay relevant and profitable. Traditional methods of operation often become inefficient and costly. The S.M.A.R.T.E.R. Initiative was conceived as a response to this challenge, with the primary goal of enhancing strategic management practices to achieve better results.
Objectives
1. Resource Optimization
One of the key objectives of the S.M.A.R.T.E.R. Initiative is to optimize resource allocation. This involves identifying underutilized resources, streamlining processes, and reallocating resources to areas that contribute most to our strategic goals.
2. Efficiency Improvement
Efficiency is at the core of the S.M.A.R.T.E.R. Initiative. By identifying bottlenecks and improving processes, we aim to reduce operational costs, shorten project timelines, and enhance overall productivity.
3. Strategic Alignment
For any organization to succeed, its activities must be aligned with its strategic objectives. The S.M.A.R.T.E.R. Initiative will ensure that every action and resource allocation is in sync with our long-term strategic goals.
4. Results-driven Approach
The ultimate measure of success is results. The S.M.A.R.T.E.R. Initiative will foster a results-driven culture within our organization, where decisions and actions are guided by their impact on our bottom line and strategic objectives.
Key Components
The S.M.A.R.T.E.R. Initiative comprises several key components:
1. Data Analytics and Insights
Data is the foundation of informed decision-making. We will invest in advanced data analytics tools to gain insights into our operations, customer behavior, and market trends. These insights will guide our resource allocation and strategy.
2. Process Automation
Automation will play a vital role in enhancing efficiency. Routine and repetitive tasks will be automated, freeing up our workforce to focus on more strategic activities.
3. Performance Metrics and KPIs
To ensure that our efforts are aligned with our objectives, we will establish a comprehensive set of Key Performance Indicators (KPIs). Regular monitoring and reporting will provide visibility into our progress.
4. Training and Development
Enhancing our workforce's skills is essential. We will invest in training and development programs to equip our employees with the knowledge and tools needed to excel in their roles.
Implementation Timeline
The S.M.A.R.T.E.R. Initiative will be implemented in phases over the next three years. This phased approach allows for a smooth transition and ensures that each component is integrated effectively into our operations.
Conclusion
The S.M.A.R.T.E.R. Initiative represents a significant step forward for our organization. By strategically managing our resources and optimizing efficiency, we are positioning ourselves for sustained success in a competitive marketplace. This initiative is a testament to our commitment to excellence and our dedication to achieving exceptional results.
As we embark on this journey, we look forward to the transformative impact that the S.M.A.R.T.E.R. Initiative will have on our organization and the benefits it will bring to our employees, customers, and stakeholders.
""", "title": "Project Kickoff", "date": datetime.strptime("2024-01-16", "%Y-%m-%d")}
smarter_kpis = {"text": """S.M.A.R.T.E.R. Initiative: Key Performance Indicators (KPIs)
Introduction
The S.M.A.R.T.E.R. Initiative (Strategic Management for Achieving Results through Efficiency and Resources) is designed to drive excellence within our organization. To measure the success and effectiveness of this initiative, we have established three concrete and measurable Key Performance Indicators (KPIs). This document outlines these KPIs and their associated targets.
Key Performance Indicators (KPIs)
1. Resource Utilization Efficiency (RUE)
Objective: To optimize resource utilization for cost-efficiency.
KPI Definition: RUE will be calculated as (Actual Resource Utilization / Planned Resource Utilization) * 100%.
Target: Achieve a 15% increase in RUE within the first year.
2. Time-to-Decision Reduction (TDR)
Objective: To streamline operational processes and reduce decision-making time.
KPI Definition: TDR will be calculated as (Pre-Initiative Decision Time - Post-Initiative Decision Time) / Pre-Initiative Decision Time.
Target: Achieve a 20% reduction in TDR for critical business decisions.
3. Strategic Goals Achievement (SGA)
Objective: To ensure that organizational activities align with strategic goals.
KPI Definition: SGA will measure the percentage of predefined strategic objectives achieved.
Target: Achieve an 80% Strategic Goals Achievement rate within two years.
Conclusion
These three KPIs, Resource Utilization Efficiency (RUE), Time-to-Decision Reduction (TDR), and Strategic Goals Achievement (SGA), will serve as crucial metrics for evaluating the success of the S.M.A.R.T.E.R. Initiative. By tracking these KPIs and working towards their targets, we aim to drive efficiency, optimize resource utilization, and align our actions with our strategic objectives. This focus on measurable outcomes will guide our efforts towards achieving excellence within our organization.""",
"title": "Project KPIs", "date": datetime.strptime("2024-01-16", "%Y-%m-%d")}
Typically, when using a vector database for retrieval-augmented generation (RAG) tasks, we break the texts apart into smaller (and sometimes overlapping) chunks to return focused and relevant information without returning an excessive amount of text.
In the code below, we break the sample texts above into shorter, overlapping text chunks.
In [ ]:
import re

def chunk_text(text, chunk_size, overlap):
    words = text.split()
    chunks = []
    index = 0

    while index < len(words):
        end = index + chunk_size
        # Extend the chunk until it ends on a sentence boundary
        while end < len(words) and not re.match(r'.*[.!?]\s*$', words[end]):
            end += 1
        chunk = ' '.join(words[index:end + 1])
        chunks.append(chunk)
        # Step forward by chunk_size minus the overlap
        index += chunk_size - overlap

    return chunks
chunks = []
documents = [smarter_overview, smarter_kpis]

for document in documents:
    for i, c in enumerate(chunk_text(document["text"], 150, 25)):
        chunk = {}
        chunk["text"] = c
        chunk["title"] = document["title"]
        chunk["date"] = document["date"]
        chunk["id"] = document["title"] + "_" + str(i)
        chunks.append(chunk)
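To see how the chunker behaves, here is a small standalone demo. The function is repeated from the cell above so the snippet runs on its own, and the tiny example text and small chunk sizes are just for illustration:

```python
import re

# Repeated from the cell above so this demo is self-contained
def chunk_text(text, chunk_size, overlap):
    words = text.split()
    chunks = []
    index = 0
    while index < len(words):
        end = index + chunk_size
        # Extend the chunk until it ends on a sentence boundary
        while end < len(words) and not re.match(r'.*[.!?]\s*$', words[end]):
            end += 1
        chunks.append(' '.join(words[index:end + 1]))
        index += chunk_size - overlap
    return chunks

text = "one two three. four five six. seven eight nine."
demo_chunks = chunk_text(text, chunk_size=4, overlap=1)
for c in demo_chunks:
    print(c)
# Each chunk runs to the end of a sentence, and consecutive chunks
# overlap: "four five six." appears in both of the first two chunks.
```

With chunk_size=4 and overlap=1 this yields three chunks, each extended to a sentence boundary, with shared words at the seams.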
Now we save the chunks, along with some metadata (a document title, date, and a unique id), to the source Delta table.
In [ ]:
from pyspark.sql.types import StructType, StructField, StringType, DateType

schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("text", StringType(), True),
        StructField("title", StringType(), True),
        StructField("date", DateType(), True),
    ]
)

if chunks:
    result_df = spark.createDataFrame(chunks, schema=schema)
    result_df.write.format("delta").mode("append").saveAsTable(
        SOURCE_TABLE_FULLNAME
    )
Because we specified pipeline_type="TRIGGERED" when configuring the index, we still need to manually tell the index to sync with the source Delta table. This will take a few minutes.

Note that the sync will not work if the index is not ready yet. We use the wait_until_ready method to wait until the index is ready.
In [ ]:
# Wait for the index to be ready, then sync
index = vsc.get_index(endpoint_name=VS_ENDPOINT_NAME, index_name=VS_INDEX_FULLNAME)
index.wait_until_ready()
index.sync()
Now that we have added our text chunks to the source Delta table and synced it with the Vector Search index, we're ready to query the index! We do this with the index.similarity_search() method.

The columns argument takes a list of the columns we want returned; in this case, we request the text and title columns.
NOTE: If the cell below does not return any results, wait a couple of minutes and try again. The index may still be syncing.
In [ ]:
# Query the index
index.similarity_search(
    columns=["text", "title"],
    query_text="What is the TDR Target for the SMARTER initiative?",
    num_results=3
)
First, let's see what kind of reply the LLM gives without any context from the vector database.
In [ ]:
from databricks_genai_inference import ChatSession

chat = ChatSession(
    model="databricks-meta-llama-3-70b-instruct",
    system_message="You are a helpful assistant.",
    max_tokens=128
)

chat.reply("What is the TDR Target for the SMARTER initiative?")
chat.last
Now let's see what kind of reply we get when we provide context from Vector Search.
In [ ]:
# Reset history
chat = ChatSession(
    model="databricks-meta-llama-3-70b-instruct",
    system_message="You are a helpful assistant. Answer the user's question based on the provided context.",
    max_tokens=128
)

# Get context from Vector Search
raw_context = index.similarity_search(
    columns=["text", "title"],
    query_text="What is the TDR Target for the SMARTER initiative?",
    num_results=3
)

context_string = "Context:\n\n"

for i, doc in enumerate(raw_context.get('result').get('data_array')):
    context_string += f"Retrieved context {i+1}:\n"
    context_string += doc[0]
    context_string += "\n\n"

chat.reply(f"User question: What is the TDR Target for the SMARTER initiative?\n\nContext: {context_string}")
chat.last
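To make the structure of the similarity_search response clearer, the snippet below assembles the same context string from a mocked response. The retrieved texts and scores here are made up for illustration; in the notebook, raw_context comes from index.similarity_search, whose result rows contain the requested columns followed by a relevance score.

```python
# Mocked response shaped like a Vector Search similarity_search result:
# each row of data_array holds the requested columns (text, title) plus a score.
raw_context = {
    "result": {
        "data_array": [
            ["Target: Achieve a 20% reduction in TDR for critical business decisions.", "Project KPIs", 0.61],
            ["KPI Definition: TDR will be calculated as ...", "Project KPIs", 0.58],
        ]
    }
}

context_string = "Context:\n\n"
for i, doc in enumerate(raw_context["result"]["data_array"]):
    context_string += f"Retrieved context {i+1}:\n"
    context_string += doc[0]  # the 'text' column is the first requested column
    context_string += "\n\n"

print(context_string)
```

This is why the loop above indexes doc[0]: each result row is a plain list ordered by the columns we requested, not a dict keyed by column name.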
The model is now able to answer the question based on the provided context.
Most of the Vector Database management steps above can be done via the UI: you can create an endpoint, create an index, sync the index, and more via the UI in the Databricks Catalog Explorer.
The Databricks AI Playground provides a GUI for quickly experimenting with LLMs available via the FMAPI, enabling you to compare the outputs of those models and determine which model best serves your needs.