# Copyright 2020-2025 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import tempfile import unittest from unittest.mock import patch import torch from datasets import load_dataset from parameterized import parameterized from transformers import AutoModelForCausalLM, AutoModelForSequenceClassification, AutoTokenizer from transformers.testing_utils import require_peft from transformers.utils import is_peft_available from trl import GRPOConfig, GRPOTrainer from trl.trainer.grpo_trainer import RepeatSampler, shuffle_tensor_dict, split_tensor_dict from .testing_utils import require_vllm if is_peft_available(): from peft import LoraConfig, PeftModel class SplitTensorDictTester(unittest.TestCase): def test_split_equal_chunks(self): x = torch.arange(12).reshape(6, 2) y = torch.arange(6).reshape(6, 1) tensor_dict = {"x": x, "y": y} result = split_tensor_dict(tensor_dict, 3) expected_x_chunks = torch.chunk(x, 3, dim=0) expected_y_chunks = torch.chunk(y, 3, dim=0) self.assertEqual(len(result), 3) for i in range(3): self.assertTrue(torch.equal(result[i]["x"], expected_x_chunks[i])) self.assertTrue(torch.equal(result[i]["y"], expected_y_chunks[i])) def test_with_none_tensor(self): x = torch.arange(12).reshape(6, 2) tensor_dict = {"x": x, "y": None} result = split_tensor_dict(tensor_dict, 2) expected_x_chunks = torch.chunk(x, 2, dim=0) self.assertEqual(len(result), 2) for i in range(2): self.assertTrue(torch.equal(result[i]["x"], expected_x_chunks[i])) self.assertIsNone(result[i]["y"]) class ShuffleTensorDictTester(unittest.TestCase): def test_shuffle_preserves_shape(self): x = torch.arange(6).reshape(3, 2) y = torch.arange(3).reshape(3, 1) tensor_dict = {"x": x.clone(), "y": y.clone()} shuffled = shuffle_tensor_dict(tensor_dict) self.assertEqual(shuffled["x"].shape, x.shape) self.assertEqual(shuffled["y"].shape, y.shape) def test_shuffle_consistent_across_tensors(self): # Use known patterns to check alignment x = torch.tensor([[10, 11], [20, 21], [30, 31]]) y = torch.tensor([[1], [2], [3]]) tensor_dict = {"x": x.clone(), "y": y.clone()} shuffled = shuffle_tensor_dict(tensor_dict) # Build a reverse map from shuffled x rows to y values for i in range(3): x_row = shuffled["x"][i] y_val = shuffled["y"][i].item() if torch.equal(x_row, torch.tensor([10, 11])): self.assertEqual(y_val, 1) elif torch.equal(x_row, torch.tensor([20, 21])): self.assertEqual(y_val, 2) elif torch.equal(x_row, torch.tensor([30, 31])): self.assertEqual(y_val, 3) else: self.fail("Unexpected x row in shuffled output.") def test_none_tensor_remains_none(self): x = torch.arange(6).reshape(3, 2) tensor_dict = {"x": x.clone(), "y": None} shuffled = shuffle_tensor_dict(tensor_dict) self.assertIsNone(shuffled["y"]) self.assertEqual(shuffled["x"].shape, x.shape) class RepeatRandomSamplerTester(unittest.TestCase): def test_sampler(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=2) # Should output something like [4, 4, 3, 3, 0, 0, 1, 1, 2, 2, 6, 6, 5, 5] sampled = list(sampler) # Check that the length is doubled assert len(sampled) == 2 * len(dataset) # Check that all indexes are present assert set(sampled) == set(range(len(dataset))) # Check that each element is repeated twice assert all(sampled[i] == sampled[i + 1] for i in range(0, len(sampled), 2)) def test_sampler_no_shuffle(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=2, shuffle=False) sampled = list(sampler) expected = [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6] self.assertEqual(sampled, expected) def test_sampler_no_repeat(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=1) # Should output something like [4, 3, 0, 1, 2, 6, 5] sampled = list(sampler) # Check that the length is the same assert len(sampled) == len(dataset) # Check that all indexes are present assert set(sampled) == set(range(len(dataset))) def test_sampler_with_batch_size(self): dataset = ["a", "b", "c", "d", "e", "f", "g", "h"] sampler = RepeatSampler(dataset, mini_repeat_count=1, batch_size=2, repeat_count=2) # Should output something like [4, 3, 4, 3, 0, 1, 0, 1, 2, 6, 2, 6, 5, 7, 5, 7] sampled = list(sampler) # Check that the length is doubled assert len(sampled) == 2 * len(dataset) # Check that all indexes are present assert set(sampled) == set(range(len(dataset))) # Check that each element is repeated as expected assert all(sampled[i : i + 1] == sampled[i + 2 : i + 3] for i in range(0, len(sampled), 4)) def test_sampler_with_batch_size_and_drop(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=1, batch_size=2, repeat_count=2) # Should output something like [4, 3, 4, 3, 0, 1, 0, 1, 2, 6, 2, 6] sampled = list(sampler) # Check that the length is doubled assert len(sampled) == 2 * ( len(dataset) - 1 ) # one element is dropped, because it's not enough to form a batch # Check that the sampled indexes are a subset of the dataset indexes assert set(sampled).issubset(set(range(len(dataset)))) # Check that each element is repeated as expected assert all(sampled[i : i + 1] == sampled[i + 2 : i + 3] for i in range(0, len(sampled), 4)) def test_sampler_with_mini_repeat_count_and_batch_size_1(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=2, batch_size=3, repeat_count=2) # Should output something like [4, 4, 3, 3, 0, 0, 4, 4, 3, 3, 0, 0, # 1, 1, 2, 2, 6, 6, 1, 1, 2, 2, 6, 6] sampled = list(sampler) # Check that the length is quadrupled assert len(sampled) == 4 * (len(dataset) - 1) # 1 element is dropped, because it's not enough to form a batch # Check that the sampled indexes are a subset of the dataset indexes assert set(sampled).issubset(set(range(len(dataset)))) # Check that each element is repeated as expected assert all(sampled[i] == sampled[i + 1] for i in range(0, len(sampled), 2)) # Check that the batch is repeated as expected assert sampled[0:6] == sampled[6:12] assert sampled[12:18] == sampled[18:24] def test_sampler_with_mini_repeat_count_and_batch_size_2(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=3, batch_size=2, repeat_count=2) # Should output something like [4, 4, 4, 3, 3, 3, 4, 4, 4, 3, 3, 3, # 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, # 2, 2, 2, 6, 6, 6, 2, 2, 2, 6, 6, 6] sampled = list(sampler) # Check that the length is sextupled assert len(sampled) == 6 * (len(dataset) - 1) # 1 element is dropped, because it's not enough to form a batch # Check that the sampled indexes are a subset of the dataset indexes assert set(sampled).issubset(set(range(len(dataset)))) # Check that each element is repeated as expected assert all(sampled[i] == sampled[i + 1] == sampled[i + 2] for i in range(0, len(sampled), 3)) # Check that the batch is repeated as expected assert sampled[0:6] == sampled[6:12] assert sampled[12:18] == sampled[18:24] assert sampled[24:30] == sampled[30:36] def test_sampler_with_mini_repeat_count_and_batch_size_3(self): dataset = ["a", "b", "c", "d", "e", "f", "g"] sampler = RepeatSampler(dataset, mini_repeat_count=2, batch_size=2, repeat_count=3) # Should output something like [4, 4, 3, 3, 4, 4, 3, 3, 4, 4, 3, 3, # 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, # 2, 2, 6, 6, 2, 2, 6, 6, 2, 2, 6, 6] sampled = list(sampler) # Check that the length is sextupled assert len(sampled) == 6 * (len(dataset) - 1) # 1 element is dropped, because it's not enough to form a batch # Check that the sampled indexes are a subset of the dataset indexes assert set(sampled).issubset(set(range(len(dataset)))) # Check that each element is repeated as expected assert all(sampled[i] == sampled[i + 1] for i in range(0, len(sampled), 2)) # Check that the batch is repeated as expected assert sampled[0:4] == sampled[4:8] == sampled[8:12] assert sampled[12:16] == sampled[16:20] == sampled[20:24] assert sampled[24:28] == sampled[28:32] == sampled[32:36] class GRPOTrainerTester(unittest.TestCase): def test_init_minimal(self): # Test that GRPOTrainer can be instantiated with only model, reward_model and train_dataset dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", train_dataset=dataset, ) @parameterized.expand([("standard_prompt_only",), ("conversational_prompt_only",)]) def test_training(self, config_name): dataset = load_dataset("trl-internal-testing/zen", config_name, split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") @parameterized.expand([("bnpo",), ("dr_grpo",)]) def test_training_loss_types(self, loss_type): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=32, # reduce the completion length to reduce memory usage loss_type=loss_type, report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_with_eval(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, per_device_train_batch_size=3, # reduce the batch size to reduce memory usage per_device_eval_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage eval_strategy="steps", eval_steps=2, report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset["train"], eval_dataset=dataset["test"], ) trainer.train() def test_training_multiple_iterations(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage num_iterations=2, report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") @require_peft def test_training_peft(self): model = AutoModelForCausalLM.from_pretrained("trl-internal-testing/tiny-Qwen2ForCausalLM-2.5") base_param_names = [f"base_model.model.{n}" for n, _ in model.named_parameters()] dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model=model, reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, peft_config=LoraConfig(), ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the peft params have changed and the base model params have not changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) if n in base_param_names: # We expect the base model params to be the same self.assertTrue(torch.allclose(param, new_param), f"Parameter {n} has changed.") elif "base_layer" not in n: # We expect the peft params to be different (except for the base layer) self.assertFalse(torch.allclose(param, new_param), f"Parameter {n} has not changed.") @require_peft def test_training_peft_with_gradient_checkpointing(self): """Test that training works with PEFT and gradient checkpointing enabled.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") model = AutoModelForCausalLM.from_pretrained( "trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", torch_dtype=torch.float32, # Use float32 for testing to avoid precision issues use_cache=False, # Required for gradient checkpointing ) lora_config = LoraConfig( r=8, lora_alpha=32, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none" ) with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, per_device_train_batch_size=3, num_generations=3, max_completion_length=8, gradient_checkpointing=True, # Enable gradient checkpointing report_to="none", ) trainer = GRPOTrainer( model=model, reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, peft_config=lora_config, ) # Verify gradient checkpointing is enabled self.assertIsInstance(trainer.model, PeftModel) # Store initial parameters to check which ones change previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that only LoRA parameters have changed, base model parameters remain unchanged for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) if "lora" in n.lower(): # LoRA parameters should change self.assertFalse(torch.equal(param, new_param), f"LoRA parameter {n} has not changed.") else: # Base model parameters should not change self.assertTrue(torch.equal(param, new_param), f"Base parameter {n} has changed.") def test_training_different_reward_model(self): # Use a reward model different from the model: different chat template, tokenization, etc. dataset = load_dataset("trl-internal-testing/zen", "conversational_prompt_only", split="train") reward_model_id = "trl-internal-testing/tiny-LlamaForSequenceClassification-3.2" reward_model = AutoModelForSequenceClassification.from_pretrained(reward_model_id) reward_tokenizer = AutoTokenizer.from_pretrained(reward_model_id) # By default, the trainer uses the eos token as the padding token. However, for Llama models, the eos token # appears in the chat template. Using it as a pad token disrupts the reward calculation, as the calculation # considers the score of the last token before the first pad token. To ensure correct reward calculations, # we use a separate pad token instead. reward_tokenizer.pad_token = "<|finetune_right_pad_id|>" with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=reward_model, args=training_args, train_dataset=dataset, reward_processing_classes=reward_tokenizer, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_reward_func_standard(self): # Test if trainer can handle reward function with standard format dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") def reward_func(completions, **kwargs): """Reward function that rewards longer completions.""" return [float(len(completion)) for completion in completions] with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=reward_func, args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_reward_func_conversational(self): # Test if trainer can handle reward function with conversational format dataset = load_dataset("trl-internal-testing/zen", "conversational_prompt_only", split="train") def reward_func(completions, **kwargs): """Reward function that gives higher scores to longer completion content.""" completion_contents = [completion[0]["content"] for completion in completions] return [float(len(content)) for content in completion_contents] with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=reward_func, args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_multiple_reward_funcs(self): # Test that GRPOTrainer can be instantiated with multiple reward functions dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") def reward_func1(completions, **kwargs): """Reward function that rewards longer completions.""" return [float(len(completion)) for completion in completions] def reward_func2(completions, **kwargs): """Reward function that rewards completions with more unique letters.""" return [float(len(set(completion))) for completion in completions] with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=[reward_func1, reward_func2], args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_multiple_reward_funcs_with_None_output(self): """Test that a valid math reward function is processed correctly while the code reward function returns None.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") def applicable_reward_func(completions, **kwargs): """A reward function that rewards longer completions.""" return [float(len(completion)) for completion in completions] def non_applicable_reward_func(completions, **kwargs): """A reward function that returns None for all inputs, as it is not applicable to this sample.""" return [None] * len(completions) with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, per_device_train_batch_size=3, num_generations=3, max_completion_length=8, report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=[ applicable_reward_func, non_applicable_reward_func, ], # One applicable, one non applicable args=training_args, train_dataset=dataset, ) previous_trainable_params = { n: param.clone() for n, param in trainer.model.named_parameters() if param.requires_grad } trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_multiple_reward_funcs_with_weights(self): """Test that GRPOTrainer can handle multiple reward functions with weights.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") def reward_func1(completions, **kwargs): """Reward function that rewards longer completions.""" return [float(len(completion)) for completion in completions] def reward_func2(completions, **kwargs): """Reward function that rewards completions with more unique letters.""" return [float(len(set(completion))) for completion in completions] with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", reward_weights=[0.7, 0.3], # weight of reward_func1 and reward_func2 respectively ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=[reward_func1, reward_func2], args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() # Check that training logs contain both reward metrics self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) self.assertIn("rewards/reward_func1/mean", trainer.state.log_history[-1]) self.assertIn("rewards/reward_func1/std", trainer.state.log_history[-1]) self.assertIn("rewards/reward_func2/mean", trainer.state.log_history[-1]) self.assertIn("rewards/reward_func2/std", trainer.state.log_history[-1]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_multiple_mixed_reward_funcs(self): # Test if the trainer can handle a mix of reward functions and reward models dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") def reward_func(completions, **kwargs): """Reward function that rewards longer completions.""" return [float(len(completion)) for completion in completions] with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=[reward_func, "trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5"], args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_reward_func_additional_column(self): # Test if trainer can handle reward function that rely on additional columns in the dataset dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") # Add a column to the dataset (dummy example, the column could be anything) some_values = list(range(len(dataset))) dataset = dataset.add_column("some_values", some_values) def reward_func(completions, some_values, **kwargs): """Reward function that rewards completions with lengths closer to the values in some_values.""" return [float(abs(len(completion) - value)) for completion, value in zip(completions, some_values)] with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs=reward_func, args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") @require_vllm @unittest.skip("We should add a mock for the vLLM server.") def test_training_vllm(self): """Test that training works with vLLM for generation.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", use_vllm=True, ) trainer = GRPOTrainer( model="Qwen/Qwen2.5-0.5B-Instruct", # tiny is too small for vLLM reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_with_sync_ref_model(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage sync_ref_model=True, ref_model_sync_steps=2, # reduce sync steps to ensure a sync happens report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_beta_non_zero(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, beta=0.1, # set beta to non-zero value to test the case where the reference model is used learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") @unittest.skip("We should add a mock for the vLLM server.") @require_peft @require_vllm def test_training_vllm_and_peft(self): """Test that training works with vLLM for generation.""" model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B-Instruct") # tiny model is too small for vLLM base_param_names = [f"base_model.model.{n}" for n, _ in model.named_parameters()] dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", use_vllm=True, ) lora_config = LoraConfig( target_modules="all-linear", # test with non-default modules as it add extra keys in state_dict tht we need to handle modules_to_save=["embed_tokens", "lm_head"], ) trainer = GRPOTrainer( model=model, reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, peft_config=lora_config, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the peft params have changed and the base model params have not changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) if n in base_param_names: # We expect the base model params to be the same self.assertTrue(torch.allclose(param, new_param), f"Parameter {n} has changed.") elif "base_layer" not in n and "original_module" not in n: # We expect the peft params to be different (except for the base layer) self.assertFalse(torch.allclose(param, new_param), f"Parameter {n} has not changed.") @require_vllm @unittest.skip("We should add a mock for the vLLM server.") def test_training_vllm_guided_decoding(self): """Test that training works with vLLM for generation with guided decoding.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", use_vllm=True, vllm_guided_decoding_regex=r"\n.*\n\n\n.*\n", ) trainer = GRPOTrainer( model="Qwen/Qwen2.5-0.5B-Instruct", # tiny model is too small for vLLM reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_with_additional_generation_kwargs(self): """Test that training works with additional generation kwargs.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", top_p=0.9, top_k=10, min_p=0.01, repetition_penalty=1.1, ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") @require_vllm @unittest.skip("We should add a mock for the vLLM server.") def test_training_vllm_with_additional_generation_kwargs(self): """Test that training works with vLLM and additional generation kwargs.""" dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage report_to="none", use_vllm=True, top_p=0.9, top_k=10, min_p=0.01, repetition_penalty=1.1, ) trainer = GRPOTrainer( model="Qwen/Qwen2.5-0.5B-Instruct", # tiny model is too small for vLLM reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_no_scale_rewards(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage scale_rewards=False, report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") @patch("transformers.generation.utils.GenerationMixin.generate") def test_training_with_mask_truncated_completions(self, mock_generate): """Test that training works with mask_truncated_completions=True parameter.""" # We mock the generate method because the model's random weights make it extremely unlikely to produce a # sequence containing the EOS token within the allowed max_completion_length. As a result, all tokens are # masked in the loss, the model doesn't update, and the final check (which verifies the update) fails. def fake_generate(prompt_ids, **kwargs): # pad_token_id = 151643; eos_token_id = 151645 completions_ids = torch.tensor( [ [1, 2, 3, 4, 5, 6, 7, 8], # this one is truncated [9, 10, 11, 151645, 151643, 151643, 151643, 151643], # this one contains eos [12, 13, 14, 15, 16, 17, 18, 151645], # particular case, eos is generated just within the limit ], device=prompt_ids.device, ) return torch.cat([prompt_ids, completions_ids], dim=1) mock_generate.side_effect = fake_generate dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage mask_truncated_completions=True, # Enable masking of truncated completions report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_with_mask_truncated_completions_all_masked(self): """ Test that when all generated completions are truncated (i.e., none contain an EOS token), and mask_truncated_completions=True, the model receives no effective learning signal and therefore does not update its parameters. Here, we don't mock the generate method, be we rely on the fact that the model the probability of generating the EOS token is extremely low, so all generated completions are truncated. """ dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage mask_truncated_completions=True, # Enable masking of truncated completions report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertTrue(torch.equal(param, new_param), f"Parameter {n} has changed.") def test_training_num_generations_larger_than_batch_size(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage num_generations=6, # the number of generations is larger than the batch size, but gradient_accumulation_steps=2, # gradient accumulation should allow that report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_delta_clipping(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage delta=2.0, # set delta to a non-None value report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.") def test_training_multiple_dataloader_workers(self): dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train") with tempfile.TemporaryDirectory() as tmp_dir: training_args = GRPOConfig( output_dir=tmp_dir, learning_rate=0.1, # increase the learning rate to speed up the test per_device_train_batch_size=3, # reduce the batch size to reduce memory usage num_generations=3, # reduce the number of generations to reduce memory usage max_completion_length=8, # reduce the completion length to reduce memory usage dataloader_num_workers=2, # use multiple dataloader workers report_to="none", ) trainer = GRPOTrainer( model="trl-internal-testing/tiny-Qwen2ForCausalLM-2.5", reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5", args=training_args, train_dataset=dataset, ) previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()} trainer.train() self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"]) # Check that the params have changed for n, param in previous_trainable_params.items(): new_param = trainer.model.get_parameter(n) self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")