Natural Language Processing (NLP) has become a pivotal tool in healthcare, customer service, financial services, and more. NLP is especially useful for parsing the vast volumes of unstructured documents these industries produce. Traditionally, applying NLP has been a labor-intensive process of data cleansing and feature engineering. The landscape is shifting dramatically with the advent of Large Language Models (LLMs). What sets these models apart is their ability to extract metadata directly from unstructured medical records through prompt engineering, bypassing conventional feature extraction pipelines.
When working with proprietary data, it's important to note that public APIs may not be a suitable solution due to the sensitive nature of the information. Proprietary data often contains Personally Identifiable Information (PII) or Protected Health Information (PHI), which requires strict privacy and security controls. Using public APIs could potentially expose this sensitive information to unauthorized parties, compromising confidentiality and putting individuals at risk.
Databricks Lakehouse provides a secure and scalable platform for processing proprietary data (e.g. medical records) using GPU clusters. In this blog, a scalable approach is presented for feature extraction from documents using state-of-the-art open LLMs via Ray on Databricks. Notebooks with examples in this blog can be downloaded here.
Ray is an open-source framework that simplifies the process of building and scaling distributed applications. At its core, Ray is designed to handle large-scale, high-performance computing tasks with ease. It provides a simple, flexible API for parallel and distributed computing, making it an ideal choice for applications that require significant computational power and scalability.
Ray is included as part of the Databricks Machine Learning Runtime (MLR) starting from version 15.0. If an older version of MLR has to be used, Ray can be installed as a Python library. Ray runs seamlessly on Databricks clusters, enabling users to build secure and scalable ML products on the Databricks Lakehouse. In the diagram above, a Ray dataset loads data from tables in the Databricks Lakehouse. The dataset is processed in parallel by actors running on the worker nodes of a Databricks cluster. The details of running the actors are defined in the map_batches method. The extracted features can be written out to a table in the Lakehouse, ready to be used by downstream ML or reporting applications.
This blog post presents a text summarization example, demonstrating the applicability of LLMs in natural language processing tasks. The summarization task is scaled up using Ray on Databricks. However, the approach outlined is not limited to text summarization. The approach enables seamless adaptation to various feature extraction use cases, highlighting the potential of LLMs in extracting valuable insights from unstructured data across diverse applications.
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
GPUS_PER_NODE = 4
NUM_OF_WORKER_NODES = 8
setup_ray_cluster(
    num_cpus_worker_node=1,
    num_gpus_worker_node=GPUS_PER_NODE,
    max_worker_nodes=NUM_OF_WORKER_NODES,
    num_cpus_head_node=1,
    collect_log_to_path="/dbfs/tmp/raylogs",
)
The following environment variables point the Hugging Face model and dataset caches to the local disk of each node, so that large model downloads do not fill up the root volume:

HF_HOME=/local_disk0/hf
HF_DATASETS_CACHE=/local_disk0/hf
TRANSFORMERS_CACHE=/local_disk0/hf
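These cache settings need to be visible to the Ray worker processes, not only to the driver. They can be set in the cluster's environment variable configuration, or passed through Ray's runtime environment when connecting to the cluster. The snippet below is a sketch of the second option, assuming the cluster was created with setup_ray_cluster as above:

import ray

# Propagate the Hugging Face cache locations to every Ray worker process via
# the runtime environment (sketch only; cluster-level env vars work as well).
ray.init(
    runtime_env={
        "env_vars": {
            "HF_HOME": "/local_disk0/hf",
            "HF_DATASETS_CACHE": "/local_disk0/hf",
            "TRANSFORMERS_CACHE": "/local_disk0/hf",
        }
    }
)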
A TextSummarizer actor class is implemented to utilize an LLM for summarizing proprietary text such as medical records, financial reports, etc. Actors are instantiated on worker nodes based upon the resource requirements of each actor and the resources available on the worker nodes. In the Ray cluster created in this example, there are 4 GPUs per worker node and each actor utilizes all 4 GPUs to load a DBRX model. Therefore, one actor is created on each worker node.
import time
from typing import Dict

import numpy as np
import ray
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


class TextSummarizer:
    def __init__(self, checkpoint="meta-llama/Meta-Llama-3-8B-Instruct", verbose=False):
        # Initialize the tokenizer and lightweight state on each worker
        print("Initialize the tokenizer on each worker")
        self.checkpoint = checkpoint
        self.access_token = "your hf token retrieved from secret scope"
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.checkpoint, trust_remote_code=True, token=self.access_token
        )
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Defer model creation until the first batch arrives (see _create_model)
        self.model = None
        gpu_ids = ray.get_gpu_ids()
        print(f"allocated gpu ids: {gpu_ids}")
        self.verbose = verbose

    def _create_model(self):
        # Lazily download the model and shard it across the actor's GPUs
        if self.model:
            return
        self.model = AutoModelForCausalLM.from_pretrained(
            self.checkpoint,
            torch_dtype=torch.float16,
            trust_remote_code=True,
            device_map="auto",
            token=self.access_token,
        )
        self.model.eval()

    ...  # helper methods such as process_note_no_chunking are elided here; see the attached notebooks

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        self._create_model()
        summaries = []
        durs = []
        for note in list(batch["reporttext"]):
            start_time = time.time()
            # Alternative strategies: self.process_note(note), self.process_note_refine(note)
            pred = self.process_note_no_chunking(note)
            if self.verbose:
                print(f"### Final summary: {pred}")
            summaries.append(pred)
            durs.append(time.time() - start_time)
        batch["summarized_text"] = summaries
        batch["dur"] = durs
        return batch
The actor class downloads and instantiates the tokenizer and model from Hugging Face. The tokenizer and other member variables are initialized in __init__(). However, the model itself is created in __call__ just before the data batches are processed, because downloading a large model (e.g., a 70B model) during actor creation would cause the creation to time out.
When you call map_batches on a Ray dataset, Ray serializes the data batches using Apache Arrow and transfers them to the worker nodes for processing. The worker nodes deserialize the data batches and pass them to the __call__ method of the actor class in the format of Dict[str, np.ndarray]. If a pandas DataFrame is preferred, batch_format can be set to "pandas" in map_batches, and the batches will then be passed to __call__ as pandas DataFrames.
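As a minimal, self-contained illustration (the function name and the tiny in-memory dataset below are only for demonstration, not part of the blog's notebooks), a pandas-based transform could look like this:

import pandas as pd
import ray

# Illustrative only: with batch_format="pandas", each batch arrives as a pandas
# DataFrame rather than a Dict[str, np.ndarray].
def add_note_length(batch: pd.DataFrame) -> pd.DataFrame:
    batch["note_length"] = batch["reporttext"].str.len()
    return batch

demo_ds = ray.data.from_items([{"reporttext": "Patient presented with chest pain."}])
demo_ds = demo_ds.map_batches(add_note_length, batch_format="pandas")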
The context window in Large Language Models (LLMs) refers to the amount of text (number of tokens) the model can consider at any given time when making predictions or generating text. Given the limitation on context, several approaches can be taken for summarizing long articles.
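For illustration, one possible strategy is to split a long note into chunks that fit the context window, summarize each chunk, and then summarize the concatenated chunk summaries. The sketch below is not the notebooks' implementation; summarize_fn stands in for whatever wraps the model's generate call, and the chunk size is arbitrary:

# A sketch of a chunk-then-combine strategy for notes longer than the context
# window. summarize_fn is a hypothetical callable wrapping the model's generate
# call; the attached notebooks use their own process_note_* variants.
def summarize_long_note(note, tokenizer, summarize_fn, max_chunk_tokens=3000):
    token_ids = tokenizer.encode(note)
    if len(token_ids) <= max_chunk_tokens:
        return summarize_fn(note)
    chunk_summaries = []
    for start in range(0, len(token_ids), max_chunk_tokens):
        chunk = tokenizer.decode(token_ids[start:start + max_chunk_tokens])
        chunk_summaries.append(summarize_fn(chunk))
    # Produce the final summary from the chunk-level summaries
    return summarize_fn(" ".join(chunk_summaries))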
Ray datasets can be created from parquet files or tables in Unity Catalog. The notebooks attached contain code examples for creating parquet files from the current version of a Delta Lake table. A Ray dataset can read from the parquet files and call map_batches() to distribute and process the dataset on worker nodes.
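As a sketch of that flow (the table name and Parquet path below are hypothetical, and a Databricks notebook with spark available is assumed), the snapshot-and-load step could look like this:

import ray

# Snapshot the current version of a Delta table to Parquet files, then load
# them into a Ray dataset. Path and table names are placeholders.
dbfs_path = "dbfs:/tmp/medical_notes_parquet"   # path Spark writes to
fuse_path = "/dbfs/tmp/medical_notes_parquet"   # same location via the DBFS FUSE mount

(
    spark.read.table("main.clinical.medical_notes")  # hypothetical Unity Catalog table
    .select("note_id", "reporttext")
    .write.mode("overwrite")
    .parquet(dbfs_path)
)

ds = ray.data.read_parquet(fuse_path)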
The map_batches method applies a specified function or callable class to each batch of records in the dataset. This approach leverages Ray's ability to distribute computation across multiple nodes or processes, making it highly efficient for large datasets. Unlike a typical map function that processes each item individually, map_batches handles a batch of items at once, which can be more efficient in terms of both computation and I/O. The transformed batches are then combined to form the output dataset.
ray_res = ray.cluster_resources()
num_gpus_per_actor = 4

# One actor per worker node: each actor claims all of its node's GPUs
worker_num = int(ray_res["GPU"] / num_gpus_per_actor)
print(f"### The number of workers: {worker_num}")

summarized_ds = ds.map_batches(
    TextSummarizer,
    concurrency=worker_num,
    num_gpus=num_gpus_per_actor,
    batch_size=(ds.count() // worker_num),
)
summarized_pdf = summarized_ds.to_pandas()
The code snippet above first computes the number of actors the Ray cluster can actually support. In this example there is one actor per worker node, so worker_num equals the number of actors. It is safer to compute this number at runtime because the cluster may not always acquire the hard-coded number of workers; if that happens, Ray will keep waiting for enough resources and the actors will never start running.
The batch_size parameter controls the number of records assigned to an actor. In the code example, the whole dataset is evenly divided by the number of actors. Adjusting the batch size allows users to balance between memory usage and performance.
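Once the summaries are collected, the resulting pandas DataFrame can be written back to a Delta table so downstream ML and reporting jobs can consume it. A sketch, with a hypothetical table name:

# Sketch only: persist the extracted features to a Delta table in Unity Catalog.
summarized_sdf = spark.createDataFrame(summarized_pdf)
summarized_sdf.write.mode("overwrite").saveAsTable("main.clinical.medical_note_summaries")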
Watch out for warning messages like: Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0, 'GPU': 4.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster. Review the cluster resources allocated by setup_ray_cluster and check whether the actor resources specified in map_batches fit within them.
This blog post presents a scalable, secure and efficient approach to feature extraction from documents leveraging Large Language Models (LLMs). By harnessing the power of Ray on a Databricks cluster, feature extraction tasks are distributed and executed in parallel, ensuring high-performance processing. Documents are stored in the Databricks Lakehouse, providing a centralized and managed data repository, and are loaded into a Ray dataset for processing. The extracted features can then be written to Delta tables, enabling seamless integration with downstream reporting and machine learning applications. This approach demonstrates a streamlined and scalable solution for feature extraction, empowering data scientists and engineers to extract valuable insights from large document collections. Notebooks can be downloaded here.