
Problem with Ray Train and a Databricks notebook (strange dbutils error)

JavierS
New Contributor

Hi everyone,

I'm running some code to train a multimodal Hugging Face model with SFTTrainer, wrapped in Ray's TorchTrainer so that training uses all GPU workers. When I execute trainer.fit() it fails with a dbutils serialization error, even though I am not using dbutils anywhere in my code, and when I try to restart the Ray cluster I get the same dbutils error:

Exception: You cannot use dbutils within a spark job
File <command-1633567752829769>, line 1
----> 1 trainer.fit()
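
In case it is useful for anyone looking at this: Ray serializes the train loop with cloudpickle before shipping it to the workers, so pickling the function directly on the driver should reproduce the same capture error (a minimal sketch; check_capture is just a made-up helper name):

import cloudpickle

def check_capture(fn):
    # Pickle the train loop the same way Ray would; any driver-only object
    # the function closes over (dbutils, SparkSession, ...) fails here too.
    try:
        cloudpickle.dumps(fn)
        print("train loop pickles cleanly")
    except Exception as exc:
        print(f"pickling failed: {exc}")

# e.g. check_capture(train_fn)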

 

My code is the following:

import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES

num_cpu_cores_per_worker = 8  # total CPUs available on each worker node
num_gpu_per_worker = 1  # total GPUs available on each worker node
resource_per_worker_int = (num_cpu_cores_per_worker / num_gpu_per_worker) - 2  # currently unused
use_gpu = True
ray_log_dir = "/local_disk0/ray_logs"

try:
  shutdown_ray_cluster()
except Exception:
  print("No running Ray cluster to shut down")

# Start the ray cluster and follow the output link to open the Ray Dashboard - a vital observability tool for understanding your infrastructure and application.
setup_ray_cluster(
  num_worker_nodes=MAX_NUM_WORKER_NODES,
  num_cpus_per_node=num_cpu_cores_per_worker,
  num_gpus_per_node=num_gpu_per_worker,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path=ray_log_dir
)

ray.init(ignore_reinit_error=True)
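# (setup_ray_cluster() exports RAY_ADDRESS, so the ray.init() call above
# should attach to the cluster it just created rather than starting a new
# local one.)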

import torch
from accelerate import Accelerator
from datasets import load_dataset

from transformers import AutoModelForVision2Seq, AutoProcessor, LlavaForConditionalGeneration, BitsAndBytesConfig
from trl import (
    ModelConfig,
    SFTConfig,
    SFTTrainer
)
from peft import LoraConfig

import ray.train
from ray.train import ScalingConfig, RunConfig, CheckpointConfig
from ray.train.torch import TorchTrainer
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback

def train_fn(config):  # receives train_loop_config from the TorchTrainer below
    ##########################
    # Load model and processor
    ##########################

    # BitsAndBytesConfig int-4 config
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16
    )
    model_id = "meta-llama/Llama-3.2-11B-Vision-Instruct"
    processor = AutoProcessor.from_pretrained(model_id)
    model = AutoModelForVision2Seq.from_pretrained(model_id, torch_dtype=torch.bfloat16, quantization_config=bnb_config)

    #######################################################
    # Create a data collator to encode text and image pairs
    #######################################################
    def collate_fn(examples):
        # Get the texts and images, and apply the chat template
        texts = [processor.apply_chat_template(example["messages"], tokenize=False) for example in examples]
        images = [example["images"] for example in examples]

        # Tokenize the texts and process the images
        batch = processor(text=texts, images=images, return_tensors="pt", padding=True)

        # The labels are the input_ids, and we mask the padding tokens in the loss computation
        labels = batch["input_ids"].clone()
        labels[labels == processor.tokenizer.pad_token_id] = -100
        # Ignore the image token index in the loss computation (model specific)
        image_token_id = processor.tokenizer.convert_tokens_to_ids(processor.image_token)
        labels[labels == image_token_id] = -100
        batch["labels"] = labels

        return batch

    ##############
    # Load dataset
    ##############
    dataset = load_dataset("HuggingFaceH4/llava-instruct-mix-vsft", split="train[:1000]")
    dataset = dataset.train_test_split(test_size=0.2)
    # SFTTrainer expects Hugging Face datasets, so the splits are passed
    # directly instead of being wrapped with ray.data.from_huggingface.
    dataset_train = dataset["train"]
    dataset_val = dataset["test"]

    ###################
    # Configure trainer
    ###################


    # LoRA config based on QLoRA paper & Sebastian Raschka experiment
    peft_config = LoraConfig(
            lora_alpha=16,
            lora_dropout=0.05,
            r=8,
            bias="none",
            target_modules=["q_proj", "v_proj"],
            task_type="CAUSAL_LM", 
    )
    training_args = SFTConfig(
        output_dir="my-awesome-llama",
        gradient_checkpointing=True,
        # hyperparameters arrive through train_loop_config
        per_device_train_batch_size=config["per_device_train_batch_size"],
        per_device_eval_batch_size=config["per_device_eval_batch_size"],
        gradient_accumulation_steps=config["gradient_accumulation_steps"],
        learning_rate=config["learning_rate"],
        max_steps=config["max_steps"],
        save_steps=config["save_steps"],
        logging_steps=config["logging_steps"],
        bf16=True,
        remove_unused_columns=False,
        dataset_kwargs={"skip_prepare_dataset": True},  # important for the custom collator
    )

    trainer = SFTTrainer(
        model=model,
        args=training_args,
        data_collator=collate_fn,
        train_dataset=dataset_train,
        eval_dataset=dataset_val,
        peft_config=peft_config,
        tokenizer=processor.tokenizer,
    )

    # Train: report metrics/checkpoints back to Ray Train, then let Ray
    # prepare the transformers Trainer for distributed execution.
    trainer.add_callback(RayTrainReportCallback())
    trainer = prepare_trainer(trainer)
    trainer.train()



if __name__ == "__main__":


    # Hyperparameters forwarded to each worker's train_fn as train_loop_config
    train_loop_config = {
        "per_device_train_batch_size": 1,
        "per_device_eval_batch_size": 1,
        "gradient_accumulation_steps": 4,
        "learning_rate": 2e-4,
        "max_steps": 100,
        "save_steps": 10,
        "logging_steps": 10,
    }


    scaling_config = ScalingConfig(num_workers=4, use_gpu=True)
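    # (With num_workers=4 and use_gpu=True, Ray Train waits until four 1-GPU
    # worker slots are available in the cluster; if fewer GPUs exist, fit()
    # blocks on pending resources.)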

    run_config = RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=5,
            checkpoint_score_attribute="loss",
            checkpoint_score_order="min",
        ),
        # NOTE: with multiple nodes, Ray Train expects storage_path to be on
        # storage every node can reach; /local_disk0 is node-local.
        storage_path="/local_disk0/train_logs/",
        name="RAY_TEST_ON_LLAMA_VISUAL",
    )
    trainer = TorchTrainer(
        train_loop_per_worker=train_fn,
        train_loop_config=train_loop_config,
        run_config=run_config,
        scaling_config=scaling_config
    )


    # train
    result = trainer.fit()
    print(f"Training result: {result}")

 

 
