All kinds of PO (policy optimization) methods are too difficult! This post will be updated later, after the simple reinforcement learning post is updated.
Introduction
Offline RL: The model learns purely from a pre-collected set of prompt-response(-reward) tuples. It is like a big predefined lookup table of actions and rewards; no fresh responses are generated during training.
It is more like a detective: it has to reason from evidence that already exists instead of gathering new evidence.
Because the data is fixed, the observed actions and rewards are finite and static.
The ultimate goal of offline RL is to learn an optimized policy from this static historical data.
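As a toy illustration (the data and helper logic below are entirely made up): if the logged table is all we have, one simple offline strategy is to keep the best-rewarded response for each prompt and distill it into a supervised target.

# Toy offline setup: a fixed table of (prompt, response, reward) tuples.
logged_data = {
    "How do I reverse a list in Python?": [
        ("Use lst[::-1] or list.reverse().", 1.0),
        ("You cannot reverse a list in Python.", 0.0),
    ],
}

# Reduce offline RL to filtered SFT: keep only the best-rewarded response per prompt.
sft_pairs = [
    (prompt, max(candidates, key=lambda c: c[1])[0])
    for prompt, candidates in logged_data.items()
]
print(sft_pairs)

Real offline methods (e.g., DPO on preference pairs) use the low-reward responses more cleverly, but the key point is the same: the data never changes during training.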
Online RL:
We interact with the environment!
The model learns by generating new responses in real time and updating its weights based on the rewards those responses receive.
Online RL methods
Prompts → Current Model → (Prompts, Answers) → (Prompts, Answers, Rewards) (given a fixed reward function)
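A minimal sketch of that loop (the `policy`, `reward_fn`, and `update` helpers are hypothetical placeholders, not a specific algorithm):

def online_rl_step(policy, prompts, reward_fn, update):
    # Prompts -> (Prompts, Answers): sample fresh responses from the current model
    answers = [policy.generate(p) for p in prompts]
    # (Prompts, Answers) -> (Prompts, Answers, Rewards): score with the fixed reward function
    rewards = [reward_fn(p, a) for p, a in zip(prompts, answers)]
    # Update the model weights using the freshly collected batch
    update(policy, prompts, answers, rewards)
    return answers, rewards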
import os
import json
import torch
import pandas as pd
import warnings
import argparse

from tqdm import tqdm
from accelerate import Accelerator
from datasets import load_dataset, Dataset
from transformers import TrainingArguments, AutoTokenizer, AutoModelForCausalLM
from trl import SFTTrainer, SFTConfig

SYSTEM_PROMPT = "You are a software engineer good at solving all kinds of problems."
USER_PROMPT = (
    "The Task you need to solve is \n\n\n ============= TASK ============= \n"
    "{task}\n =======================\n\n"
    "Please keep your response to approximately {num} words."
)
warnings.filterwarnings("ignore")
# accelerator = Accelerator()
def generate_responses(
    model,
    tokenizer,
    user_message=None,
    system_message=None,
    max_new_tokens=3000,
    full_message=None,
):
    # Format chat using tokenizer's chat template
    if full_message:
        messages = full_message
    else:
        messages = []
        if system_message:
            messages.append({"role": "system", "content": system_message})
        messages.append({"role": "user", "content": user_message})
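The snippet cuts off before the generation itself. A plausible completion of the function, using the standard chat-template + generate + decode pattern (the exact sampling settings are my assumption, not necessarily the original ones):

    prompt = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id,
        )
    # Keep only the newly generated tokens, not the echoed prompt
    new_tokens = outputs[0][inputs["input_ids"].shape[1]:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)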
def load_model_and_tokenizer(model_name, use_gpu=False, gpu_device="cuda"):
    # Load base model and tokenizer
    global tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(
        model_name, dtype=torch.bfloat16, device_map="auto"
    )
    return model, tokenizer
def evaluate(model, tokenizer, output_path):
    # feat: support resumable evaluation
    if not os.path.exists(output_path):
        with open(output_path, "w") as file:
            # create new output file
            pass

    with open(output_path, "r") as file:
        current_lines = sum(1 for line in file)

    print(f"Loading from {current_lines}")
    total_lines = len(test_dataset)
    with open(output_path, "a", encoding="utf-8") as file:
        # generate answers to the test problems with the model before tuning
        for index, test_data in tqdm(
            enumerate(test_dataset),
            total=total_lines,
            colour="CYAN",
        ):
            if index < current_lines:
                # skip rows already written to the output file (resumable)
                continue

            answer = dict()
            # get query data
            query_id = test_data["query-id"]
            query_row = query_data[query_data["_id"] == query_id]
            query_text = query_row["text"].iloc[0]

            # get corpus data
            corpus_id = test_data["corpus-id"]
            corpus_row = corpus_data[corpus_data["_id"] == corpus_id]
            corpus_answer = str(corpus_row["text"].iloc[0])
# SFTTrainer config
sft_config = SFTConfig(
    learning_rate=8e-5,  # Learning rate for training.
    num_train_epochs=1,  # Number of epochs to train the model.
    per_device_train_batch_size=1,  # Batch size for each device (e.g., GPU) during training.
    gradient_accumulation_steps=8,  # Number of steps to accumulate gradients before a backward/update pass.
    gradient_checkpointing=False,  # Reduce memory usage during training at the cost of slower training speed.
    logging_steps=2,  # Log training progress every 2 steps.
)
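The SFT_train helper called in the main block below is not shown; a minimal sketch of what it could look like with this config (the output path is hypothetical; `train_dataset_op` is the prepared dataset from the script):

def SFT_train(train_dataset_op):
    # Wire the config above into TRL's SFTTrainer and run training
    sft_trainer = SFTTrainer(
        model=model,
        args=sft_config,
        train_dataset=train_dataset_op,
        processing_class=tokenizer,
    )
    sft_trainer.train()
    sft_trainer.save_model("./models/sft-output")  # hypothetical output path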
if args.eva is True:
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
    print("device count:", torch.cuda.device_count())
    print("Start Evaluating")
    print(f"Evaluation of model: {model_path} before post-training")
    test_before_post_training()

if args.tune is True:
    os.environ["CUDA_VISIBLE_DEVICES"] = "4,5,6,7"
    print("device count:", torch.cuda.device_count())
    print("Start finetuning using SFT")
    # todo: add more tuning methods in the future
    SFT_train(train_dataset_op=train_dataset_op)
import os
import json
import torch
import pandas as pd
import warnings
import argparse

from tqdm import tqdm
from accelerate import Accelerator
from datasets import load_dataset, Dataset
from transformers import TrainingArguments, AutoTokenizer, AutoModelForCausalLM
from trl import DPOTrainer, DPOConfig
def load_model_and_tokenizer(model_name, use_gpu=False, gpu_device="cuda"):
    # Load base model and tokenizer
    global tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(
        model_name, dtype=torch.bfloat16, device_map="auto"
    )
    return model, tokenizer
def build_dpo_chatml(example):
    msgs = example["conversations"]
    prompt = next(m["value"] for m in reversed(msgs) if m["from"] == "human")
    try:
        # treat the response the current model generates as the rejected response
        rejected_resp = generate_responses(model, tokenizer, prompt)
    except Exception as e:
        rejected_resp = "Error: failed to generate response."
        print(f"Generation error for prompt: {prompt}\n{e}")
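The function is truncated here; a plausible ending, assuming the dataset's preferred answer sits in the last "gpt" turn of `conversations` (the field names follow the snippet above, the rest is an assumption):

    # Pair the dataset's answer (chosen) with the model's own answer (rejected)
    chosen_resp = next(m["value"] for m in reversed(msgs) if m["from"] == "gpt")
    return {
        "prompt": [{"role": "user", "content": prompt}],
        "chosen": [{"role": "assistant", "content": chosen_resp}],
        "rejected": [{"role": "assistant", "content": rejected_resp}],
    }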
# set up trainer
dpo_trainer = DPOTrainer(
    model=model,
    ref_model=None,
    args=config,
    processing_class=tokenizer,
    train_dataset=dpo_ds,
)
dpo_trainer.train()
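The `config` passed to DPOTrainer above is not shown in the snippet; a plausible definition (all hyperparameters and the output path are illustrative):

config = DPOConfig(
    output_dir="./models/dpo-output",  # hypothetical output path
    beta=0.1,                          # strength of the preference (KL) penalty
    per_device_train_batch_size=1,
    gradient_accumulation_steps=8,
    learning_rate=5e-6,
    num_train_epochs=1,
    logging_steps=2,
)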
# evaluate after post-training
questions = [
    "What is your name?",
    "Are you ChatGPT?",
    "Tell me about your name and organization.",
    "9.11 and 9.9, which number is bigger?",
]
def gen_dpo_dataset():
    # for this demo, we will use an identity dataset to adjust the model's behavior
    raw_ds = load_dataset("./data/mrfakename/identity", split="train")
    print(len(raw_ds))
    dpo_ds = raw_ds.map(build_dpo_chatml, remove_columns=raw_ds.column_names)
    print(f"Loaded data successfully. Length: {len(dpo_ds)}")
    return dpo_ds
if __name__ == "__main__":
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"
    # parse arguments
    parser = argparse.ArgumentParser(
        description="arguments for DPO training and evaluation"
    )
    parser.add_argument("--eval", action="store_true")
    parser.add_argument("--tune", action="store_true")
    args = parser.parse_args()

    print("Demo for the DPO tuning process.")
    model_path = "./models/Qwen/Qwen2.5-7B"
    print(f"Using default model: {model_path}")

    # load model and datasets
    print("Loading datasets")
    global model
    model, tokenizer = load_model_and_tokenizer(model_name=model_path, use_gpu=True)
import torch
import re
import os
import pandas as pd
from tqdm import tqdm
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments
from trl import GRPOTrainer, GRPOConfig
def generate_responses(
    model,
    tokenizer,
    user_message=None,
    system_message=None,
    max_new_tokens=300,
    full_message=None,
):
    # Format chat using tokenizer's chat template
    if full_message:
        messages = full_message
    else:
        messages = []
        if system_message:
            messages.append({"role": "system", "content": system_message})
        messages.append({"role": "user", "content": user_message})
# Load base model and tokenizer (device_map belongs on the model, not the tokenizer)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
def display_dataset(dataset):
    # Visualize the first few rows of the dataset
    rows = []
    for i in range(3):
        example = dataset[i]
        user_msg = next(
            m["content"] for m in example["messages"] if m["role"] == "user"
        )
        assistant_msg = next(
            m["content"] for m in example["messages"] if m["role"] == "assistant"
        )
        rows.append({"User Prompt": user_msg, "Assistant Response": assistant_msg})

    # Display as table
    df = pd.DataFrame(rows)
    print(df)
def post_process_dataset(example: dict) -> dict:
    """
    Extracts the final numeric answer and formats the prompt for the model.

    Args:
        example (dict): A single example from the dataset.

    Returns:
        dict: The processed example with 'ground_truth' and 'prompt' keys.
    """
    match = re.search(r"####\s*(-?\d+)", example["answer"])
    example["ground_truth"] = match.group(1) if match else None
    SYSTEM_PROMPT = (
        "You are a helpful assistant that solves problems step-by-step. "
        "Always include the final numeric answer inside \\boxed{}."
    )
    example["prompt"] = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": example["question"]},
    ]
    return example
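A quick example of applying this mapping to GSM8K (assuming the local path mirrors the openai/gsm8k layout with its "main" config):

raw_train = load_dataset("./data/openai/gsm8k", "main", split="train")
train_dataset = raw_train.map(post_process_dataset)
print(train_dataset[0]["ground_truth"])          # e.g. "72" for the first GSM8K problem
print(train_dataset[0]["prompt"][0]["content"])  # the system prompt added above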
def reward_func(completions, ground_truth, **kwargs):
    """
    Calculates the reward for model completions based on a ground truth.

    Args:
        completions (list): A list of model completions, each a list of dictionaries.
        ground_truth (list): A list of true answers.

    Returns:
        list: A list of rewards (1.0 for correct, 0.0 for incorrect).
    """
    # Regular expression to capture content inside \boxed{}
    matches = [
        re.search(r"\\boxed\{(.*?)\}", completion[0]["content"])
        for completion in completions
    ]
    contents = [match.group(1) if match else "" for match in matches]
    # Reward 1 if the content is the same as the ground truth, 0 otherwise
    return [1.0 if c == gt else 0.0 for c, gt in zip(contents, ground_truth)]
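A quick sanity check of the reward function on toy completions (made-up strings, just to show the expected shapes):

demo_completions = [
    [{"role": "assistant", "content": "Step by step... so the answer is \\boxed{42}."}],
    [{"role": "assistant", "content": "I think the result is 42."}],  # no \boxed{} -> no reward
]
print(reward_func(demo_completions, ground_truth=["42", "42"]))  # [1.0, 0.0]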
def evaluate_model(model, tokenizer, eval_dataset: torch.utils.data.Dataset):
    """
    Evaluates a model's performance on a given dataset using the reward function.

    Args:
        model: The model to evaluate.
        tokenizer: The tokenizer for the model.
        eval_dataset (Dataset): The evaluation dataset.
    """
    all_preds = []
    all_labels = []

    print("Starting evaluation...")
    for example in tqdm(eval_dataset, desc="Evaluating"):
        input_prompt = example["prompt"]
        ground_truth = example["ground_truth"]
        with torch.no_grad():
            response = generate_responses(model, tokenizer, full_message=input_prompt)
        all_preds.append([{"role": "assistant", "content": response}])
        all_labels.append(ground_truth)
def main():
    """
    Main function to orchestrate the GRPO training and evaluation process.
    """
    USE_GPU = torch.cuda.is_available()
    DATASET_PATH = "./data/openai/gsm8k"
    TRAIN_MODEL_NAME = "HuggingFaceTB/SmolLM2-135M-Instruct"
    EVAL_MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct"
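main() is cut off after these constants; a sketch of how it could continue, wiring the pieces above into TRL's GRPOTrainer (the hyperparameters and output path are illustrative, not the original values):

    # Load GSM8K and add 'prompt' / 'ground_truth' columns
    train_ds = load_dataset(DATASET_PATH, "main", split="train").map(post_process_dataset)

    grpo_config = GRPOConfig(
        output_dir="./models/grpo-output",  # hypothetical output path
        per_device_train_batch_size=4,
        num_generations=4,                  # group size: completions sampled per prompt
        max_completion_length=300,
        learning_rate=5e-6,
        logging_steps=2,
    )
    trainer = GRPOTrainer(
        model=TRAIN_MODEL_NAME,
        args=grpo_config,
        reward_funcs=reward_func,
        train_dataset=train_ds,
    )
    trainer.train()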