transformers/tests/trainer/test_trainer.py

3993 lines
162 KiB
Python

# coding=utf-8
# Copyright 2018 the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import gc
import json
import math
import os
import random
import re
import subprocess
import sys
import tempfile
import unittest
from functools import partial
from itertools import product
from pathlib import Path
from typing import Dict, List
from unittest.mock import Mock, patch
import numpy as np
from huggingface_hub import HfFolder, ModelCard, delete_repo, list_repo_commits, list_repo_files
from parameterized import parameterized
from requests.exceptions import HTTPError
from transformers import (
AutoTokenizer,
IntervalStrategy,
PretrainedConfig,
TrainerCallback,
TrainingArguments,
get_polynomial_decay_schedule_with_warmup,
is_torch_available,
logging,
)
from transformers.hyperparameter_search import ALL_HYPERPARAMETER_SEARCH_BACKENDS
from transformers.testing_utils import (
ENDPOINT_STAGING,
TOKEN,
USER,
CaptureLogger,
LoggingLevel,
TestCasePlus,
backend_device_count,
execute_subprocess_async,
get_gpu_count,
get_tests_dir,
is_staging_test,
require_accelerate,
require_bitsandbytes,
require_deepspeed,
require_galore_torch,
require_intel_extension_for_pytorch,
require_optuna,
require_peft,
require_ray,
require_safetensors,
require_sentencepiece,
require_sigopt,
require_tensorboard,
require_tokenizers,
require_torch,
require_torch_accelerator,
require_torch_bf16,
require_torch_gpu,
require_torch_multi_accelerator,
require_torch_non_multi_accelerator,
require_torch_non_multi_gpu,
require_torch_tensorrt_fx,
require_torch_tf32,
require_torch_up_to_2_accelerators,
require_torchdynamo,
require_wandb,
slow,
torch_device,
)
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR, HPSearchBackend, check_target_module_exists
from transformers.training_args import OptimizerNames
from transformers.utils import (
SAFE_WEIGHTS_INDEX_NAME,
SAFE_WEIGHTS_NAME,
WEIGHTS_INDEX_NAME,
WEIGHTS_NAME,
is_accelerate_available,
is_apex_available,
is_bitsandbytes_available,
is_safetensors_available,
is_torchdistx_available,
)
from transformers.utils.hp_naming import TrialShortNamer
if is_torch_available():
import torch
from torch import nn
from torch.utils.data import IterableDataset
import transformers.optimization
from transformers import (
AutoModelForCausalLM,
AutoModelForSequenceClassification,
EarlyStoppingCallback,
GlueDataset,
GlueDataTrainingArguments,
GPT2Config,
GPT2LMHeadModel,
LineByLineTextDataset,
LlamaConfig,
LlamaForCausalLM,
PreTrainedModel,
Trainer,
TrainerState,
)
from transformers.trainer_pt_utils import AcceleratorConfig
if is_safetensors_available():
import safetensors.torch
# for version specific tests in TrainerIntegrationTest
require_accelerate_version_min_0_28 = partial(require_accelerate, min_version="0.28")
GRAD_ACCUM_KWARGS_VERSION_AVAILABLE = is_accelerate_available("0.28")
PATH_SAMPLE_TEXT = f"{get_tests_dir()}/fixtures/sample_text.txt"
class RegressionDataset:
def __init__(self, a=2, b=3, length=64, seed=42, label_names=None):
np.random.seed(seed)
self.label_names = ["labels"] if label_names is None else label_names
self.length = length
self.x = np.random.normal(size=(length,)).astype(np.float32)
self.ys = [a * self.x + b + np.random.normal(scale=0.1, size=(length,)) for _ in self.label_names]
self.ys = [y.astype(np.float32) for y in self.ys]
def __len__(self):
return self.length
def __getitem__(self, i):
result = {name: y[i] for name, y in zip(self.label_names, self.ys)}
result["input_x"] = self.x[i]
return result
# Converting Bytes to Megabytes
def bytes2megabytes(x):
return int(x / 2**20)
# Copied from acclerate: https://github.com/huggingface/accelerate/blob/ee163b66fb7848892519e804688cb4ae981aacbe/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py#L40C1-L73C68
class TorchTracemalloc:
def __enter__(self):
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_max_memory_allocated() # reset the peak gauge to zero
self.begin = torch.cuda.memory_allocated()
return self
def __exit__(self, *exc):
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
self.end = torch.cuda.memory_allocated()
self.peak = torch.cuda.max_memory_allocated()
self.used = bytes2megabytes(self.end - self.begin)
self.peaked = bytes2megabytes(self.peak - self.begin)
@dataclasses.dataclass
class RegressionTrainingArguments(TrainingArguments):
a: float = 0.0
b: float = 0.0
keep_report_to: bool = False
def __post_init__(self):
super().__post_init__()
# save resources not dealing with reporting unless specified (also avoids the warning when it's not set)
# can be explicitly disabled via `keep_report_to`
if not self.keep_report_to:
self.report_to = []
class RepeatDataset:
def __init__(self, x, length=64):
self.x = x
self.length = length
def __len__(self):
return self.length
def __getitem__(self, i):
return {"input_ids": self.x, "labels": self.x}
class DynamicShapesDataset:
def __init__(self, length=64, seed=42, batch_size=8):
self.length = length
np.random.seed(seed)
sizes = np.random.randint(1, 20, (length // batch_size,))
# For easy batching, we make every batch_size consecutive samples the same size.
self.xs = [np.random.normal(size=(s,)).astype(np.float32) for s in sizes.repeat(batch_size)]
self.ys = [np.random.normal(size=(s,)).astype(np.float32) for s in sizes.repeat(batch_size)]
def __len__(self):
return self.length
def __getitem__(self, i):
return {"input_x": self.xs[i], "labels": self.ys[i]}
class AlmostAccuracy:
def __init__(self, thresh=0.25):
self.thresh = thresh
def __call__(self, eval_pred):
predictions, labels = eval_pred
true = np.abs(predictions - labels) <= self.thresh
return {"accuracy": true.astype(np.float32).mean().item()}
class RegressionModelConfig(PretrainedConfig):
def __init__(self, a=0, b=0, double_output=False, random_torch=True, **kwargs):
super().__init__(**kwargs)
self.a = a
self.b = b
self.double_output = double_output
self.random_torch = random_torch
self.hidden_size = 1
if is_torch_available():
class SampleIterableDataset(IterableDataset):
def __init__(self, a=2, b=3, length=64, seed=42, label_names=None):
self.dataset = RegressionDataset(a=a, b=b, length=length, seed=seed, label_names=label_names)
def __iter__(self):
for i in range(len(self.dataset)):
yield self.dataset[i]
class FiniteIterableDataset(SampleIterableDataset):
def __init__(self, a=2, b=3, length=64, seed=42, label_names=None):
super().__init__(a, b, length, seed, label_names)
self.current_sample = 0
def __iter__(self):
while self.current_sample < len(self.dataset):
yield self.dataset[self.current_sample]
self.current_sample += 1
class MultiLoader:
def __init__(self, loaders):
self.loaders = loaders
def __len__(self):
return sum(len(loader) for loader in self.loaders)
def __iter__(self):
for loader in self.loaders:
yield from loader
class CustomDataloaderTrainer(Trainer):
def get_train_dataloader(self):
dataloaders = [super().get_train_dataloader(), super().get_train_dataloader()]
return MultiLoader(dataloaders)
def get_eval_dataloader(self, eval_dataset):
dataloaders = [super().get_eval_dataloader(eval_dataset), super().get_eval_dataloader(eval_dataset)]
return MultiLoader(dataloaders)
class RegressionModel(nn.Module):
def __init__(self, a=0, b=0, double_output=False):
super().__init__()
self.a = nn.Parameter(torch.tensor(a).float())
self.b = nn.Parameter(torch.tensor(b).float())
self.double_output = double_output
self.config = None
def forward(self, input_x, labels=None, **kwargs):
y = input_x * self.a + self.b
if labels is None:
return (y, y) if self.double_output else (y,)
loss = nn.functional.mse_loss(y, labels)
return (loss, y, y) if self.double_output else (loss, y)
class RegressionDictModel(nn.Module):
def __init__(self, a=0, b=0):
super().__init__()
self.a = nn.Parameter(torch.tensor(a).float())
self.b = nn.Parameter(torch.tensor(b).float())
self.config = None
def forward(self, input_x, labels=None, **kwargs):
y = input_x * self.a + self.b
result = {"output": y}
if labels is not None:
result["loss"] = nn.functional.mse_loss(y, labels)
return result
class RegressionPreTrainedModel(PreTrainedModel):
config_class = RegressionModelConfig
base_model_prefix = "regression"
def __init__(self, config):
super().__init__(config)
self.a = nn.Parameter(torch.tensor(config.a).float())
self.b = nn.Parameter(torch.tensor(config.b).float())
self.double_output = config.double_output
def forward(self, input_x, labels=None, **kwargs):
y = input_x * self.a + self.b
if labels is None:
return (y, y) if self.double_output else (y,)
loss = nn.functional.mse_loss(y, labels)
return (loss, y, y) if self.double_output else (loss, y)
class RegressionPreTrainedModelWithGradientCheckpointing(PreTrainedModel):
config_class = RegressionModelConfig
base_model_prefix = "regression"
supports_gradient_checkpointing = True
def __init__(self, config):
super().__init__(config)
self.layers = nn.ModuleList([nn.Linear(config.hidden_size, config.hidden_size) for _ in range(4)])
self.head = nn.Linear(config.hidden_size, 1)
self.gradient_checkpointing = False
self.double_output = config.double_output
def forward(self, input_x, labels=None, **kwargs):
y = input_x.unsqueeze(0)
for layer in self.layers:
if self.training and self.gradient_checkpointing:
outputs = self._gradient_checkpointing_func(layer.__call__, y)
else:
outputs = layer(y)
y = outputs * 3
logits = self.head(y)
if labels is None:
return (logits, logits) if self.double_output else (logits,)
loss = nn.functional.mse_loss(logits, labels)
return (loss, y, y) if self.double_output else (loss, y)
class RegressionRandomPreTrainedModel(PreTrainedModel):
config_class = RegressionModelConfig
base_model_prefix = "regression"
def __init__(self, config):
super().__init__(config)
self.a = nn.Parameter(torch.tensor(config.a).float())
self.b = nn.Parameter(torch.tensor(config.b).float())
self.random_torch = config.random_torch
def forward(self, input_x, labels=None, **kwargs):
y = input_x * self.a + self.b
if self.random_torch:
torch_rand = torch.randn(1).squeeze()
np_rand = np.random.rand()
rand_rand = random.random()
if self.random_torch:
y += 0.05 * torch_rand
y += 0.05 * torch.tensor(np_rand + rand_rand)
if labels is None:
return (y,)
loss = nn.functional.mse_loss(y, labels)
return (loss, y)
class TstLayer(nn.Module):
def __init__(self, hidden_size):
super().__init__()
self.linear1 = nn.Linear(hidden_size, hidden_size)
self.ln1 = nn.LayerNorm(hidden_size)
self.linear2 = nn.Linear(hidden_size, hidden_size)
self.ln2 = nn.LayerNorm(hidden_size)
self.bias = nn.Parameter(torch.zeros(hidden_size))
def forward(self, x):
h = self.ln1(nn.functional.relu(self.linear1(x)))
h = nn.functional.relu(self.linear2(x))
return self.ln2(x + h + self.bias)
def get_regression_trainer(
a=0, b=0, double_output=False, train_len=64, eval_len=64, pretrained=True, keep_report_to=False, **kwargs
):
label_names = kwargs.get("label_names", None)
gradient_checkpointing = kwargs.get("gradient_checkpointing", False)
train_dataset = RegressionDataset(length=train_len, label_names=label_names)
eval_dataset = RegressionDataset(length=eval_len, label_names=label_names)
model_init = kwargs.pop("model_init", None)
if model_init is not None:
model = None
else:
if pretrained:
config = RegressionModelConfig(a=a, b=b, double_output=double_output)
# We infer the correct model class if one uses gradient_checkpointing or not
target_cls = (
RegressionPreTrainedModel
if not gradient_checkpointing
else RegressionPreTrainedModelWithGradientCheckpointing
)
model = target_cls(config)
else:
model = RegressionModel(a=a, b=b, double_output=double_output)
compute_metrics = kwargs.pop("compute_metrics", None)
data_collator = kwargs.pop("data_collator", None)
optimizers = kwargs.pop("optimizers", (None, None))
output_dir = kwargs.pop("output_dir", "./regression")
preprocess_logits_for_metrics = kwargs.pop("preprocess_logits_for_metrics", None)
args = RegressionTrainingArguments(output_dir, a=a, b=b, keep_report_to=keep_report_to, **kwargs)
return Trainer(
model,
args,
data_collator=data_collator,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
compute_metrics=compute_metrics,
optimizers=optimizers,
model_init=model_init,
preprocess_logits_for_metrics=preprocess_logits_for_metrics,
)
class TrainerIntegrationCommon:
def check_saved_checkpoints(self, output_dir, freq, total, is_pretrained=True, safe_weights=True):
weights_file = WEIGHTS_NAME if not safe_weights else SAFE_WEIGHTS_NAME
file_list = [weights_file, "training_args.bin", "optimizer.pt", "scheduler.pt", "trainer_state.json"]
if is_pretrained:
file_list.append("config.json")
for step in range(freq, total, freq):
checkpoint = os.path.join(output_dir, f"checkpoint-{step}")
self.assertTrue(os.path.isdir(checkpoint))
for filename in file_list:
self.assertTrue(os.path.isfile(os.path.join(checkpoint, filename)))
def check_best_model_has_been_loaded(
self, output_dir, freq, total, trainer, metric, greater_is_better=False, is_pretrained=True, safe_weights=True
):
checkpoint = os.path.join(output_dir, f"checkpoint-{(total // freq) * freq}")
log_history = TrainerState.load_from_json(os.path.join(checkpoint, "trainer_state.json")).log_history
values = [d[metric] for d in log_history]
best_value = max(values) if greater_is_better else min(values)
best_checkpoint = (values.index(best_value) + 1) * freq
checkpoint = os.path.join(output_dir, f"checkpoint-{best_checkpoint}")
if is_pretrained:
best_model = RegressionPreTrainedModel.from_pretrained(checkpoint)
best_model.to(trainer.args.device)
else:
best_model = RegressionModel()
if not safe_weights:
state_dict = torch.load(os.path.join(checkpoint, WEIGHTS_NAME))
else:
state_dict = safetensors.torch.load_file(os.path.join(checkpoint, SAFE_WEIGHTS_NAME))
best_model.load_state_dict(state_dict)
best_model.to(trainer.args.device)
self.assertTrue(torch.allclose(best_model.a, trainer.model.a))
self.assertTrue(torch.allclose(best_model.b, trainer.model.b))
metrics = trainer.evaluate()
self.assertEqual(metrics[metric], best_value)
def check_trainer_state_are_the_same(self, trainer_state, trainer_state1):
# We'll pop things so operate on copies.
state = trainer_state.copy()
state1 = trainer_state1.copy()
# Log history main contain different logs for the time metrics (after resuming a training).
log_history = state.pop("log_history", None)
log_history1 = state1.pop("log_history", None)
self.assertEqual(state, state1)
skip_log_keys = ["train_runtime", "train_samples_per_second", "train_steps_per_second", "train_loss"]
for log, log1 in zip(log_history, log_history1):
for key in skip_log_keys:
_ = log.pop(key, None)
_ = log1.pop(key, None)
self.assertEqual(log, log1)
def convert_to_sharded_checkpoint(self, folder, save_safe=True, load_safe=True):
# Converts a checkpoint of a regression model to a sharded checkpoint.
if load_safe:
loader = safetensors.torch.load_file
weights_file = os.path.join(folder, SAFE_WEIGHTS_NAME)
else:
loader = torch.load
weights_file = os.path.join(folder, WEIGHTS_NAME)
if save_safe:
extension = "safetensors"
saver = safetensors.torch.save_file
index_file = os.path.join(folder, SAFE_WEIGHTS_INDEX_NAME)
shard_name = SAFE_WEIGHTS_NAME
else:
extension = "bin"
saver = torch.save
index_file = os.path.join(folder, WEIGHTS_INDEX_NAME)
shard_name = WEIGHTS_NAME
state_dict = loader(weights_file)
os.remove(weights_file)
keys = list(state_dict.keys())
shard_files = [
shard_name.replace(f".{extension}", f"-{idx+1:05d}-of-{len(keys):05d}.{extension}")
for idx in range(len(keys))
]
index = {"metadata": {}, "weight_map": {key: shard_files[i] for i, key in enumerate(keys)}}
with open(index_file, "w", encoding="utf-8") as f:
content = json.dumps(index, indent=2, sort_keys=True) + "\n"
f.write(content)
for param_name, shard_file in zip(keys, shard_files):
saver({param_name: state_dict[param_name]}, os.path.join(folder, shard_file))
@require_torch
@require_sentencepiece
@require_tokenizers
class TrainerIntegrationPrerunTest(TestCasePlus, TrainerIntegrationCommon):
"""
Only tests that want to tap into the auto-pre-run 2 trainings:
- self.default_trained_model
- self.alternate_trained_model
directly, or via check_trained_model
"""
def setUp(self):
super().setUp()
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
trainer = get_regression_trainer(learning_rate=0.1)
trainer.train()
self.default_trained_model = (trainer.model.a, trainer.model.b)
trainer = get_regression_trainer(learning_rate=0.1, seed=314)
trainer.train()
self.alternate_trained_model = (trainer.model.a, trainer.model.b)
def check_trained_model(self, model, alternate_seed=False):
# Checks a training seeded with learning_rate = 0.1
(a, b) = self.alternate_trained_model if alternate_seed else self.default_trained_model
self.assertTrue(torch.allclose(model.a, a))
self.assertTrue(torch.allclose(model.b, b))
def test_reproducible_training(self):
# Checks that training worked, model trained and seed made a reproducible training.
trainer = get_regression_trainer(learning_rate=0.1)
trainer.train()
self.check_trained_model(trainer.model)
# Checks that a different seed gets different (reproducible) results.
trainer = get_regression_trainer(learning_rate=0.1, seed=314)
trainer.train()
self.check_trained_model(trainer.model, alternate_seed=True)
def test_trainer_with_datasets(self):
import datasets
np.random.seed(42)
x = np.random.normal(size=(64,)).astype(np.float32)
y = 2.0 * x + 3.0 + np.random.normal(scale=0.1, size=(64,)).astype(np.float32)
train_dataset = datasets.Dataset.from_dict({"input_x": x, "label": y})
# Base training. Should have the same results as test_reproducible_training
model = RegressionModel()
args = TrainingArguments("./regression", learning_rate=0.1)
trainer = Trainer(model, args, train_dataset=train_dataset)
trainer.train()
self.check_trained_model(trainer.model)
# Can return tensors.
train_dataset.set_format(type="torch", dtype=torch.float32)
model = RegressionModel()
trainer = Trainer(model, args, train_dataset=train_dataset)
trainer.train()
self.check_trained_model(trainer.model)
# Adding one column not used by the model should have no impact
z = np.random.normal(size=(64,)).astype(np.float32)
train_dataset = datasets.Dataset.from_dict({"input_x": x, "label": y, "extra": z})
model = RegressionModel()
trainer = Trainer(model, args, train_dataset=train_dataset)
trainer.train()
self.check_trained_model(trainer.model)
def test_model_init(self):
train_dataset = RegressionDataset()
args = TrainingArguments("./regression", learning_rate=0.1)
trainer = Trainer(args=args, train_dataset=train_dataset, model_init=lambda: RegressionModel())
trainer.train()
self.check_trained_model(trainer.model)
# Re-training should restart from scratch, thus lead the same results.
trainer.train()
self.check_trained_model(trainer.model)
# Re-training should restart from scratch, thus lead the same results and new seed should be used.
trainer.args.seed = 314
trainer.train()
self.check_trained_model(trainer.model, alternate_seed=True)
def test_gradient_accumulation(self):
# Training with half the batch size but accumulation steps as 2 should give the same results.
trainer = get_regression_trainer(
gradient_accumulation_steps=2, per_device_train_batch_size=4, learning_rate=0.1
)
trainer.train()
self.check_trained_model(trainer.model)
def test_gradient_checkpointing(self):
trainer = get_regression_trainer(
per_device_train_batch_size=1,
learning_rate=0.1,
gradient_checkpointing=True,
gradient_checkpointing_kwargs={"use_reentrant": False},
)
previous_params = {k: v.detach().clone() for k, v in trainer.model.named_parameters()}
trainer.train()
# Check if model weights have been updated
for k, v in trainer.model.named_parameters():
self.assertFalse(
torch.allclose(previous_params[k], v, rtol=1e-4, atol=1e-4),
f"Model weights for {k} have not been updated",
)
def test_training_loss(self):
n_gpus = max(1, backend_device_count(torch_device))
# With even logs
trainer = get_regression_trainer(logging_steps=64 / (8 * n_gpus))
trainer.train()
log_history = trainer.state.log_history
losses = [log["loss"] for log in log_history if "loss" in log]
train_loss = log_history[-1]["train_loss"]
self.assertAlmostEqual(sum(losses) / len(losses), train_loss, places=4)
# With uneven logs
trainer = get_regression_trainer(logging_steps=5)
trainer.train()
log_history = trainer.state.log_history
# Training loss should be the same as before
new_train_loss = log_history[-1]["train_loss"]
self.assertAlmostEqual(train_loss, new_train_loss, places=4)
def test_custom_optimizer(self):
train_dataset = RegressionDataset()
args = TrainingArguments("./regression")
model = RegressionModel()
optimizer = torch.optim.SGD(model.parameters(), lr=1.0)
lr_scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda x: 1.0)
trainer = Trainer(model, args, train_dataset=train_dataset, optimizers=(optimizer, lr_scheduler))
trainer.train()
(a, b) = self.default_trained_model
self.assertFalse(torch.allclose(trainer.model.a, a))
self.assertFalse(torch.allclose(trainer.model.b, b))
self.assertEqual(trainer.optimizer.state_dict()["param_groups"][0]["lr"], 1.0)
def test_lr_scheduler_kwargs(self):
# test scheduler kwargs passed via TrainingArguments
train_dataset = RegressionDataset()
model = RegressionModel()
num_steps, num_warmup_steps = 10, 2
extra_kwargs = {"power": 5.0, "lr_end": 1e-5} # Non-default arguments
args = TrainingArguments(
"./regression",
lr_scheduler_type="polynomial",
lr_scheduler_kwargs=extra_kwargs,
learning_rate=0.2,
warmup_steps=num_warmup_steps,
)
trainer = Trainer(model, args, train_dataset=train_dataset)
trainer.create_optimizer_and_scheduler(num_training_steps=num_steps)
# Checking that the scheduler was created
self.assertIsNotNone(trainer.lr_scheduler)
# Checking that the correct args were passed
sched1 = trainer.lr_scheduler
sched2 = get_polynomial_decay_schedule_with_warmup(
trainer.optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_steps, **extra_kwargs
)
self.assertEqual(sched1.lr_lambdas[0].args, sched2.lr_lambdas[0].args)
self.assertEqual(sched1.lr_lambdas[0].keywords, sched2.lr_lambdas[0].keywords)
def test_cosine_with_min_lr_scheduler(self):
train_dataset = RegressionDataset()
model = RegressionModel()
num_steps, num_warmup_steps = 10, 2
extra_kwargs = {"min_lr": 1e-5} # Non-default arguments
args = TrainingArguments(
"./regression",
lr_scheduler_type="cosine_with_min_lr",
lr_scheduler_kwargs=extra_kwargs,
learning_rate=0.2,
warmup_steps=num_warmup_steps,
)
trainer = Trainer(model, args, train_dataset=train_dataset)
trainer.create_optimizer_and_scheduler(num_training_steps=num_steps)
# Checking that the scheduler was created
self.assertIsNotNone(trainer.lr_scheduler)
# Check the last learning rate
for _ in range(num_steps):
trainer.lr_scheduler.step()
self.assertEqual(trainer.lr_scheduler.get_last_lr()[0], 1e-5)
def test_reduce_lr_on_plateau_args(self):
# test passed arguments for a custom ReduceLROnPlateau scheduler
train_dataset = RegressionDataset(length=64)
eval_dataset = RegressionDataset(length=64)
args = TrainingArguments(
"./regression",
eval_strategy="epoch",
metric_for_best_model="eval_loss",
)
model = RegressionModel()
optimizer = torch.optim.SGD(model.parameters(), lr=1.0)
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.2, patience=5, cooldown=2)
trainer = Trainer(
model, args, train_dataset=train_dataset, eval_dataset=eval_dataset, optimizers=(optimizer, lr_scheduler)
)
trainer.train()
self.assertIsInstance(trainer.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)
self.assertEqual(trainer.lr_scheduler.factor, 0.2)
self.assertEqual(trainer.lr_scheduler.patience, 5)
self.assertEqual(trainer.lr_scheduler.cooldown, 2)
def test_reduce_lr_on_plateau(self):
# test the ReduceLROnPlateau scheduler
class TrainerWithLRLogs(Trainer):
def log(self, logs):
# the LR is computed after metrics and does not exist for the first epoch
if hasattr(self.lr_scheduler, "_last_lr"):
logs["learning_rate"] = self.lr_scheduler._last_lr[0]
super().log(logs)
train_dataset = RegressionDataset(length=64)
eval_dataset = RegressionDataset(length=64)
args = TrainingArguments(
"./regression",
lr_scheduler_type="reduce_lr_on_plateau",
eval_strategy="epoch",
metric_for_best_model="eval_loss",
num_train_epochs=10,
learning_rate=0.2,
)
model = RegressionModel()
trainer = TrainerWithLRLogs(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
trainer.train()
self.assertIsInstance(trainer.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)
patience = trainer.lr_scheduler.patience
logs = trainer.state.log_history[1:]
best_loss = logs[0]["eval_loss"]
bad_epochs = 0
for i, log in enumerate(logs[:-1]): # Compare learning rate to next epoch's
loss = log["eval_loss"]
just_decreased = False
if loss > best_loss:
bad_epochs += 1
if bad_epochs > patience:
self.assertLess(logs[i + 1]["learning_rate"], log["learning_rate"])
just_decreased = True
bad_epochs = 0
else:
best_loss = loss
bad_epochs = 0
if not just_decreased:
self.assertEqual(logs[i + 1]["learning_rate"], log["learning_rate"])
def test_adafactor_lr_none(self):
# test the special case where lr=None, since Trainer can't not have lr_scheduler
from transformers.optimization import Adafactor, AdafactorSchedule
train_dataset = RegressionDataset()
args = TrainingArguments("./regression")
model = RegressionModel()
optimizer = Adafactor(model.parameters(), scale_parameter=True, relative_step=True, warmup_init=True, lr=None)
lr_scheduler = AdafactorSchedule(optimizer)
trainer = Trainer(model, args, train_dataset=train_dataset, optimizers=(optimizer, lr_scheduler))
trainer.train()
(a, b) = self.default_trained_model
self.assertFalse(torch.allclose(trainer.model.a, a))
self.assertFalse(torch.allclose(trainer.model.b, b))
self.assertGreater(trainer.optimizer.state_dict()["param_groups"][0]["lr"], 0)
@require_torch_accelerator
@require_torch_bf16
def test_mixed_bf16(self):
# very basic test
trainer = get_regression_trainer(learning_rate=0.1, bf16=True)
trainer.train()
self.check_trained_model(trainer.model)
# --bf16 --half_precision_backend apex can't be used together
with self.assertRaises(ValueError):
trainer = get_regression_trainer(learning_rate=0.1, bf16=True, half_precision_backend="apex")
# will add more specific tests once there are some bugs to fix
@require_torch_gpu
@require_torch_tf32
def test_tf32(self):
# very basic test
trainer = get_regression_trainer(learning_rate=0.1, tf32=True)
trainer.train()
self.check_trained_model(trainer.model)
@require_torch
@require_sentencepiece
@require_tokenizers
class TrainerIntegrationTest(TestCasePlus, TrainerIntegrationCommon):
def setUp(self):
super().setUp()
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
def test_trainer_works_with_dict(self):
# Edge case because Apex with mode O2 will change our models to return dicts. This test checks it doesn't break
# anything.
train_dataset = RegressionDataset()
eval_dataset = RegressionDataset()
model = RegressionDictModel()
args = TrainingArguments("./regression")
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
trainer.train()
_ = trainer.evaluate()
_ = trainer.predict(eval_dataset)
def test_evaluation_with_keys_to_drop(self):
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
tiny_gpt2 = GPT2LMHeadModel(config)
x = torch.randint(0, 100, (128,))
eval_dataset = RepeatDataset(x)
args = TrainingArguments("./test")
trainer = Trainer(tiny_gpt2, args, eval_dataset=eval_dataset)
# By default the past_key_values are removed
result = trainer.predict(eval_dataset)
self.assertTrue(isinstance(result.predictions, np.ndarray))
# We can still get them by setting ignore_keys to []
result = trainer.predict(eval_dataset, ignore_keys=[])
self.assertTrue(isinstance(result.predictions, tuple))
self.assertEqual(len(result.predictions), 2)
def test_training_arguments_are_left_untouched(self):
trainer = get_regression_trainer()
trainer.train()
args = TrainingArguments("./regression", report_to=[])
dict1, dict2 = args.to_dict(), trainer.args.to_dict()
for key in dict1.keys():
# Logging dir can be slightly different as they default to something with the time.
if key != "logging_dir":
self.assertEqual(dict1[key], dict2[key])
def test_number_of_steps_in_training(self):
# Regular training has n_epochs * len(train_dl) steps
trainer = get_regression_trainer(learning_rate=0.1)
train_output = trainer.train()
self.assertEqual(train_output.global_step, self.n_epochs * 64 / self.batch_size)
# Check passing num_train_epochs works (and a float version too):
trainer = get_regression_trainer(learning_rate=0.1, num_train_epochs=1.5)
train_output = trainer.train()
self.assertEqual(train_output.global_step, int(1.5 * 64 / self.batch_size))
# If we pass a max_steps, num_train_epochs is ignored
trainer = get_regression_trainer(learning_rate=0.1, max_steps=10)
train_output = trainer.train()
self.assertEqual(train_output.global_step, 10)
@require_torch_bf16
@require_intel_extension_for_pytorch
def test_number_of_steps_in_training_with_ipex(self):
for mix_bf16 in [True, False]:
# Regular training has n_epochs * len(train_dl) steps
trainer = get_regression_trainer(learning_rate=0.1, use_ipex=True, bf16=mix_bf16, use_cpu=True)
train_output = trainer.train()
self.assertEqual(train_output.global_step, self.n_epochs * 64 / trainer.args.train_batch_size)
# Check passing num_train_epochs works (and a float version too):
trainer = get_regression_trainer(
learning_rate=0.1, num_train_epochs=1.5, use_ipex=True, bf16=mix_bf16, use_cpu=True
)
train_output = trainer.train()
self.assertEqual(train_output.global_step, int(1.5 * 64 / trainer.args.train_batch_size))
# If we pass a max_steps, num_train_epochs is ignored
trainer = get_regression_trainer(
learning_rate=0.1, max_steps=10, use_ipex=True, bf16=mix_bf16, use_cpu=True
)
train_output = trainer.train()
self.assertEqual(train_output.global_step, 10)
@require_peft
@require_bitsandbytes
def test_bnb_compile(self):
from peft import LoraConfig, get_peft_model
# Simply tests if initializing a Trainer with a PEFT + compiled model works out of the box
# QLoRA + torch compile is not really supported yet, but we should at least support the model
# loading and let torch throw the
tiny_model = AutoModelForCausalLM.from_pretrained(
"hf-internal-testing/tiny-random-LlamaForCausalLM", load_in_4bit=True
)
peft_config = LoraConfig(
r=8,
lora_alpha=32,
target_modules=["q_proj", "k_proj", "v_proj"],
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM",
)
tiny_model = get_peft_model(tiny_model, peft_config)
tiny_model = torch.compile(tiny_model)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmp_dir:
args = TrainingArguments(
tmp_dir,
learning_rate=1e-9,
logging_steps=5,
)
with self.assertRaises(ValueError):
_ = Trainer(tiny_model, args, train_dataset=train_dataset) # noqa
@require_bitsandbytes
def test_rmsprop_bnb(self):
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
tiny_gpt2 = GPT2LMHeadModel(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir, learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, optim="rmsprop_bnb"
)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
# Check that it trains without errors
trainer.train()
@require_bitsandbytes
def test_rmsprop_bnb_8bit(self):
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
tiny_gpt2 = GPT2LMHeadModel(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir, learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, optim="rmsprop_bnb_8bit"
)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
# Check that it trains without errors
trainer.train()
@require_bitsandbytes
def test_rmsprop_bnb_32bit(self):
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
tiny_gpt2 = GPT2LMHeadModel(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir, learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, optim="rmsprop_bnb_32bit"
)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
# Check that it trains without errors
trainer.train()
def test_neftune(self):
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
tiny_gpt2 = GPT2LMHeadModel(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
# Trainer without inf/nan filter
args = TrainingArguments(
"./test", learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, neftune_noise_alpha=0.4
)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
trainer.model = trainer._activate_neftune(trainer.model)
dummy_input = torch.LongTensor([[1, 0, 1]]).to(torch_device)
emb1 = trainer.model.get_input_embeddings()(dummy_input)
emb2 = trainer.model.get_input_embeddings()(dummy_input)
self.assertFalse(torch.allclose(emb1, emb2), "Neftune noise is not applied!")
# redefine the model
tiny_gpt2 = GPT2LMHeadModel(config)
# Trainer without inf/nan filter
args = TrainingArguments(
"./test", learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, neftune_noise_alpha=0.4
)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
# Check that it trains without errors
trainer.train()
# Make sure forward pass works fine
_ = trainer.model(dummy_input)
self.assertTrue(len(trainer.model.get_input_embeddings()._forward_hooks) == 0)
trainer.model.eval()
# Check that we get identical embeddings just in case
emb1 = trainer.model.get_input_embeddings()(dummy_input)
emb2 = trainer.model.get_input_embeddings()(dummy_input)
self.assertTrue(torch.allclose(emb1, emb2), "Neftune noise is still applied!")
def test_logging_inf_nan_filter(self):
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
tiny_gpt2 = GPT2LMHeadModel(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
# Trainer without inf/nan filter
args = TrainingArguments("./test", learning_rate=1e9, logging_steps=5, logging_nan_inf_filter=False)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
trainer.train()
log_history_no_filter = trainer.state.log_history
# Trainer with inf/nan filter
args = TrainingArguments("./test", learning_rate=1e9, logging_steps=5, logging_nan_inf_filter=True)
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
trainer.train()
log_history_filter = trainer.state.log_history
def is_any_loss_nan_or_inf(log_history):
losses = [l["loss"] for l in log_history[:-1]]
return any(math.isnan(x) for x in losses) or any(math.isinf(x) for x in losses)
self.assertTrue(is_any_loss_nan_or_inf(log_history_no_filter))
self.assertFalse(is_any_loss_nan_or_inf(log_history_filter))
def test_train_and_eval_dataloaders(self):
if torch_device == "cuda":
n_gpu = max(1, backend_device_count(torch_device))
else:
n_gpu = 1
trainer = get_regression_trainer(learning_rate=0.1, per_device_train_batch_size=16)
self.assertEqual(trainer.get_train_dataloader().total_batch_size, 16 * n_gpu)
trainer = get_regression_trainer(learning_rate=0.1, per_device_eval_batch_size=16)
self.assertEqual(trainer.get_eval_dataloader().total_batch_size, 16 * n_gpu)
# Check drop_last works
trainer = get_regression_trainer(
train_len=66, eval_len=74, learning_rate=0.1, per_device_train_batch_size=16, per_device_eval_batch_size=32
)
self.assertEqual(len(trainer.get_train_dataloader()), 66 // (16 * n_gpu) + 1)
self.assertEqual(len(trainer.get_eval_dataloader()), 74 // (32 * n_gpu) + 1)
trainer = get_regression_trainer(
train_len=66,
eval_len=74,
learning_rate=0.1,
per_device_train_batch_size=16,
per_device_eval_batch_size=32,
dataloader_drop_last=True,
)
self.assertEqual(len(trainer.get_train_dataloader()), 66 // (16 * n_gpu))
self.assertEqual(len(trainer.get_eval_dataloader()), 74 // (32 * n_gpu))
# Check passing a new dataset for evaluation works
new_eval_dataset = RegressionDataset(length=128)
self.assertEqual(len(trainer.get_eval_dataloader(new_eval_dataset)), 128 // (32 * n_gpu))
# tests that we do not require dataloader to have a .dataset attribute
def test_dataloader_without_dataset(self):
train_dataset = RegressionDataset(length=128)
trainer = CustomDataloaderTrainer(
model=RegressionModel(), train_dataset=train_dataset, eval_dataset=train_dataset
)
trainer.train()
trainer.evaluate()
def test_galore_matched_modules(self):
regex_patterns = [r".*.attn.*", r".*.mlp.*"]
module_names = [
"model.transformer.h.0.ln_1",
"model.transformer.h.0.attn.q_proj",
"model.lm_head",
"model.transformer.h.0.mlp.up_proj",
]
expected_values = [False, True, False, True]
for expected_value, module_name in zip(expected_values, module_names):
is_module_matched, is_regex = check_target_module_exists(regex_patterns, module_name, return_is_regex=True)
self.assertTrue(is_module_matched == expected_value)
if is_module_matched:
self.assertTrue(is_regex)
exact_patterns = ["q_proj", "up_proj"]
module_names = [
"model.transformer.h.0.ln_1",
"model.transformer.h.0.attn.q_proj",
"model.lm_head",
"model.transformer.h.0.mlp.up_proj",
]
expected_values = [False, True, False, True]
for expected_value, module_name in zip(expected_values, module_names):
is_module_matched, is_regex = check_target_module_exists(exact_patterns, module_name, return_is_regex=True)
self.assertTrue(is_module_matched == expected_value)
if is_module_matched:
self.assertFalse(is_regex)
simple_regex = r".*.attn.*"
module_names = [
"model.transformer.h.0.ln_1",
"model.transformer.h.0.attn.q_proj",
"model.lm_head",
"model.transformer.h.0.mlp.up_proj",
]
expected_values = [False, True, False, False]
for expected_value, module_name in zip(expected_values, module_names):
is_module_matched, is_regex = check_target_module_exists(simple_regex, module_name, return_is_regex=True)
self.assertTrue(is_module_matched == expected_value)
if is_module_matched:
self.assertTrue(is_regex)
simple_regex = "model.transformer.h.0.attn.q_proj"
module_names = [
"model.transformer.h.0.ln_1",
"model.transformer.h.0.attn.q_proj",
"model.lm_head",
"model.transformer.h.0.mlp.up_proj",
]
expected_values = [False, True, False, False]
for expected_value, module_name in zip(expected_values, module_names):
is_module_matched, is_regex = check_target_module_exists(simple_regex, module_name, return_is_regex=True)
self.assertTrue(is_module_matched == expected_value)
if is_module_matched:
self.assertFalse(is_regex)
target_modules = ["attn", "mlp"]
module_names = [
"model.transformer.h.0.ln_1",
"model.transformer.h.0.attn.q_proj",
"model.lm_head",
"model.transformer.h.0.mlp.up_proj",
]
expected_values = [False, True, False, True]
for expected_value, module_name in zip(expected_values, module_names):
is_module_matched, is_regex = check_target_module_exists(target_modules, module_name, return_is_regex=True)
self.assertTrue(is_module_matched == expected_value)
if is_module_matched:
self.assertFalse(is_regex)
@require_galore_torch
@require_torch_gpu
def test_galore(self):
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adamw",
optim_target_modules=[r".*attn.*", r".*mlp.*"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
@require_galore_torch
@require_torch_gpu
def test_galore_extra_args(self):
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adamw",
optim_args="rank=64, update_proj_gap=100, scale=0.10",
optim_target_modules=[r".*attn.*", r".*mlp.*"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
@require_galore_torch
@require_torch_gpu
def test_galore_layerwise(self):
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adamw_layerwise",
optim_target_modules=[r".*attn.*", r".*mlp.*"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
@require_galore_torch
@require_torch_gpu
def test_galore_layerwise_with_scheduler(self):
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adamw_layerwise",
lr_scheduler_type="cosine",
optim_target_modules=[r".*attn.*", r".*mlp.*"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
@require_galore_torch
@require_torch_gpu
def test_galore_adamw_8bit(self):
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adamw_8bit",
optim_target_modules=[r".*attn.*", r".*mlp.*"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
@require_galore_torch
@require_torch_gpu
def test_galore_adafactor(self):
# These are the intervals of the peak memory usage of training such a tiny model
# if the peak memory goes outside that range, then we know there might be a bug somewhere
upper_bound_pm = 700
lower_bound_pm = 650
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir, TorchTracemalloc() as tracemalloc:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adafactor",
optim_target_modules=[r".*attn.*", r".*mlp.*"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
galore_peak_memory = tracemalloc.peaked + bytes2megabytes(tracemalloc.begin)
self.assertTrue(galore_peak_memory < upper_bound_pm)
self.assertTrue(lower_bound_pm < galore_peak_memory)
@require_galore_torch
@require_torch_gpu
def test_galore_adafactor_attention_only(self):
# These are the intervals of the peak memory usage of training such a tiny model
# if the peak memory goes outside that range, then we know there might be a bug somewhere
upper_bound_pm = 700
lower_bound_pm = 650
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir, TorchTracemalloc() as tracemalloc:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adafactor",
optim_target_modules=["q_proj", "k_proj", "v_proj"],
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
galore_peak_memory = tracemalloc.peaked + bytes2megabytes(tracemalloc.begin)
self.assertTrue(galore_peak_memory < upper_bound_pm)
self.assertTrue(lower_bound_pm < galore_peak_memory)
@require_galore_torch
@require_torch_gpu
def test_galore_adafactor_all_linear(self):
# These are the intervals of the peak memory usage of training such a tiny model
# if the peak memory goes outside that range, then we know there might be a bug somewhere
upper_bound_pm = 700
lower_bound_pm = 650
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
tiny_llama = LlamaForCausalLM(config)
x = torch.randint(0, 100, (128,))
train_dataset = RepeatDataset(x)
with tempfile.TemporaryDirectory() as tmpdir, TorchTracemalloc() as tracemalloc:
# Trainer without inf/nan filter
args = TrainingArguments(
tmpdir,
learning_rate=1e-9,
logging_steps=5,
optim="galore_adafactor",
optim_target_modules="all-linear",
)
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
# Check this works
_ = trainer.train()
galore_peak_memory = tracemalloc.peaked + bytes2megabytes(tracemalloc.begin)
self.assertTrue(galore_peak_memory < upper_bound_pm)
self.assertTrue(lower_bound_pm < galore_peak_memory)
@require_torch_multi_accelerator
def test_data_is_not_parallelized_when_model_is_parallel(self):
model = RegressionModel()
# Make the Trainer believe it's a parallelized model
model.is_parallelizable = True
model.model_parallel = True
args = TrainingArguments("./regression", per_device_train_batch_size=16, per_device_eval_batch_size=16)
trainer = Trainer(model, args, train_dataset=RegressionDataset(), eval_dataset=RegressionDataset())
# Check the Trainer was fooled
self.assertTrue(trainer.is_model_parallel)
self.assertEqual(trainer.args.n_gpu, 1)
# The batch size of the training and evaluation dataloaders should be 16, not 16 * n_gpu
self.assertEqual(trainer.get_train_dataloader().total_batch_size, 16)
self.assertEqual(len(trainer.get_train_dataloader()), 64 // 16)
self.assertEqual(trainer.get_eval_dataloader().total_batch_size, 16)
self.assertEqual(len(trainer.get_eval_dataloader()), 64 // 16)
def test_evaluate(self):
trainer = get_regression_trainer(a=1.5, b=2.5, compute_metrics=AlmostAccuracy())
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With a number of elements not a round multiple of the batch size
trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, compute_metrics=AlmostAccuracy())
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With logits preprocess
trainer = get_regression_trainer(
a=1.5,
b=2.5,
compute_metrics=AlmostAccuracy(),
preprocess_logits_for_metrics=lambda logits, labels: logits + 1,
)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred + 1, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
def test_evaluate_with_jit(self):
trainer = get_regression_trainer(a=1.5, b=2.5, compute_metrics=AlmostAccuracy(), jit_mode_eval=True)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With a number of elements not a round multiple of the batch size
trainer = get_regression_trainer(
a=1.5, b=2.5, eval_len=66, compute_metrics=AlmostAccuracy(), jit_mode_eval=True
)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With logits preprocess
trainer = get_regression_trainer(
a=1.5,
b=2.5,
compute_metrics=AlmostAccuracy(),
preprocess_logits_for_metrics=lambda logits, labels: logits + 1,
jit_mode_eval=True,
)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred + 1, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
@require_torch_bf16
@require_intel_extension_for_pytorch
def test_evaluate_with_ipex(self):
for mix_bf16 in [True, False]:
trainer = get_regression_trainer(
a=1.5, b=2.5, use_ipex=True, compute_metrics=AlmostAccuracy(), bf16=mix_bf16, use_cpu=True
)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With a number of elements not a round multiple of the batch size
trainer = get_regression_trainer(
a=1.5,
b=2.5,
use_ipex=True,
eval_len=66,
compute_metrics=AlmostAccuracy(),
bf16=mix_bf16,
use_cpu=True,
)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With logits preprocess
trainer = get_regression_trainer(
a=1.5,
b=2.5,
use_ipex=True,
compute_metrics=AlmostAccuracy(),
preprocess_logits_for_metrics=lambda logits, labels: logits + 1,
bf16=mix_bf16,
use_cpu=True,
)
results = trainer.evaluate()
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred + 1, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
def test_predict(self):
trainer = get_regression_trainer(a=1.5, b=2.5)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With a number of elements not a round multiple of the batch size
trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With more than one output of the model
trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertEqual(len(preds), 2)
self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
# With more than one output/label of the model
trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True, label_names=["labels", "labels_2"])
outputs = trainer.predict(trainer.eval_dataset)
preds = outputs.predictions
labels = outputs.label_ids
x = trainer.eval_dataset.x
self.assertEqual(len(preds), 2)
self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0]))
self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1]))
def test_predict_with_jit(self):
trainer = get_regression_trainer(a=1.5, b=2.5, jit_mode_eval=True)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With a number of elements not a round multiple of the batch size
trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, jit_mode_eval=True)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With more than one output of the model
trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True, jit_mode_eval=True)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertEqual(len(preds), 2)
self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
# With more than one output/label of the model
trainer = get_regression_trainer(
a=1.5, b=2.5, double_output=True, label_names=["labels", "labels_2"], jit_mode_eval=True
)
outputs = trainer.predict(trainer.eval_dataset)
preds = outputs.predictions
labels = outputs.label_ids
x = trainer.eval_dataset.x
self.assertEqual(len(preds), 2)
self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0]))
self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1]))
@require_torch_bf16
@require_intel_extension_for_pytorch
def test_predict_with_ipex(self):
for mix_bf16 in [True, False]:
trainer = get_regression_trainer(a=1.5, b=2.5, use_ipex=True, bf16=mix_bf16, use_cpu=True)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With a number of elements not a round multiple of the batch size
trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, use_ipex=True, bf16=mix_bf16, use_cpu=True)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With more than one output of the model
trainer = get_regression_trainer(
a=1.5, b=2.5, double_output=True, use_ipex=True, bf16=mix_bf16, use_cpu=True
)
preds = trainer.predict(trainer.eval_dataset).predictions
x = trainer.eval_dataset.x
self.assertEqual(len(preds), 2)
self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
# With more than one output/label of the model
trainer = get_regression_trainer(
a=1.5,
b=2.5,
double_output=True,
label_names=["labels", "labels_2"],
use_ipex=True,
bf16=mix_bf16,
use_cpu=True,
)
outputs = trainer.predict(trainer.eval_dataset)
preds = outputs.predictions
labels = outputs.label_ids
x = trainer.eval_dataset.x
self.assertEqual(len(preds), 2)
self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0]))
self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1]))
def test_dynamic_shapes(self):
eval_dataset = DynamicShapesDataset(batch_size=self.batch_size)
model = RegressionModel(a=2, b=1)
args = TrainingArguments("./regression")
trainer = Trainer(model, args, eval_dataset=eval_dataset)
# Check evaluation can run to completion
_ = trainer.evaluate()
# Check predictions
preds = trainer.predict(eval_dataset)
for expected, seen in zip(eval_dataset.ys, preds.label_ids):
self.assertTrue(np.array_equal(expected, seen[: expected.shape[0]]))
self.assertTrue(np.all(seen[expected.shape[0] :] == -100))
for expected, seen in zip(eval_dataset.xs, preds.predictions):
self.assertTrue(np.array_equal(2 * expected + 1, seen[: expected.shape[0]]))
self.assertTrue(np.all(seen[expected.shape[0] :] == -100))
# Same tests with eval accumulation
args = TrainingArguments("./regression", eval_accumulation_steps=2)
trainer = Trainer(model, args, eval_dataset=eval_dataset)
# Check evaluation can run to completion
_ = trainer.evaluate()
# Check predictions
preds = trainer.predict(eval_dataset)
for expected, seen in zip(eval_dataset.ys, preds.label_ids):
self.assertTrue(np.array_equal(expected, seen[: expected.shape[0]]))
self.assertTrue(np.all(seen[expected.shape[0] :] == -100))
for expected, seen in zip(eval_dataset.xs, preds.predictions):
self.assertTrue(np.array_equal(2 * expected + 1, seen[: expected.shape[0]]))
self.assertTrue(np.all(seen[expected.shape[0] :] == -100))
def test_log_level(self):
# testing only --log_level (--log_level_replica requires multiple gpus and DDP and is tested elsewhere)
logger = logging.get_logger()
log_info_string = "Running training"
# test with the default log_level - should be the same as before and thus we test depending on is_info
is_info = logging.get_verbosity() <= 20
with CaptureLogger(logger) as cl:
trainer = get_regression_trainer()
trainer.train()
if is_info:
self.assertIn(log_info_string, cl.out)
else:
self.assertNotIn(log_info_string, cl.out)
with LoggingLevel(logging.INFO):
# test with low log_level - lower than info
with CaptureLogger(logger) as cl:
trainer = get_regression_trainer(log_level="debug")
trainer.train()
self.assertIn(log_info_string, cl.out)
with LoggingLevel(logging.INFO):
# test with high log_level - should be quiet
with CaptureLogger(logger) as cl:
trainer = get_regression_trainer(log_level="error")
trainer.train()
self.assertNotIn(log_info_string, cl.out)
def test_save_checkpoints(self):
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5)
trainer.train()
self.check_saved_checkpoints(tmpdir, 5, int(self.n_epochs * 64 / self.batch_size))
# With a regular model that is not a PreTrainedModel
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5, pretrained=False)
trainer.train()
self.check_saved_checkpoints(tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), False)
@require_safetensors
def test_safe_checkpoints(self):
for save_safetensors in [True, False]:
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5, save_safetensors=save_safetensors)
trainer.train()
self.check_saved_checkpoints(
tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), safe_weights=save_safetensors
)
# With a regular model that is not a PreTrainedModel
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
output_dir=tmpdir, save_steps=5, pretrained=False, save_safetensors=save_safetensors
)
trainer.train()
self.check_saved_checkpoints(
tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), False, safe_weights=save_safetensors
)
@require_torch_multi_accelerator
def test_run_seq2seq_double_train_wrap_once(self):
# test that we don't wrap the model more than once
# since wrapping primarily happens on multi-gpu setup we want multiple gpus to test for
# example DataParallel(DataParallel(model))
trainer = get_regression_trainer()
trainer.train()
model_wrapped_before = trainer.model_wrapped
trainer.train()
model_wrapped_after = trainer.model_wrapped
self.assertIs(model_wrapped_before, model_wrapped_after, "should be not wrapped twice")
@require_torch_up_to_2_accelerators
def test_can_resume_training(self):
# This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
# save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
# won't be the same since the training dataloader is shuffled).
with tempfile.TemporaryDirectory() as tmpdir:
kwargs = {
"output_dir": tmpdir,
"train_len": 128,
"save_steps": 5,
"learning_rate": 0.1,
"logging_steps": 5,
}
trainer = get_regression_trainer(**kwargs)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
state = dataclasses.asdict(trainer.state)
checkpoint = os.path.join(tmpdir, "checkpoint-5")
# Reinitialize trainer
trainer = get_regression_trainer(**kwargs)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
# Now check with a later checkpoint that it also works when we span over one epoch
checkpoint = os.path.join(tmpdir, "checkpoint-15")
# Reinitialize trainer and load model
trainer = get_regression_trainer(**kwargs)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
# With a regular model that is not a PreTrainedModel
with tempfile.TemporaryDirectory() as tmpdir:
kwargs = {
"output_dir": tmpdir,
"train_len": 128,
"save_steps": 5,
"learning_rate": 0.1,
"pretrained": False,
}
trainer = get_regression_trainer(**kwargs)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
state = dataclasses.asdict(trainer.state)
checkpoint = os.path.join(tmpdir, "checkpoint-5")
# Reinitialize trainer and load model
trainer = get_regression_trainer(**kwargs)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
# Now check with a later checkpoint that it also works when we span over one epoch
checkpoint = os.path.join(tmpdir, "checkpoint-15")
# Reinitialize trainer and load model
trainer = get_regression_trainer(**kwargs)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
# Now check failures
# 1. fail to find a bogus checkpoint
trainer = get_regression_trainer()
with self.assertRaises(Exception) as context:
trainer.train(resume_from_checkpoint=f"{checkpoint}-bogus")
self.assertTrue("Can't find a valid checkpoint at" in str(context.exception))
# 2. fail to find any checkpoint - due a fresh output_dir
output_dir2 = self.get_auto_remove_tmp_dir()
trainer = get_regression_trainer(output_dir=output_dir2)
with self.assertRaises(Exception) as context:
trainer.train(resume_from_checkpoint=True)
self.assertTrue("No valid checkpoint found in output directory" in str(context.exception))
@unittest.skip(
reason="@muellerzr: Fix once Trainer can take an accelerate configuration. Need to set `seedable_sampler=True`."
)
def test_resume_training_with_randomness(self):
# For more than 1 GPUs, since the randomness is introduced in the model and with DataParallel (which is used
# in this test for more than 2 GPUs), the calls to the torch RNG will happen in a random order (sometimes
# GPU 0 will call first and sometimes GPU 1).
random_torch = not torch.cuda.is_available() or torch.cuda.device_count() <= 1
if torch.cuda.is_available():
torch.backends.cudnn.deterministic = True
train_dataset = RegressionDataset(length=128)
eval_dataset = RegressionDataset()
with self.subTest("Test every step"):
config = RegressionModelConfig(a=0, b=2, random_torch=random_torch)
model = RegressionRandomPreTrainedModel(config)
tmp_dir = self.get_auto_remove_tmp_dir()
args = RegressionTrainingArguments(tmp_dir, save_steps=5, learning_rate=0.1)
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
model = RegressionRandomPreTrainedModel(config)
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
trainer.train(resume_from_checkpoint=os.path.join(tmp_dir, "checkpoint-15"))
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
self.assertAlmostEqual(a, a1, delta=1e-5)
self.assertAlmostEqual(b, b1, delta=1e-5)
with self.subTest("Test every epoch"):
config = RegressionModelConfig(a=0, b=2, random_torch=random_torch)
model = RegressionRandomPreTrainedModel(config)
tmp_dir = self.get_auto_remove_tmp_dir()
args = RegressionTrainingArguments(tmp_dir, save_strategy="epoch", learning_rate=0.1)
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
model = RegressionRandomPreTrainedModel(config)
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
checkpoints = [d for d in os.listdir(tmp_dir) if d.startswith("checkpoint-")]
# There should be one checkpoint per epoch.
self.assertEqual(len(checkpoints), 3)
checkpoint_dir = sorted(checkpoints, key=lambda x: int(x.replace("checkpoint-", "")))[0]
trainer.train(resume_from_checkpoint=os.path.join(tmp_dir, checkpoint_dir))
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
self.assertAlmostEqual(a, a1, delta=1e-5)
self.assertAlmostEqual(b, b1, delta=1e-5)
@slow
@require_accelerate
@require_torch_non_multi_accelerator
def test_auto_batch_size_finder(self):
if torch.cuda.is_available():
torch.backends.cudnn.deterministic = True
SRC_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "examples", "pytorch", "text-classification")
)
sys.path.append(SRC_DIR)
import run_glue
with tempfile.TemporaryDirectory() as tmpdir:
testargs = f"""
run_glue.py
--model_name_or_path distilbert/distilbert-base-uncased
--task_name mrpc
--do_train
--do_eval
--max_seq_len 128
--per_device_train_batch_size 4096
--learning_rate 2e-5
--num_train_epochs 1
--output_dir {tmpdir}
--auto_find_batch_size 0
""".split()
with self.assertRaises(RuntimeError):
with patch.object(sys, "argv", testargs):
run_glue.main()
testargs[-1] = "1"
with patch.object(sys, "argv", testargs):
run_glue.main()
@require_deepspeed
def test_auto_batch_size_with_resume_from_checkpoint_with_deepspeed(self):
train_dataset = RegressionDataset(length=128)
config = RegressionModelConfig(a=0, b=2)
model = RegressionRandomPreTrainedModel(config)
tmp_dir = self.get_auto_remove_tmp_dir()
class MockCudaOOMCallback(TrainerCallback):
def on_step_end(self, args, state, control, **kwargs):
# simulate OOM on the first step
if state.train_batch_size >= 16:
raise RuntimeError("CUDA out of memory.")
deepspeed = {
"zero_optimization": {
"stage": 1,
},
"train_batch_size": "auto",
"train_micro_batch_size_per_gpu": "auto",
}
args = RegressionTrainingArguments(
tmp_dir,
do_train=True,
max_steps=2,
save_steps=1,
per_device_train_batch_size=16,
auto_find_batch_size=True,
deepspeed=deepspeed,
)
# Note: This can have issues, for now we don't support this functionality
# ref: https://github.com/huggingface/transformers/pull/29057
with self.assertRaises(NotImplementedError):
_ = Trainer(model, args, train_dataset=train_dataset, callbacks=[MockCudaOOMCallback()])
def test_auto_batch_size_with_resume_from_checkpoint(self):
train_dataset = RegressionDataset(length=128)
config = RegressionModelConfig(a=0, b=2)
model = RegressionRandomPreTrainedModel(config)
tmp_dir = self.get_auto_remove_tmp_dir()
class MockCudaOOMCallback(TrainerCallback):
def on_step_end(self, args, state, control, **kwargs):
# simulate OOM on the first step
if state.train_batch_size >= 16:
raise RuntimeError("CUDA out of memory.")
args = RegressionTrainingArguments(
tmp_dir,
do_train=True,
max_steps=2,
save_steps=1,
per_device_train_batch_size=16,
auto_find_batch_size=True,
)
trainer = Trainer(model, args, train_dataset=train_dataset, callbacks=[MockCudaOOMCallback()])
trainer.train()
# After `auto_find_batch_size` is ran we should now be at 8
self.assertEqual(trainer._train_batch_size, 8)
# We can then make a new Trainer
trainer = Trainer(model, args, train_dataset=train_dataset)
# Check we are at 16 to start
self.assertEqual(trainer._train_batch_size, 16 * max(trainer.args.n_gpu, 1))
trainer.train(resume_from_checkpoint=True)
# We should be back to 8 again, picking up based upon the last ran Trainer
self.assertEqual(trainer._train_batch_size, 8)
# regression for this issue: https://github.com/huggingface/transformers/issues/12970
def test_training_with_resume_from_checkpoint_false(self):
train_dataset = RegressionDataset(length=128)
eval_dataset = RegressionDataset()
config = RegressionModelConfig(a=0, b=2)
model = RegressionRandomPreTrainedModel(config)
tmp_dir = self.get_auto_remove_tmp_dir()
args = RegressionTrainingArguments(tmp_dir, save_steps=5, learning_rate=0.1)
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
trainer.train(resume_from_checkpoint=False)
@require_torch_up_to_2_accelerators
def test_resume_training_with_shard_checkpoint(self):
# This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
# save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
# won't be the same since the training dataloader is shuffled).
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
state = dataclasses.asdict(trainer.state)
checkpoint = os.path.join(tmpdir, "checkpoint-5")
self.convert_to_sharded_checkpoint(checkpoint)
# Reinitialize trainer
trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
@require_safetensors
@require_torch_up_to_2_accelerators
def test_resume_training_with_safe_checkpoint(self):
# This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
# save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
# won't be the same since the training dataloader is shuffled).
for initial_safe in [False, True]:
for loaded_safe in [False, True]:
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=128,
save_steps=5,
learning_rate=0.1,
save_safetensors=initial_safe,
)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
state = dataclasses.asdict(trainer.state)
checkpoint = os.path.join(tmpdir, "checkpoint-5")
self.convert_to_sharded_checkpoint(checkpoint, load_safe=initial_safe, save_safe=loaded_safe)
# Reinitialize trainer
trainer = get_regression_trainer(
output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1, save_safetensors=loaded_safe
)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
@require_torch_up_to_2_accelerators
def test_resume_training_with_gradient_accumulation(self):
# This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
# save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
# won't be the same since the training dataloader is shuffled).
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=128,
gradient_accumulation_steps=2,
per_device_train_batch_size=4,
save_steps=5,
learning_rate=0.1,
)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
state = dataclasses.asdict(trainer.state)
checkpoint = os.path.join(tmpdir, "checkpoint-5")
# Reinitialize trainer
trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=128,
gradient_accumulation_steps=2,
per_device_train_batch_size=4,
save_steps=5,
learning_rate=0.1,
)
trainer.train(resume_from_checkpoint=checkpoint)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
@require_torch_up_to_2_accelerators
def test_resume_training_with_frozen_params(self):
# This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
# save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
# won't be the same since the training dataloader is shuffled).
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=128,
per_device_train_batch_size=4,
save_steps=5,
learning_rate=0.1,
)
trainer.model.a.requires_grad_(False)
trainer.train()
(a, b) = trainer.model.a.item(), trainer.model.b.item()
state = dataclasses.asdict(trainer.state)
checkpoint = os.path.join(tmpdir, "checkpoint-5")
# Reinitialize trainer
trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=128,
per_device_train_batch_size=4,
save_steps=5,
learning_rate=0.1,
)
trainer.model.a.requires_grad_(False)
trainer.train(resume_from_checkpoint=checkpoint)
self.assertFalse(trainer.model.a.requires_grad)
(a1, b1) = trainer.model.a.item(), trainer.model.b.item()
state1 = dataclasses.asdict(trainer.state)
self.assertEqual(a, a1)
self.assertEqual(b, b1)
self.check_trainer_state_are_the_same(state, state1)
def test_load_best_model_at_end(self):
total = int(self.n_epochs * 64 / self.batch_size)
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
a=1.5,
b=2.5,
output_dir=tmpdir,
learning_rate=0.1,
eval_steps=5,
eval_strategy="steps",
save_steps=5,
load_best_model_at_end=True,
)
self.assertFalse(trainer.args.greater_is_better)
trainer.train()
self.check_saved_checkpoints(tmpdir, 5, total)
self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_loss")
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
a=1.5,
b=2.5,
output_dir=tmpdir,
learning_rate=0.1,
eval_steps=5,
eval_strategy="steps",
save_steps=5,
load_best_model_at_end=True,
metric_for_best_model="accuracy",
compute_metrics=AlmostAccuracy(),
)
self.assertTrue(trainer.args.greater_is_better)
trainer.train()
self.check_saved_checkpoints(tmpdir, 5, total)
self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_accuracy", greater_is_better=True)
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
a=1.5,
b=2.5,
output_dir=tmpdir,
learning_rate=0.1,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="accuracy",
compute_metrics=AlmostAccuracy(),
)
self.assertTrue(trainer.args.greater_is_better)
trainer.train()
self.check_saved_checkpoints(tmpdir, 64 // self.batch_size, total)
self.check_best_model_has_been_loaded(
tmpdir, 64 // self.batch_size, total, trainer, "eval_accuracy", greater_is_better=True
)
# Test this works with a non PreTrainedModel
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
output_dir=tmpdir,
learning_rate=0.1,
eval_steps=5,
eval_strategy="steps",
save_steps=5,
load_best_model_at_end=True,
pretrained=False,
)
self.assertFalse(trainer.args.greater_is_better)
trainer.train()
self.check_saved_checkpoints(tmpdir, 5, total, is_pretrained=False)
self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_loss", is_pretrained=False)
@require_safetensors
def test_load_best_model_from_safetensors(self):
total = int(self.n_epochs * 64 / self.batch_size)
for save_safetensors, pretrained in product([False, True], [False, True]):
with tempfile.TemporaryDirectory() as tmpdir:
trainer = get_regression_trainer(
a=1.5,
b=2.5,
output_dir=tmpdir,
learning_rate=0.1,
eval_steps=5,
eval_strategy="steps",
save_steps=5,
load_best_model_at_end=True,
save_safetensors=save_safetensors,
pretrained=pretrained,
)
self.assertFalse(trainer.args.greater_is_better)
trainer.train()
self.check_saved_checkpoints(tmpdir, 5, total, is_pretrained=pretrained, safe_weights=save_safetensors)
self.check_best_model_has_been_loaded(
tmpdir, 5, total, trainer, "eval_loss", is_pretrained=pretrained, safe_weights=save_safetensors
)
@slow
def test_trainer_eval_mrpc(self):
MODEL_ID = "google-bert/bert-base-cased-finetuned-mrpc"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_ID)
data_args = GlueDataTrainingArguments(
task_name="mrpc", data_dir=f"{get_tests_dir()}/fixtures/tests_samples/MRPC", overwrite_cache=True
)
eval_dataset = GlueDataset(data_args, tokenizer=tokenizer, mode="dev")
training_args = TrainingArguments(output_dir="./examples", use_cpu=True)
trainer = Trainer(model=model, args=training_args, eval_dataset=eval_dataset)
result = trainer.evaluate()
self.assertLess(result["eval_loss"], 0.2)
@slow
def test_trainer_eval_multiple(self):
MODEL_ID = "openai-community/gpt2"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
model = AutoModelForCausalLM.from_pretrained(MODEL_ID)
dataset = LineByLineTextDataset(
tokenizer=tokenizer,
file_path=PATH_SAMPLE_TEXT,
block_size=tokenizer.max_len_single_sentence,
)
for example in dataset.examples:
example["labels"] = example["input_ids"]
training_args = TrainingArguments(
output_dir="./examples",
use_cpu=True,
per_device_eval_batch_size=1,
)
trainer = Trainer(
model=model,
args=training_args,
eval_dataset={
"data1": dataset,
"data2": dataset,
},
)
result = trainer.evaluate()
self.assertIn("eval_data1_loss", result)
self.assertIn("eval_data2_loss", result)
@slow
def test_trainer_eval_lm(self):
MODEL_ID = "distilbert/distilroberta-base"
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
dataset = LineByLineTextDataset(
tokenizer=tokenizer,
file_path=PATH_SAMPLE_TEXT,
block_size=tokenizer.max_len_single_sentence,
)
self.assertEqual(len(dataset), 31)
def test_training_iterable_dataset(self):
config = RegressionModelConfig()
model = RegressionPreTrainedModel(config)
# Adding one column not used by the model should have no impact
train_dataset = SampleIterableDataset(label_names=["labels", "extra"])
args = RegressionTrainingArguments(output_dir="./examples", max_steps=4)
trainer = Trainer(model=model, args=args, train_dataset=train_dataset)
trainer.train()
self.assertEqual(trainer.state.global_step, 4)
loader = trainer.get_train_dataloader()
self.assertIsInstance(loader, torch.utils.data.DataLoader)
self.assertIsInstance(loader.sampler, torch.utils.data.dataloader._InfiniteConstantSampler)
def test_evaluation_iterable_dataset(self):
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
# Adding one column not used by the model should have no impact
eval_dataset = SampleIterableDataset(label_names=["labels", "extra"])
args = RegressionTrainingArguments(output_dir="./examples")
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset, compute_metrics=AlmostAccuracy())
results = trainer.evaluate()
x, y = trainer.eval_dataset.dataset.x, trainer.eval_dataset.dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
# With a number of elements not a round multiple of the batch size
eval_dataset = SampleIterableDataset(length=66)
results = trainer.evaluate(eval_dataset)
x, y = eval_dataset.dataset.x, eval_dataset.dataset.ys[0]
pred = 1.5 * x + 2.5
expected_loss = ((pred - y) ** 2).mean()
self.assertAlmostEqual(results["eval_loss"], expected_loss)
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
def test_predict_iterable_dataset(self):
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
args = RegressionTrainingArguments(output_dir="./examples")
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset, compute_metrics=AlmostAccuracy())
preds = trainer.predict(trainer.eval_dataset).predictions
x = eval_dataset.dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
# With a number of elements not a round multiple of the batch size
# Adding one column not used by the model should have no impact
test_dataset = SampleIterableDataset(length=66, label_names=["labels", "extra"])
preds = trainer.predict(test_dataset).predictions
x = test_dataset.dataset.x
self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))
def test_num_train_epochs_in_training(self):
# len(train_dl) < gradient_accumulation_steps shouldn't give ``ZeroDivisionError`` when ``max_steps`` is given.
# It should give 1 update step for each epoch.
trainer = get_regression_trainer(
max_steps=3, train_len=64, per_device_train_batch_size=16, gradient_accumulation_steps=5
)
train_output = trainer.train()
self.assertEqual(train_output.global_step, 3)
# Even ``max_steps`` is not specified, we still expect 1 update step for each epoch if
# len(train_dl) < gradient_accumulation_steps.
trainer = get_regression_trainer(train_len=64, per_device_train_batch_size=16, gradient_accumulation_steps=5)
train_output = trainer.train()
self.assertEqual(train_output.global_step, int(self.n_epochs))
def test_early_stopping_callback(self):
# early stopping stops training before num_training_epochs
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
num_train_epochs=20,
gradient_accumulation_steps=1,
per_device_train_batch_size=16,
load_best_model_at_end=True,
eval_strategy=IntervalStrategy.EPOCH,
save_strategy=IntervalStrategy.EPOCH,
compute_metrics=AlmostAccuracy(),
metric_for_best_model="accuracy",
)
trainer.add_callback(EarlyStoppingCallback(1, 0.0001))
train_output = trainer.train()
self.assertLess(train_output.global_step, 20 * 64 / 16)
# Invalid inputs to trainer with early stopping callback result in assertion error
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
num_train_epochs=20,
gradient_accumulation_steps=1,
per_device_train_batch_size=16,
eval_strategy=IntervalStrategy.EPOCH,
compute_metrics=AlmostAccuracy(),
metric_for_best_model="accuracy",
)
trainer.add_callback(EarlyStoppingCallback(1))
self.assertEqual(trainer.state.global_step, 0)
try:
trainer.train()
except AssertionError:
self.assertEqual(trainer.state.global_step, 0)
def test_flos_extraction(self):
trainer = get_regression_trainer(learning_rate=0.1)
def assert_flos_extraction(trainer, wrapped_model_to_check):
self.assertEqual(trainer.model, trainer.accelerator.unwrap_model(wrapped_model_to_check))
self.assertGreaterEqual(
getattr(trainer.accelerator.unwrap_model(wrapped_model_to_check).config, "total_flos", 0), 0
)
# with plain model
assert_flos_extraction(trainer, trainer.model)
# with enforced DataParallel
assert_flos_extraction(trainer, nn.DataParallel(trainer.model))
trainer.train()
self.assertTrue(isinstance(trainer.state.total_flos, float))
def check_checkpoint_deletion(self, trainer, output_dir, expected):
# Make fake checkpoints
for n in [5, 10, 15, 20, 25]:
os.makedirs(os.path.join(output_dir, f"{PREFIX_CHECKPOINT_DIR}-{n}"), exist_ok=True)
trainer._rotate_checkpoints(output_dir=output_dir)
glob_checkpoints = [str(x) for x in Path(output_dir).glob(f"{PREFIX_CHECKPOINT_DIR}-*")]
values = [int(re.match(f".*{PREFIX_CHECKPOINT_DIR}-([0-9]+)", d).groups()[0]) for d in glob_checkpoints]
self.assertSetEqual(set(values), set(expected))
def test_checkpoint_rotation(self):
with tempfile.TemporaryDirectory() as tmp_dir:
# Without best model at end
trainer = get_regression_trainer(output_dir=tmp_dir, save_total_limit=2)
self.check_checkpoint_deletion(trainer, tmp_dir, [20, 25])
# With best model at end
trainer = get_regression_trainer(
output_dir=tmp_dir, eval_strategy="steps", load_best_model_at_end=True, save_total_limit=2
)
trainer.state.best_model_checkpoint = os.path.join(tmp_dir, "checkpoint-5")
self.check_checkpoint_deletion(trainer, tmp_dir, [5, 25])
# Edge case: we don't always honor save_total_limit=1 if load_best_model_at_end=True to be able to resume
# from checkpoint
trainer = get_regression_trainer(
output_dir=tmp_dir, eval_strategy="steps", load_best_model_at_end=True, save_total_limit=1
)
trainer.state.best_model_checkpoint = os.path.join(tmp_dir, "checkpoint-25")
self.check_checkpoint_deletion(trainer, tmp_dir, [25])
trainer.state.best_model_checkpoint = os.path.join(tmp_dir, "checkpoint-5")
self.check_checkpoint_deletion(trainer, tmp_dir, [5, 25])
def test_compare_trainer_and_checkpoint_args_logging(self):
logger = logging.get_logger()
with tempfile.TemporaryDirectory() as tmpdir, CaptureLogger(logger) as cl:
trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=128,
eval_steps=5,
gradient_accumulation_steps=2,
per_device_train_batch_size=4,
save_steps=5,
learning_rate=0.1,
)
trainer.train()
checkpoint = os.path.join(tmpdir, "checkpoint-5")
checkpoint_trainer = get_regression_trainer(
output_dir=tmpdir,
train_len=256,
eval_steps=10,
gradient_accumulation_steps=4,
per_device_train_batch_size=8,
save_steps=10,
learning_rate=0.1,
)
checkpoint_trainer.train(resume_from_checkpoint=checkpoint)
self.assertIn("save_steps: 10 (from args) != 5 (from trainer_state.json)", cl.out)
self.assertIn(
"per_device_train_batch_size: 8 (from args) != 4 (from trainer_state.json)",
cl.out,
)
self.assertIn(
"eval_steps: 10 (from args) != 5 (from trainer_state.json)",
cl.out,
)
def check_mem_metrics(self, trainer, check_func):
metrics = trainer.train().metrics
check_func("init_mem_cpu_alloc_delta", metrics)
check_func("train_mem_cpu_alloc_delta", metrics)
if backend_device_count(torch_device) > 0:
check_func("init_mem_gpu_alloc_delta", metrics)
check_func("train_mem_gpu_alloc_delta", metrics)
metrics = trainer.evaluate()
check_func("eval_mem_cpu_alloc_delta", metrics)
if backend_device_count(torch_device) > 0:
check_func("eval_mem_gpu_alloc_delta", metrics)
metrics = trainer.predict(RegressionDataset()).metrics
check_func("test_mem_cpu_alloc_delta", metrics)
if backend_device_count(torch_device) > 0:
check_func("test_mem_gpu_alloc_delta", metrics)
def test_mem_metrics(self):
# with mem metrics enabled
trainer = get_regression_trainer(skip_memory_metrics=False)
self.check_mem_metrics(trainer, self.assertIn)
# with mem metrics disabled
trainer = get_regression_trainer(skip_memory_metrics=True)
self.check_mem_metrics(trainer, self.assertNotIn)
@require_torch_accelerator
def test_fp16_full_eval(self):
# this is a sensitive test so let's keep debugging printouts in place for quick diagnosis.
# it's using pretty large safety margins, but small enough to detect broken functionality.
debug = 0
n_gpus = backend_device_count(torch_device)
bs = 8
eval_len = 16 * n_gpus
# make the params somewhat big so that there will be enough RAM consumed to be able to
# measure things. We should get about 64KB for a+b in fp32
a = torch.ones(1000, bs) + 0.001
b = torch.ones(1000, bs) - 0.001
# 1. with fp16_full_eval disabled
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, skip_memory_metrics=False)
metrics = trainer.evaluate()
del trainer
gc.collect()
fp32_init = metrics["init_mem_gpu_alloc_delta"]
fp32_eval = metrics["eval_mem_gpu_alloc_delta"]
if debug:
print(f"fp32_init {fp32_init}")
print(f"fp32_eval {fp32_eval}")
# here we expect the model to be preloaded in trainer.__init__ and consume around 64K gpu ram.
# perfect world: fp32_init == 64<<10
self.assertGreater(fp32_init, 59_000)
# after eval should be no extra memory allocated - with a small margin (other than the peak
# memory consumption for the forward calculation that gets recovered)
# perfect world: fp32_eval == close to zero
self.assertLess(fp32_eval, 5_000)
# 2. with fp16_full_eval enabled
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, fp16_full_eval=True, skip_memory_metrics=False)
metrics = trainer.evaluate()
fp16_init = metrics["init_mem_gpu_alloc_delta"]
fp16_eval = metrics["eval_mem_gpu_alloc_delta"]
if debug:
print(f"fp16_init {fp16_init}")
print(f"fp16_eval {fp16_eval}")
# here we expect the model to not be preloaded in trainer.__init__, so with a small margin it should be close to 0
# perfect world: fp16_init == close to zero
self.assertLess(fp16_init, 5_000)
# here we put the model on device in eval and only `half()` of it, i.e. about 32K,(again we ignore the peak margin which gets returned back)
# perfect world: fp32_init == 32<<10
self.assertGreater(fp16_eval, 27_000)
# 3. relative comparison fp32 vs full fp16
# should be about half of fp16_init
# perfect world: fp32_init/2 == fp16_eval
self.assertAlmostEqual(fp16_eval, fp32_init / 2, delta=5_000)
@require_torch_non_multi_gpu
@require_torchdynamo
@require_torch_tensorrt_fx
def test_torchdynamo_full_eval(self):
import torchdynamo
# torchdynamo at the moment doesn't support DP/DDP, therefore require a single gpu
n_gpus = get_gpu_count()
bs = 8
eval_len = 16 * n_gpus
# make the params are somewhat big so that there will be enough RAM consumed to be able to
# measure things. We should get about 64KB for a+b in fp32
a = torch.ones(1000, bs) + 0.001
b = torch.ones(1000, bs) - 0.001
# 1. Default - without TorchDynamo
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len)
metrics = trainer.evaluate()
original_eval_loss = metrics["eval_loss"]
del trainer
# 2. TorchDynamo eager
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, torchdynamo="eager")
metrics = trainer.evaluate()
self.assertAlmostEqual(metrics["eval_loss"], original_eval_loss)
del trainer
torchdynamo.reset()
# 3. TorchDynamo nvfuser
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, torchdynamo="nvfuser")
metrics = trainer.evaluate()
self.assertAlmostEqual(metrics["eval_loss"], original_eval_loss)
torchdynamo.reset()
# 4. TorchDynamo fx2trt
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, torchdynamo="fx2trt")
metrics = trainer.evaluate()
self.assertAlmostEqual(metrics["eval_loss"], original_eval_loss)
torchdynamo.reset()
@unittest.skip("torch 2.0.0 gives `ModuleNotFoundError: No module named 'torchdynamo'`.")
@require_torch_non_multi_gpu
@require_torchdynamo
def test_torchdynamo_memory(self):
# torchdynamo at the moment doesn't support DP/DDP, therefore require a single gpu
import torchdynamo
class CustomTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
x = inputs["x"]
output = model(x)
if self.args.n_gpu == 1:
return output.mean()
return output
class MyModule(torch.nn.Module):
"""Simple module that does aggressive fusion"""
def __init__(self):
super().__init__()
def forward(self, x):
for _ in range(20):
x = torch.cos(x)
return x
mod = MyModule()
# 1. without TorchDynamo (eager baseline)
a = torch.ones(1024, 1024, device="cuda", requires_grad=True)
a.grad = None
trainer = CustomTrainer(model=mod)
# warmup
for _ in range(10):
orig_loss = trainer.training_step(mod, {"x": a})
# resets
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
orig_loss = trainer.training_step(mod, {"x": a})
orig_peak_mem = torch.cuda.max_memory_allocated()
torchdynamo.reset()
del trainer
# 2. TorchDynamo nvfuser
a = torch.ones(1024, 1024, device="cuda", requires_grad=True)
a.grad = None
args = TrainingArguments(output_dir="None", torchdynamo="nvfuser")
trainer = CustomTrainer(model=mod, args=args)
# warmup
for _ in range(10):
loss = trainer.training_step(mod, {"x": a})
# resets
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
loss = trainer.training_step(mod, {"x": a})
peak_mem = torch.cuda.max_memory_allocated()
torchdynamo.reset()
del trainer
# Functional check
self.assertAlmostEqual(loss, orig_loss)
# AOT Autograd recomputaion and nvfuser recomputation optimization
# aggressively fuses the operations and reduce the memory footprint.
self.assertGreater(orig_peak_mem, peak_mem * 2)
@require_torch_accelerator
@require_torch_bf16
def test_bf16_full_eval(self):
# note: most of the logic is the same as test_fp16_full_eval
# this is a sensitive test so let's keep debugging printouts in place for quick diagnosis.
# it's using pretty large safety margins, but small enough to detect broken functionality.
debug = 0
n_gpus = backend_device_count(torch_device)
bs = 8
eval_len = 16 * n_gpus
# make the params somewhat big so that there will be enough RAM consumed to be able to
# measure things. We should get about 64KB for a+b in fp32
a = torch.ones(1000, bs) + 0.001
b = torch.ones(1000, bs) - 0.001
# 1. with bf16_full_eval disabled
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, skip_memory_metrics=False)
metrics = trainer.evaluate()
del trainer
gc.collect()
fp32_init = metrics["init_mem_gpu_alloc_delta"]
fp32_eval = metrics["eval_mem_gpu_alloc_delta"]
if debug:
print(f"fp32_init {fp32_init}")
print(f"fp32_eval {fp32_eval}")
# here we expect the model to be preloaded in trainer.__init__ and consume around 64K gpu ram.
# perfect world: fp32_init == 64<<10
self.assertGreater(fp32_init, 59_000)
# after eval should be no extra memory allocated - with a small margin (other than the peak
# memory consumption for the forward calculation that gets recovered)
# perfect world: fp32_eval == close to zero
self.assertLess(fp32_eval, 5_000)
# 2. with bf16_full_eval enabled
trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, bf16_full_eval=True, skip_memory_metrics=False)
metrics = trainer.evaluate()
bf16_init = metrics["init_mem_gpu_alloc_delta"]
bf16_eval = metrics["eval_mem_gpu_alloc_delta"]
if debug:
print(f"bf16_init {bf16_init}")
print(f"bf16_eval {bf16_eval}")
# here we expect the model to not be preloaded in trainer.__init__, so with a small margin it should be close to 0
# perfect world: bf16_init == close to zero
self.assertLess(bf16_init, 5_000)
# here we put the model on device in eval and only `half()` of it, i.e. about 32K,(again we ignore the peak margin which gets returned back)
# perfect world: fp32_init == 32<<10
self.assertGreater(bf16_eval, 27_000)
# 3. relative comparison fp32 vs full bf16
# should be about half of bf16_init
# perfect world: fp32_init/2 == bf16_eval
self.assertAlmostEqual(bf16_eval, fp32_init / 2, delta=5_000)
def test_no_wd_param_group(self):
model = nn.Sequential(TstLayer(128), nn.ModuleList([TstLayer(128), TstLayer(128)]))
trainer = Trainer(model=model)
trainer.create_optimizer_and_scheduler(10)
wd_names = ['0.linear1.weight', '0.linear2.weight', '1.0.linear1.weight', '1.0.linear2.weight', '1.1.linear1.weight', '1.1.linear2.weight'] # fmt: skip
wd_params = [p for n, p in model.named_parameters() if n in wd_names]
no_wd_params = [p for n, p in model.named_parameters() if n not in wd_names]
self.assertListEqual(trainer.optimizer.param_groups[0]["params"], wd_params)
self.assertListEqual(trainer.optimizer.param_groups[1]["params"], no_wd_params)
@slow
@require_torch_multi_accelerator
def test_end_to_end_example(self):
# Tests that `translation.py` will run without issues
script_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__), "..", "..", "examples", "pytorch", "translation", "run_translation.py"
)
)
with tempfile.TemporaryDirectory() as tmpdir:
command = [
"accelerate",
"launch",
script_path,
"--model_name_or_path",
"google-t5/t5-small",
"--per_device_train_batch_size",
"1",
"--output_dir",
tmpdir,
"--overwrite_output_dir",
"--do_train",
"--max_train_samples",
"64",
"--num_train_epochs",
"1",
"--dataset_name",
"wmt16",
"--dataset_config",
"ro-en",
"--source_lang",
"en",
"--target_lang",
"ro",
"--do_predict",
"--max_predict_samples",
"64",
"--predict_with_generate",
"--ddp_timeout",
"60",
]
execute_subprocess_async(command)
# successful return here == success - any errors would have caused an error or a timeout in the sub-call
def test_accelerator_config_empty(self):
# Checks that a config can be made with the defaults if not passed
with tempfile.TemporaryDirectory() as tmp_dir:
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
# Leaves one option as something *not* basic
args = RegressionTrainingArguments(
output_dir=tmp_dir,
)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, False)
self.assertEqual(trainer.accelerator.dispatch_batches, None)
self.assertEqual(trainer.accelerator.even_batches, True)
self.assertEqual(trainer.accelerator.use_seedable_sampler, True)
if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
# gradient accumulation kwargs configures gradient_state
self.assertNotIn("sync_each_batch", trainer.accelerator.gradient_state.plugin_kwargs)
def test_accelerator_config_from_dict(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
with tempfile.TemporaryDirectory() as tmp_dir:
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
accelerator_config = {
"split_batches": True,
"dispatch_batches": True,
"even_batches": False,
"use_seedable_sampler": True,
}
if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
accelerator_config["gradient_accumulation_kwargs"] = {"sync_each_batch": True}
# Leaves all options as something *not* basic
args = RegressionTrainingArguments(
output_dir=tmp_dir,
accelerator_config=accelerator_config,
)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, True)
self.assertEqual(trainer.accelerator.dispatch_batches, True)
self.assertEqual(trainer.accelerator.even_batches, False)
self.assertEqual(trainer.accelerator.use_seedable_sampler, True)
if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_each_batch"], True)
def test_accelerator_config_from_yaml(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
with tempfile.TemporaryDirectory() as tmp_dir:
path_file = Path(tmp_dir) / "accelerator_config.json"
with open(path_file, "w") as f:
accelerator_config = {
"split_batches": True,
"dispatch_batches": True,
"even_batches": False,
"use_seedable_sampler": False,
}
if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
accelerator_config["gradient_accumulation_kwargs"] = {"sync_each_batch": True}
json.dump(accelerator_config, f)
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
# Leaves all options as something *not* basic
args = RegressionTrainingArguments(output_dir=tmp_dir, accelerator_config=path_file)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, True)
self.assertEqual(trainer.accelerator.dispatch_batches, True)
self.assertEqual(trainer.accelerator.even_batches, False)
self.assertEqual(trainer.accelerator.use_seedable_sampler, False)
if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_each_batch"], True)
def test_accelerator_config_from_dataclass(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
accelerator_config = AcceleratorConfig(
split_batches=True,
dispatch_batches=True,
even_batches=False,
use_seedable_sampler=False,
)
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
with tempfile.TemporaryDirectory() as tmp_dir:
args = RegressionTrainingArguments(output_dir=tmp_dir, accelerator_config=accelerator_config)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, True)
self.assertEqual(trainer.accelerator.dispatch_batches, True)
self.assertEqual(trainer.accelerator.even_batches, False)
self.assertEqual(trainer.accelerator.use_seedable_sampler, False)
@require_accelerate_version_min_0_28
def test_accelerate_config_from_dataclass_grad_accum(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
grad_acc_kwargs = {
"num_steps": 10,
"adjust_scheduler": False,
"sync_with_dataloader": False,
"sync_each_batch": True,
}
accelerator_config = AcceleratorConfig(
split_batches=True,
dispatch_batches=True,
even_batches=False,
use_seedable_sampler=False,
gradient_accumulation_kwargs=grad_acc_kwargs,
)
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
with tempfile.TemporaryDirectory() as tmp_dir:
args = RegressionTrainingArguments(output_dir=tmp_dir, accelerator_config=accelerator_config)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["num_steps"], 10)
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["adjust_scheduler"], False)
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_with_dataloader"], False)
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_each_batch"], True)
def test_accelerator_config_from_partial(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
with tempfile.TemporaryDirectory() as tmp_dir:
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
# Leaves one option as something *not* basic
args = RegressionTrainingArguments(
output_dir=tmp_dir,
accelerator_config={
"split_batches": True,
},
)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, True)
self.assertEqual(trainer.accelerator.dispatch_batches, None)
self.assertEqual(trainer.accelerator.even_batches, True)
self.assertEqual(trainer.accelerator.use_seedable_sampler, True)
def test_accelerator_config_from_dict_with_deprecated_args(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
# and maintains the deprecated args if passed in
with tempfile.TemporaryDirectory() as tmp_dir:
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
# Leaves all options as something *not* basic
with self.assertWarns(FutureWarning) as cm:
args = RegressionTrainingArguments(
output_dir=tmp_dir,
accelerator_config={
"split_batches": True,
},
dispatch_batches=False,
)
self.assertIn("dispatch_batches", str(cm.warnings[0].message))
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.dispatch_batches, False)
self.assertEqual(trainer.accelerator.split_batches, True)
with self.assertWarns(FutureWarning) as cm:
args = RegressionTrainingArguments(
output_dir=tmp_dir,
accelerator_config={
"even_batches": False,
},
split_batches=True,
)
self.assertIn("split_batches", str(cm.warnings[0].message))
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, True)
self.assertEqual(trainer.accelerator.even_batches, False)
self.assertEqual(trainer.accelerator.dispatch_batches, None)
def test_accelerator_config_only_deprecated_args(self):
with tempfile.TemporaryDirectory() as tmp_dir:
with self.assertWarns(FutureWarning) as cm:
args = RegressionTrainingArguments(
output_dir=tmp_dir,
split_batches=True,
)
self.assertIn("split_batches", str(cm.warnings[0].message))
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.split_batches, True)
@require_accelerate_version_min_0_28
def test_accelerator_config_from_dict_grad_accum_num_steps(self):
with tempfile.TemporaryDirectory() as tmp_dir:
config = RegressionModelConfig(a=1.5, b=2.5)
model = RegressionPreTrainedModel(config)
eval_dataset = SampleIterableDataset()
# case - TrainingArguments.gradient_accumulation_steps == 1
# - gradient_accumulation_kwargs['num_steps] == 1
# results in grad accum set to 1
args = RegressionTrainingArguments(
output_dir=tmp_dir,
gradient_accumulation_steps=1,
accelerator_config={
"gradient_accumulation_kwargs": {
"num_steps": 1,
}
},
)
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["num_steps"], 1)
# case - TrainingArguments.gradient_accumulation_steps > 1
# - gradient_accumulation_kwargs['num_steps] specified
# results in exception raised
args = RegressionTrainingArguments(
output_dir=tmp_dir,
gradient_accumulation_steps=2,
accelerator_config={
"gradient_accumulation_kwargs": {
"num_steps": 10,
}
},
)
with self.assertRaises(Exception) as context:
trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
self.assertTrue("The `AcceleratorConfig`'s `num_steps` is set but" in str(context.exception))
def test_accelerator_config_not_instantiated(self):
# Checks that accelerator kwargs can be passed through
# and the accelerator is initialized respectively
with tempfile.TemporaryDirectory() as tmp_dir:
with self.assertRaises(NotImplementedError) as context:
_ = RegressionTrainingArguments(
output_dir=tmp_dir,
accelerator_config=AcceleratorConfig,
)
self.assertTrue("Tried passing in a callable to `accelerator_config`" in str(context.exception))
# Now test with a custom subclass
@dataclasses.dataclass
class CustomAcceleratorConfig(AcceleratorConfig):
pass
@dataclasses.dataclass
class CustomTrainingArguments(TrainingArguments):
accelerator_config: dict = dataclasses.field(
default=CustomAcceleratorConfig,
)
with tempfile.TemporaryDirectory() as tmp_dir:
with self.assertRaises(NotImplementedError) as context:
_ = CustomTrainingArguments(
output_dir=tmp_dir,
)
self.assertTrue("Tried passing in a callable to `accelerator_config`" in str(context.exception))
@require_torch
@is_staging_test
class TrainerIntegrationWithHubTester(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._token = TOKEN
HfFolder.save_token(TOKEN)
@classmethod
def tearDownClass(cls):
for model in [
"test-trainer",
"test-trainer-epoch",
"test-trainer-step",
"test-trainer-tensorboard",
"test-trainer-tags",
]:
try:
delete_repo(token=cls._token, repo_id=model)
except HTTPError:
pass
try:
delete_repo(token=cls._token, repo_id="valid_org/test-trainer-org")
except HTTPError:
pass
def test_push_to_hub(self):
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=os.path.join(tmp_dir, "test-trainer"),
push_to_hub=True,
hub_token=self._token,
)
url = trainer.push_to_hub()
# Extract repo_name from the url
re_search = re.search(ENDPOINT_STAGING + r"/([^/]+/[^/]+)/", url)
self.assertTrue(re_search is not None)
repo_name = re_search.groups()[0]
self.assertEqual(repo_name, f"{USER}/test-trainer")
model = RegressionPreTrainedModel.from_pretrained(repo_name)
self.assertEqual(model.a.item(), trainer.model.a.item())
self.assertEqual(model.b.item(), trainer.model.b.item())
def test_push_to_hub_in_organization(self):
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(output_dir=tmp_dir)
trainer.save_model()
trainer = get_regression_trainer(
output_dir=os.path.join(tmp_dir, "test-trainer-org"),
push_to_hub=True,
hub_model_id="valid_org/test-trainer-org",
hub_token=self._token,
)
url = trainer.push_to_hub()
# Extract repo_name from the url
re_search = re.search(ENDPOINT_STAGING + r"/([^/]+/[^/]+)/", url)
self.assertTrue(re_search is not None)
repo_name = re_search.groups()[0]
self.assertEqual(repo_name, "valid_org/test-trainer-org")
model = RegressionPreTrainedModel.from_pretrained("valid_org/test-trainer-org")
self.assertEqual(model.a.item(), trainer.model.a.item())
self.assertEqual(model.b.item(), trainer.model.b.item())
def get_commit_history(self, repo):
commit_logs = subprocess.run(
"git log".split(),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
check=True,
encoding="utf-8",
cwd=repo,
).stdout
commits = commit_logs.split("\n\n")[1::2]
return [commit.strip() for commit in commits]
def test_push_to_hub_with_saves_each_epoch(self):
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=os.path.join(tmp_dir, "test-trainer-epoch"),
push_to_hub=True,
hub_token=self._token,
# To avoid any flakiness if the training goes faster than the uploads.
hub_always_push=True,
save_strategy="epoch",
)
trainer.train()
commits = list_repo_commits(f"{USER}/test-trainer-epoch", token=self._token)
commits = [c.title for c in commits]
self.assertIn("initial commit", commits)
for i in range(1, 4):
self.assertIn(f"Training in progress, epoch {i}", commits)
def test_push_to_hub_with_saves_each_n_steps(self):
num_gpus = max(1, backend_device_count(torch_device))
if num_gpus > 2:
return
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=os.path.join(tmp_dir, "test-trainer-step"),
push_to_hub=True,
hub_token=self._token,
# To avoid any flakiness if the training goes faster than the uploads.
hub_always_push=True,
save_strategy="steps",
save_steps=5,
)
trainer.train()
commits = list_repo_commits(f"{USER}/test-trainer-step", token=self._token)
commits = [c.title for c in commits]
self.assertIn("initial commit", commits)
# max_steps depend on the number of available GPUs
max_steps = math.ceil(trainer.args.num_train_epochs * len(trainer.get_train_dataloader()))
for i in range(5, max_steps, 5):
self.assertIn(f"Training in progress, step {i}", commits)
@require_tensorboard
def test_push_to_hub_with_tensorboard_logs(self):
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=os.path.join(tmp_dir, "test-trainer-tensorboard"),
hub_token=self._token,
save_strategy="epoch",
report_to=["tensorboard"],
keep_report_to=True,
)
trainer.train()
# Push the runs via `push_to_hub()`
trainer.push_to_hub()
files = list_repo_files(f"{USER}/test-trainer-tensorboard", token=self._token)
found_log = False
for f in files:
if len(f.split("runs")) > 1 and "events.out.tfevents" in f:
found_log = True
assert found_log is True, "No tensorboard log found in repo"
def test_push_to_hub_tags(self):
# Checks if `trainer.push_to_hub()` works correctly by adding the desired
# tag without having to pass `tags` in `push_to_hub`
# see:
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=os.path.join(tmp_dir, "test-trainer-tags"),
push_to_hub=True,
hub_token=self._token,
)
trainer.model.add_model_tags(["test-trainer-tags"])
url = trainer.push_to_hub()
# Extract repo_name from the url
re_search = re.search(ENDPOINT_STAGING + r"/([^/]+/[^/]+)/", url)
self.assertTrue(re_search is not None)
repo_name = re_search.groups()[0]
self.assertEqual(repo_name, f"{USER}/test-trainer-tags")
model_card = ModelCard.load(repo_name)
self.assertTrue("test-trainer-tags" in model_card.data.tags)
@require_torch
@require_optuna
class TrainerHyperParameterOptunaIntegrationTest(unittest.TestCase):
def setUp(self):
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
def test_hyperparameter_search(self):
class MyTrialShortNamer(TrialShortNamer):
DEFAULTS = {"a": 0, "b": 0}
def hp_space(trial):
return {}
def model_init(trial):
if trial is not None:
a = trial.suggest_int("a", -4, 4)
b = trial.suggest_int("b", -4, 4)
else:
a = 0
b = 0
config = RegressionModelConfig(a=a, b=b, double_output=False)
return RegressionPreTrainedModel(config)
def hp_name(trial):
return MyTrialShortNamer.shortname(trial.params)
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
learning_rate=0.1,
logging_steps=1,
eval_strategy=IntervalStrategy.EPOCH,
save_strategy=IntervalStrategy.EPOCH,
num_train_epochs=4,
disable_tqdm=True,
load_best_model_at_end=True,
logging_dir="runs",
run_name="test",
model_init=model_init,
)
trainer.hyperparameter_search(direction="minimize", hp_space=hp_space, hp_name=hp_name, n_trials=4)
@require_torch
@require_optuna
class TrainerHyperParameterMultiObjectOptunaIntegrationTest(unittest.TestCase):
def setUp(self):
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
def test_hyperparameter_search(self):
class MyTrialShortNamer(TrialShortNamer):
DEFAULTS = {"a": 0, "b": 0}
def hp_space(trial):
return {}
def model_init(trial):
if trial is not None:
a = trial.suggest_int("a", -4, 4)
b = trial.suggest_int("b", -4, 4)
else:
a = 0
b = 0
config = RegressionModelConfig(a=a, b=b, double_output=False)
return RegressionPreTrainedModel(config)
def hp_name(trial):
return MyTrialShortNamer.shortname(trial.params)
def compute_objective(metrics: Dict[str, float]) -> List[float]:
return metrics["eval_loss"], metrics["eval_accuracy"]
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
learning_rate=0.1,
logging_steps=1,
eval_strategy=IntervalStrategy.EPOCH,
save_strategy=IntervalStrategy.EPOCH,
num_train_epochs=10,
disable_tqdm=True,
load_best_model_at_end=True,
logging_dir="runs",
run_name="test",
model_init=model_init,
compute_metrics=AlmostAccuracy(),
)
trainer.hyperparameter_search(
direction=["minimize", "maximize"],
hp_space=hp_space,
hp_name=hp_name,
n_trials=4,
compute_objective=compute_objective,
)
@require_torch
@require_ray
class TrainerHyperParameterRayIntegrationTest(unittest.TestCase):
def setUp(self):
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
def ray_hyperparameter_search(self):
class MyTrialShortNamer(TrialShortNamer):
DEFAULTS = {"a": 0, "b": 0}
def hp_space(trial):
from ray import tune
return {
"a": tune.randint(-4, 4),
"b": tune.randint(-4, 4),
}
def model_init(config):
if config is None:
a = 0
b = 0
else:
a = config["a"]
b = config["b"]
model_config = RegressionModelConfig(a=a, b=b, double_output=False)
return RegressionPreTrainedModel(model_config)
def hp_name(params):
return MyTrialShortNamer.shortname(params)
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
learning_rate=0.1,
logging_steps=1,
eval_strategy=IntervalStrategy.EPOCH,
save_strategy=IntervalStrategy.EPOCH,
num_train_epochs=4,
disable_tqdm=True,
load_best_model_at_end=True,
logging_dir="runs",
run_name="test",
model_init=model_init,
)
trainer.hyperparameter_search(
direction="minimize", hp_space=hp_space, hp_name=hp_name, backend="ray", n_trials=4
)
def test_hyperparameter_search(self):
self.ray_hyperparameter_search()
def test_hyperparameter_search_ray_client(self):
import ray
from ray.util.client.ray_client_helpers import ray_start_client_server
with ray_start_client_server():
assert ray.util.client.ray.is_connected()
self.ray_hyperparameter_search()
@slow
@require_torch
@require_sigopt
class TrainerHyperParameterSigOptIntegrationTest(unittest.TestCase):
def setUp(self):
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
def test_hyperparameter_search(self):
class MyTrialShortNamer(TrialShortNamer):
DEFAULTS = {"a": 0, "b": 0}
def hp_space(trial):
return [
{"bounds": {"min": -4, "max": 4}, "name": "a", "type": "int"},
{"bounds": {"min": -4, "max": 4}, "name": "b", "type": "int"},
]
def model_init(trial):
if trial is not None:
a = trial.assignments["a"]
b = trial.assignments["b"]
else:
a = 0
b = 0
config = RegressionModelConfig(a=a, b=b, double_output=False)
return RegressionPreTrainedModel(config)
def hp_name(trial):
return MyTrialShortNamer.shortname(trial.assignments)
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
learning_rate=0.1,
logging_steps=1,
eval_strategy=IntervalStrategy.EPOCH,
save_strategy=IntervalStrategy.EPOCH,
num_train_epochs=4,
disable_tqdm=True,
load_best_model_at_end=True,
logging_dir="runs",
run_name="test",
model_init=model_init,
)
trainer.hyperparameter_search(
direction="minimize", hp_space=hp_space, hp_name=hp_name, backend="sigopt", n_trials=4
)
optim_test_params = []
if is_torch_available():
default_adam_kwargs = {
"betas": (TrainingArguments.adam_beta1, TrainingArguments.adam_beta2),
"eps": TrainingArguments.adam_epsilon,
"lr": TrainingArguments.learning_rate,
}
default_lion_kwargs = {
"betas": (TrainingArguments.adam_beta1, TrainingArguments.adam_beta2),
"lr": TrainingArguments.learning_rate,
}
default_anyprecision_kwargs = {
"use_kahan_summation": False,
"momentum_dtype": torch.float32,
"variance_dtype": torch.float32,
"compensation_buffer_dtype": torch.bfloat16,
}
optim_test_params = [
(
TrainingArguments(optim=OptimizerNames.ADAMW_HF, output_dir="None"),
transformers.optimization.AdamW,
default_adam_kwargs,
),
(
TrainingArguments(optim=OptimizerNames.ADAMW_HF.value, output_dir="None"),
transformers.optimization.AdamW,
default_adam_kwargs,
),
(
TrainingArguments(optim=OptimizerNames.ADAMW_TORCH, output_dir="None"),
torch.optim.AdamW,
default_adam_kwargs,
),
(
TrainingArguments(optim=OptimizerNames.ADAFACTOR, output_dir="None"),
transformers.optimization.Adafactor,
{
"scale_parameter": False,
"relative_step": False,
"lr": TrainingArguments.learning_rate,
},
),
]
if is_apex_available():
import apex
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.ADAMW_APEX_FUSED, output_dir="None"),
apex.optimizers.FusedAdam,
default_adam_kwargs,
)
)
if is_bitsandbytes_available():
import bitsandbytes as bnb
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.ADAMW_BNB, output_dir="None"),
bnb.optim.AdamW,
default_adam_kwargs,
)
)
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.ADAMW_8BIT, output_dir="None"),
bnb.optim.AdamW,
default_adam_kwargs,
)
)
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.PAGED_ADAMW, output_dir="None"),
bnb.optim.AdamW,
default_adam_kwargs,
)
)
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.PAGED_ADAMW_8BIT, output_dir="None"),
bnb.optim.AdamW,
default_adam_kwargs,
)
)
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.LION, output_dir="None"),
bnb.optim.Lion,
default_lion_kwargs,
)
)
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.LION_8BIT, output_dir="None"),
bnb.optim.Lion,
default_lion_kwargs,
)
)
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.PAGED_LION_8BIT, output_dir="None"),
bnb.optim.Lion,
default_lion_kwargs,
)
)
if is_torchdistx_available():
import torchdistx
optim_test_params.append(
(
TrainingArguments(optim=OptimizerNames.ADAMW_ANYPRECISION, output_dir="None"),
torchdistx.optimizers.AnyPrecisionAdamW,
dict(default_adam_kwargs, **default_anyprecision_kwargs),
)
)
@require_torch
class TrainerOptimizerChoiceTest(unittest.TestCase):
def check_optim_and_kwargs(self, training_args: TrainingArguments, expected_cls, expected_kwargs):
actual_cls, optim_kwargs = Trainer.get_optimizer_cls_and_kwargs(training_args)
self.assertEqual(expected_cls, actual_cls)
self.assertIsNotNone(optim_kwargs)
for p, v in expected_kwargs.items():
self.assertTrue(p in optim_kwargs)
actual_v = optim_kwargs[p]
self.assertTrue(actual_v == v, f"Failed check for {p}. Expected {v}, but got {actual_v}.")
@parameterized.expand(optim_test_params, skip_on_empty=True)
def test_optim_supported(self, training_args: TrainingArguments, expected_cls, expected_kwargs):
# exercises all the valid --optim options
self.check_optim_and_kwargs(training_args, expected_cls, expected_kwargs)
trainer = get_regression_trainer(**training_args.to_dict())
trainer.train()
def test_fused_adam(self):
# Pretend that apex is installed and mock apex.optimizers.FusedAdam exists.
# Trainer.get_optimizer_cls_and_kwargs does not use FusedAdam. It only has to return the
# class given, so mocking apex.optimizers.FusedAdam should be fine for testing and allow
# the test to run without requiring an apex installation.
mock = Mock()
modules = {
"apex": mock,
"apex.optimizers": mock.optimizers,
"apex.optimizers.FusedAdam": mock.optimizers.FusedAdam,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.ADAMW_APEX_FUSED, output_dir="None"),
mock.optimizers.FusedAdam,
default_adam_kwargs,
)
def test_fused_adam_no_apex(self):
args = TrainingArguments(optim=OptimizerNames.ADAMW_APEX_FUSED, output_dir="None")
# Pretend that apex does not exist, even if installed. By setting apex to None, importing
# apex will fail even if apex is installed.
with patch.dict("sys.modules", {"apex.optimizers": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
def test_bnb_adam8bit(self):
# Pretend that Bits and Bytes is installed and mock bnb.optim.Adam8bit exists.
# Trainer.get_optimizer_cls_and_kwargs does not use Adam8bit. It only has to return the
# class given, so mocking bnb.optim.Adam8bit should be fine for testing and allow
# the test to run without requiring a bnb installation.
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.AdamW": mock.optim.AdamW,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.ADAMW_BNB, output_dir="None"),
mock.optim.AdamW,
default_adam_kwargs,
)
def test_bnb_paged_adam8bit_alias(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.AdamW": mock.optim.AdamW,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.ADAMW_8BIT, output_dir="None"),
mock.optim.AdamW,
default_adam_kwargs,
)
def test_bnb_paged_adam(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.AdamW": mock.optim.AdamW,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.PAGED_ADAMW, output_dir="None"),
mock.optim.AdamW,
default_adam_kwargs,
)
def test_bnb_paged_adam8bit(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.AdamW": mock.optim.AdamW,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.PAGED_ADAMW_8BIT, output_dir="None"),
mock.optim.AdamW,
default_adam_kwargs,
)
def test_bnb_lion(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.Lion": mock.optim.Lion,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.LION, output_dir="None"),
mock.optim.Lion,
default_lion_kwargs,
)
def test_bnb_lion8bit(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.Lion": mock.optim.Lion,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.LION_8BIT, output_dir="None"),
mock.optim.Lion,
default_lion_kwargs,
)
def test_bnb_paged_lion8bit(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.Lion": mock.optim.Lion,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.PAGED_LION_8BIT, output_dir="None"),
mock.optim.Lion,
default_lion_kwargs,
)
def test_bnb_paged_lion(self):
mock = Mock()
modules = {
"bitsandbytes": mock,
"bitsandbytes.optim": mock.optim,
"bitsandbytes.optim.Lion": mock.optim.Lion,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.PAGED_LION, output_dir="None"),
mock.optim.Lion,
default_lion_kwargs,
)
def test_bnb_adam8bit_no_bnb(self):
args = TrainingArguments(optim=OptimizerNames.ADAMW_BNB, output_dir="None")
# Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
# bnb will fail even if bnb is installed.
with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
def test_bnb_paged_adam_no_bnb(self):
args = TrainingArguments(optim=OptimizerNames.PAGED_ADAMW, output_dir="None")
# Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
# bnb will fail even if bnb is installed.
with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
def test_bnb_paged_adam8bit_no_bnb(self):
args = TrainingArguments(optim=OptimizerNames.PAGED_ADAMW_8BIT, output_dir="None")
# Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
# bnb will fail even if bnb is installed.
with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
def test_bnb_paged_lion_no_bnb(self):
args = TrainingArguments(optim=OptimizerNames.PAGED_LION, output_dir="None")
# Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
# bnb will fail even if bnb is installed.
with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
def test_bnb_paged_lion8bit_no_bnb(self):
args = TrainingArguments(optim=OptimizerNames.PAGED_LION_8BIT, output_dir="None")
# Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
# bnb will fail even if bnb is installed.
with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
def test_anyprecision_adamw(self):
# Pretend that torchdistx is installed and mock torchdistx.optimizers.AnyPrecisionAdamW exists.
# Trainer.get_optimizer_cls_and_kwargs does not use AnyPrecisioinAdamW. It only has to return the
# class given, so mocking torchdistx.optimizers.AnyPrecisionAdamW should be fine for testing and allow
# the test to run without requiring a bnb installation.
mock = Mock()
modules = {
"torchdistx": mock,
"torchdistx.optimizers": mock.optimizers,
"torchdistx.optimizers.AnyPrecisionAdamW.": mock.optimizers.AnyPrecisionAdamW,
}
with patch.dict("sys.modules", modules):
self.check_optim_and_kwargs(
TrainingArguments(optim=OptimizerNames.ADAMW_ANYPRECISION, output_dir="None"),
mock.optimizers.AnyPrecisionAdamW,
dict(default_adam_kwargs, **default_anyprecision_kwargs),
)
def test_no_torchdistx_anyprecision_adamw(self):
args = TrainingArguments(optim=OptimizerNames.ADAMW_ANYPRECISION, output_dir="None")
# Pretend that torchdistx does not exist, even if installed. By setting torchdistx to None, importing
# torchdistx.optimizers will fail even if torchdistx is installed.
with patch.dict("sys.modules", {"torchdistx.optimizers": None}):
with self.assertRaises(ValueError):
Trainer.get_optimizer_cls_and_kwargs(args)
@require_torch
@require_wandb
class TrainerHyperParameterWandbIntegrationTest(unittest.TestCase):
def setUp(self):
args = TrainingArguments("..")
self.n_epochs = args.num_train_epochs
self.batch_size = args.train_batch_size
def test_hyperparameter_search(self):
class MyTrialShortNamer(TrialShortNamer):
DEFAULTS = {"a": 0, "b": 0}
def hp_space(trial):
return {
"method": "random",
"metric": {},
"parameters": {
"a": {"distribution": "uniform", "min": 1e-6, "max": 1e-4},
"b": {"distribution": "int_uniform", "min": 1, "max": 6},
},
}
def model_init(config):
if config is None:
a = 0
b = 0
else:
a = config["a"]
b = config["b"]
model_config = RegressionModelConfig(a=a, b=b, double_output=False)
return RegressionPreTrainedModel(model_config)
def hp_name(params):
return MyTrialShortNamer.shortname(params)
with tempfile.TemporaryDirectory() as tmp_dir:
trainer = get_regression_trainer(
output_dir=tmp_dir,
learning_rate=0.1,
logging_steps=1,
eval_strategy=IntervalStrategy.EPOCH,
save_strategy=IntervalStrategy.EPOCH,
num_train_epochs=4,
disable_tqdm=True,
load_best_model_at_end=True,
logging_dir="runs",
run_name="test",
model_init=model_init,
)
trainer.hyperparameter_search(
direction="minimize", hp_space=hp_space, hp_name=hp_name, backend="wandb", n_trials=4, anonymous="must"
)
class HyperParameterSearchBackendsTest(unittest.TestCase):
def test_hyperparameter_search_backends(self):
self.assertEqual(
list(ALL_HYPERPARAMETER_SEARCH_BACKENDS.keys()),
list(HPSearchBackend),
)
@require_torch
class OptimizerAndModelInspectionTest(unittest.TestCase):
def test_get_num_trainable_parameters(self):
model = nn.Sequential(nn.Linear(128, 64), nn.Linear(64, 32))
# in_features * out_features + bias
layer_1 = 128 * 64 + 64
layer_2 = 64 * 32 + 32
trainer = Trainer(model=model)
self.assertEqual(trainer.get_num_trainable_parameters(), layer_1 + layer_2)
# Freeze the last layer
for param in model[-1].parameters():
param.requires_grad = False
self.assertEqual(trainer.get_num_trainable_parameters(), layer_1)
def test_get_learning_rates(self):
model = nn.Sequential(nn.Linear(128, 64))
trainer = Trainer(model=model)
with self.assertRaises(ValueError):
trainer.get_learning_rates()
trainer.create_optimizer()
self.assertEqual(trainer.get_learning_rates(), [5e-05, 5e-05])
def test_get_optimizer_group(self):
model = nn.Sequential(nn.Linear(128, 64))
trainer = Trainer(model=model)
# ValueError is raised if optimizer is None
with self.assertRaises(ValueError):
trainer.get_optimizer_group()
trainer.create_optimizer()
# Get groups
num_groups = len(trainer.get_optimizer_group())
self.assertEqual(num_groups, 2)
# Get group of parameter
param = next(model.parameters())
group = trainer.get_optimizer_group(param)
self.assertIn(param, group["params"])