Introduction

Natural Language Processing (NLP) has become a pivotal tool in healthcare, customer service, financial services, and more. NLP is especially useful for parsing vast volumes of unstructured documents. Traditionally, applying NLP has required labor-intensive data cleansing and feature engineering. That landscape is shifting dramatically with the advent of Large Language Models (LLMs). What sets these models apart is their ability to extract metadata directly from unstructured documents, such as medical records, through prompt engineering, bypassing the conventional feature extraction process.

When working with proprietary data, it's important to note that public APIs may not be a suitable solution due to the sensitive nature of the information. Proprietary data often contains Personally Identifiable Information (PII) or Protected Health Information (PHI), which requires strict privacy and security controls. Using public APIs could potentially expose this sensitive information to unauthorized parties, compromising confidentiality and putting individuals at risk. 

Databricks Lakehouse provides a secure and scalable platform for processing proprietary data (e.g. medical records) using GPU clusters. In this blog, a scalable approach is presented for feature extraction from documents using state-of-the-art open LLMs via Ray on Databricks. Notebooks with the examples in this blog can be downloaded here.

Understanding Ray on Databricks 

Ray is an open-source framework that simplifies the process of building and scaling distributed applications. At its core, Ray is designed to handle large-scale, high-performance computing tasks with ease. It provides a simple, flexible API for parallel and distributed computing, making it an ideal choice for applications that require significant computational power and scalability.

[Figure: Ray on Databricks architecture (ray-databricks.png)]

Ray is included in the Machine Learning Runtime (MLR) starting from version 15.0. On older MLR versions, Ray can be installed as a Python library. Ray runs seamlessly on Databricks clusters, enabling users to build secure and scalable ML products on Databricks Lakehouse. In the diagram above, a Ray dataset loads data from tables in Databricks Lakehouse. The dataset is processed in parallel by actors running on the worker nodes of a Databricks cluster. The details of running the actors are defined in the map_batches method. The extracted features can be written out to a Lakehouse table, ready to be used by downstream ML or reporting applications.
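If an older MLR version must be used, a notebook-scoped install is one option; a minimal sketch:

# Install Ray as a notebook-scoped library (MLR < 15.0)
%pip install -U "ray[default]"
# Restart Python so the newly installed library is picked up
dbutils.library.restartPython()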

Implementation

This blog post presents a text summarization example, demonstrating the applicability of LLMs in natural language processing tasks. The summarization task is scaled up using Ray on Databricks. However, the approach outlined is not limited to text summarization. The approach enables seamless adaptation to various feature extraction use cases, highlighting the potential of LLMs in extracting valuable insights from unstructured data across diverse applications.

Set up a Ray cluster

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

GPUS_PER_NODE = 4
NUM_OF_WORKER_NODES = 8

setup_ray_cluster(
  num_cpus_worker_node=1,
  num_gpus_worker_node=GPUS_PER_NODE,
  max_worker_nodes=NUM_OF_WORKER_NODES,
  num_cpus_head_node=1,
  collect_log_to_path="/dbfs/tmp/raylogs",
)
A Databricks GPU cluster with eight worker nodes is created, using Standard_NC64as_T4_v3 (four NVIDIA T4 GPUs per node) as the worker type. In the code example above, the Ray cluster is configured with 8 worker nodes and 4 GPUs on each worker. Ray recommends setting spark.task.resource.gpu.amount to 0 so that Spark jobs do not reserve GPU resources, leaving the maximum number of GPUs available to Ray-on-Spark workloads. To run large models (e.g. DBRX or 70B models), it is recommended to change the default Hugging Face cache location to local_disk0, which has more space for downloading model artifacts.
HF_HOME=/local_disk0/hf
HF_DATASETS_CACHE=/local_disk0/hf
TRANSFORMERS_CACHE=/local_disk0/hf
These values can be configured under Cluster Configuration -> Advanced options -> Spark -> Environment variables.
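For reference, the Spark setting mentioned above goes in the Spark config box of the same pane; a minimal sketch of the entry:

spark.task.resource.gpu.amount 0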

Run LLM Inference 

A TextSummarizer actor class is implemented to utilize an LLM for summarizing proprietary text such as medical records, financial reports, etc. Actors are instantiated on worker nodes based on the resource requirements of each actor and the resources available on the worker nodes. In the Ray cluster created in this example, there are 4 GPUs per worker node, and an actor will utilize all 4 GPUs to load a DBRX model. Therefore, one actor will be created on each worker node.

from typing import Dict
import time

import numpy as np
import ray
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


class TextSummarizer:

    def __init__(self, checkpoint="meta-llama/Meta-Llama-3-8B-Instruct", verbose=False):
        # Initialize the tokenizer and other members on each worker;
        # the model itself is loaded lazily in __call__ (see below)
        print("Initialize the tokenizer and model on each worker")
        self.checkpoint = checkpoint
        self.access_token = "your hf token retrieved from secret scope"
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.checkpoint, trust_remote_code=True, token=self.access_token
        )
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        gpu_ids = ray.get_gpu_ids()
        print(f"allocated gpu ids: {gpu_ids}")
        self.verbose = verbose

    def _create_model(self):
        # Download and load the model only once, on first use
        if self.model:
            return
        self.model = AutoModelForCausalLM.from_pretrained(
            self.checkpoint,
            torch_dtype=torch.float16,
            trust_remote_code=True,
            device_map="auto",
            token=self.access_token,
        )
        self.model.eval()

    ...

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        self._create_model()
        summaries = []
        durs = []
        for note in list(batch["reporttext"]):
            start_time = time.time()
            # Alternatives: self.process_note(note), self.process_note_refine(note)
            pred = self.process_note_no_chunking(note)
            if self.verbose:
                print(f"### Final summary: {pred}")
            summaries.append(pred)
            end_time = time.time()
            durs.append(end_time - start_time)
        batch["summarized_text"] = summaries
        batch["dur"] = durs
        return batch

The actor class downloads and instantiates the tokenizer and model from Hugging Face. The tokenizer and other member variables are initialized in __init__(). However, the model itself is created in __call__() just before the data batches are processed, because downloading a large model (e.g. a 70B model) would otherwise time out the actor creation.

When you call map_batches on a Ray dataset, Ray serializes the data batches using Apache Arrow and transfers them to the worker nodes for processing. The worker nodes deserialize the data batches and pass them to the __call__ method of the actor class in the format Dict[str, np.ndarray]. If a pandas DataFrame is preferred, batch_format can be set to "pandas" in the map_batches method, and the batches will be passed to __call__ as DataFrames.
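A minimal sketch of the pandas variant, reusing ds, worker_num, and num_gpus_per_actor as introduced in the map_batches example later in this post:

import pandas as pd

# Request pandas batches instead of the default Dict[str, np.ndarray]
summarized_ds = ds.map_batches(
    TextSummarizer,
    batch_format="pandas",
    concurrency=worker_num,
    num_gpus=num_gpus_per_actor,
)

# The actor then receives and returns DataFrames:
# def __call__(self, batch: pd.DataFrame) -> pd.DataFrame: ...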

Summarization Approaches

The context window of a Large Language Model (LLM) refers to the amount of text (number of tokens) the model can consider at any given time when making predictions or generating text. Given this limitation, several approaches can be taken to summarize long documents.

  • Stuff: Involves stuffing as much text as possible into the context window of the model to maximize the amount of information processed in one go. It typically involves clever use of the text by prioritizing the most informative parts or condensing the text through techniques like removing less important information or rephrasing to make it more concise. This can be followed by processing the condensed text to create a summary. The Stuff method aims to leverage the limited context window effectively by focusing on the most crucial parts of the text.

  • MapReduce: Involves dividing the long text into smaller chunks that fit within the model's context window. Each chunk is processed (or "mapped") independently to generate a summary or a meaningful representation of that section. These intermediate summaries are then combined (or "reduced") to produce a final summary of the entire text. This method is useful for parallel processing and handling very large documents efficiently.

  • Refine: Focuses on iteratively improving the summary of a text. Initially, a rough summary is created for the entire text, possibly by first summarizing smaller sections and combining those summaries. This initial summary is then used as a new, shorter input for the model to refine and improve in subsequent iterations. By refining the summary iteratively, the model can improve the accuracy and coherence of the final output. This approach is beneficial when dealing with complex texts that require a deeper understanding and more nuanced summarization.
The three methods above have been implemented in the TextSummarizer actor class; an illustrative sketch of the MapReduce approach is shown below. Further improvements can be made to the chunking and prompt engineering based on specific requirements. With Databricks MLR, you can also install open source libraries such as LangChain or LlamaIndex and implement these summarization methods with them.
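For illustration, here is a minimal sketch of two methods that could live inside the TextSummarizer class for the MapReduce approach. The method names, prompts, and naive character-based chunking are hypothetical, not the exact implementation in the attached notebooks:

def _generate(self, prompt: str, max_new_tokens: int = 256) -> str:
    # Hypothetical helper: tokenize the prompt, generate, and decode only the completion
    inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
    with torch.no_grad():
        outputs = self.model.generate(**inputs, max_new_tokens=max_new_tokens)
    return self.tokenizer.decode(
        outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True
    )

def process_note_mapreduce(self, note: str, chunk_size: int = 8000) -> str:
    # Map: split the note into chunks that fit the context window
    # and summarize each chunk independently
    chunks = [note[i:i + chunk_size] for i in range(0, len(note), chunk_size)]
    partial = [
        self._generate(f"Summarize the following medical note:\n{chunk}")
        for chunk in chunks
    ]
    # Reduce: combine the intermediate summaries into a final summary
    combined = "\n".join(partial)
    return self._generate(
        f"Combine these partial summaries into one concise summary:\n{combined}"
    )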

Create Ray Dataset

Ray datasets can be created from Parquet files or tables in Unity Catalog. The attached notebooks contain code examples for creating Parquet files from the current version of a Delta Lake table. A Ray dataset can read the Parquet files and call map_batches() to distribute and process the data on worker nodes.
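A minimal sketch of reading such files into a Ray dataset (the path is illustrative):

import ray

# Read the exported Parquet files into a distributed Ray dataset
ds = ray.data.read_parquet("/dbfs/tmp/medical_notes_parquet")
print(f"### Number of records: {ds.count()}")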

The map_batches method applies a specified function or a callable class to each batch of records in the dataset. This approach leverages Ray's ability to distribute computation across multiple nodes or processes, making it highly efficient for large datasets. Unlike a typical map function that processes each item individually, map_batches handles a batch of items at once, which can be more efficient in terms of both computation and I/O operations. The transformed batches are then combined to form the output dataset.

# Compute the number of actors from the GPUs actually acquired by the Ray cluster
ray_res = ray.cluster_resources()
num_gpus_per_actor = 4
worker_num = int(ray_res["GPU"] / num_gpus_per_actor)
print(f"### The number of workers: {worker_num}")

summarized_ds = ds.map_batches(
    TextSummarizer,
    concurrency=worker_num,                 # one actor per worker node
    num_gpus=num_gpus_per_actor,            # GPUs reserved for each actor
    batch_size=(ds.count() // worker_num),  # split the dataset evenly across actors
)
summarized_pdf = summarized_ds.to_pandas()

The code snippet above first computes the number of actors the Ray cluster can actually support. In this example, there is one actor per worker node, so worker_num equals the number of actors. It is safer to compute the number of actors at runtime because the cluster may not always acquire the hard-coded number of workers; if that happens, Ray will keep waiting for enough resources and the actors will never run.

The batch_size parameter controls the number of records assigned to an actor. In the code example, the whole dataset is evenly divided among the actors. Adjusting the batch size allows users to balance memory usage against performance, as sketched below.
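For instance, a smaller fixed batch size (the value is illustrative) trades some scheduling overhead for lower per-actor memory pressure:

# Process the dataset in smaller batches to reduce per-actor memory usage
summarized_ds = ds.map_batches(
    TextSummarizer,
    concurrency=worker_num,
    num_gpus=num_gpus_per_actor,
    batch_size=64,
)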

Watch out for warning messages like: Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0, 'GPU': 4.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster. If this appears, review the cluster resources configured in setup_ray_cluster and check whether the actor resources specified in map_batches fit within them.
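Finally, the summarized output can be written back to the Lakehouse for downstream applications; a minimal sketch (the table name is illustrative):

# Convert the pandas result to a Spark DataFrame and save it as a Delta table
spark_df = spark.createDataFrame(summarized_pdf)
spark_df.write.mode("overwrite").saveAsTable("main.default.summarized_notes")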

Discussion

This blog post presents a scalable, secure and efficient approach to feature extraction from documents leveraging Large Language Models (LLMs). By harnessing the power of Ray on a Databricks cluster, feature extraction tasks are distributed and executed in parallel, ensuring high-performance processing. Documents are stored in the Databricks Lakehouse, providing a centralized and managed data repository, and are loaded into a Ray dataset for processing. The extracted features can then be written to Delta tables, enabling seamless integration with downstream reporting and machine learning applications. This approach demonstrates a streamlined and scalable solution for feature extraction, empowering data scientists and engineers to extract valuable insights from large document collections. Notebooks can be downloaded here.