3993 lines
162 KiB
Python
3993 lines
162 KiB
Python
# coding=utf-8
|
|
# Copyright 2018 the HuggingFace Inc. team.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import dataclasses
|
|
import gc
|
|
import json
|
|
import math
|
|
import os
|
|
import random
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import unittest
|
|
from functools import partial
|
|
from itertools import product
|
|
from pathlib import Path
|
|
from typing import Dict, List
|
|
from unittest.mock import Mock, patch
|
|
|
|
import numpy as np
|
|
from huggingface_hub import HfFolder, ModelCard, delete_repo, list_repo_commits, list_repo_files
|
|
from parameterized import parameterized
|
|
from requests.exceptions import HTTPError
|
|
|
|
from transformers import (
|
|
AutoTokenizer,
|
|
IntervalStrategy,
|
|
PretrainedConfig,
|
|
TrainerCallback,
|
|
TrainingArguments,
|
|
get_polynomial_decay_schedule_with_warmup,
|
|
is_torch_available,
|
|
logging,
|
|
)
|
|
from transformers.hyperparameter_search import ALL_HYPERPARAMETER_SEARCH_BACKENDS
|
|
from transformers.testing_utils import (
|
|
ENDPOINT_STAGING,
|
|
TOKEN,
|
|
USER,
|
|
CaptureLogger,
|
|
LoggingLevel,
|
|
TestCasePlus,
|
|
backend_device_count,
|
|
execute_subprocess_async,
|
|
get_gpu_count,
|
|
get_tests_dir,
|
|
is_staging_test,
|
|
require_accelerate,
|
|
require_bitsandbytes,
|
|
require_deepspeed,
|
|
require_galore_torch,
|
|
require_intel_extension_for_pytorch,
|
|
require_optuna,
|
|
require_peft,
|
|
require_ray,
|
|
require_safetensors,
|
|
require_sentencepiece,
|
|
require_sigopt,
|
|
require_tensorboard,
|
|
require_tokenizers,
|
|
require_torch,
|
|
require_torch_accelerator,
|
|
require_torch_bf16,
|
|
require_torch_gpu,
|
|
require_torch_multi_accelerator,
|
|
require_torch_non_multi_accelerator,
|
|
require_torch_non_multi_gpu,
|
|
require_torch_tensorrt_fx,
|
|
require_torch_tf32,
|
|
require_torch_up_to_2_accelerators,
|
|
require_torchdynamo,
|
|
require_wandb,
|
|
slow,
|
|
torch_device,
|
|
)
|
|
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR, HPSearchBackend, check_target_module_exists
|
|
from transformers.training_args import OptimizerNames
|
|
from transformers.utils import (
|
|
SAFE_WEIGHTS_INDEX_NAME,
|
|
SAFE_WEIGHTS_NAME,
|
|
WEIGHTS_INDEX_NAME,
|
|
WEIGHTS_NAME,
|
|
is_accelerate_available,
|
|
is_apex_available,
|
|
is_bitsandbytes_available,
|
|
is_safetensors_available,
|
|
is_torchdistx_available,
|
|
)
|
|
from transformers.utils.hp_naming import TrialShortNamer
|
|
|
|
|
|
if is_torch_available():
|
|
import torch
|
|
from torch import nn
|
|
from torch.utils.data import IterableDataset
|
|
|
|
import transformers.optimization
|
|
from transformers import (
|
|
AutoModelForCausalLM,
|
|
AutoModelForSequenceClassification,
|
|
EarlyStoppingCallback,
|
|
GlueDataset,
|
|
GlueDataTrainingArguments,
|
|
GPT2Config,
|
|
GPT2LMHeadModel,
|
|
LineByLineTextDataset,
|
|
LlamaConfig,
|
|
LlamaForCausalLM,
|
|
PreTrainedModel,
|
|
Trainer,
|
|
TrainerState,
|
|
)
|
|
from transformers.trainer_pt_utils import AcceleratorConfig
|
|
|
|
if is_safetensors_available():
|
|
import safetensors.torch
|
|
|
|
# for version specific tests in TrainerIntegrationTest
|
|
require_accelerate_version_min_0_28 = partial(require_accelerate, min_version="0.28")
|
|
GRAD_ACCUM_KWARGS_VERSION_AVAILABLE = is_accelerate_available("0.28")
|
|
|
|
PATH_SAMPLE_TEXT = f"{get_tests_dir()}/fixtures/sample_text.txt"
|
|
|
|
|
|
class RegressionDataset:
|
|
def __init__(self, a=2, b=3, length=64, seed=42, label_names=None):
|
|
np.random.seed(seed)
|
|
self.label_names = ["labels"] if label_names is None else label_names
|
|
self.length = length
|
|
self.x = np.random.normal(size=(length,)).astype(np.float32)
|
|
self.ys = [a * self.x + b + np.random.normal(scale=0.1, size=(length,)) for _ in self.label_names]
|
|
self.ys = [y.astype(np.float32) for y in self.ys]
|
|
|
|
def __len__(self):
|
|
return self.length
|
|
|
|
def __getitem__(self, i):
|
|
result = {name: y[i] for name, y in zip(self.label_names, self.ys)}
|
|
result["input_x"] = self.x[i]
|
|
return result
|
|
|
|
|
|
# Converting Bytes to Megabytes
|
|
def bytes2megabytes(x):
|
|
return int(x / 2**20)
|
|
|
|
|
|
# Copied from acclerate: https://github.com/huggingface/accelerate/blob/ee163b66fb7848892519e804688cb4ae981aacbe/src/accelerate/test_utils/scripts/external_deps/test_peak_memory_usage.py#L40C1-L73C68
|
|
class TorchTracemalloc:
|
|
def __enter__(self):
|
|
gc.collect()
|
|
if torch.cuda.is_available():
|
|
torch.cuda.empty_cache()
|
|
torch.cuda.reset_max_memory_allocated() # reset the peak gauge to zero
|
|
self.begin = torch.cuda.memory_allocated()
|
|
return self
|
|
|
|
def __exit__(self, *exc):
|
|
gc.collect()
|
|
if torch.cuda.is_available():
|
|
torch.cuda.empty_cache()
|
|
self.end = torch.cuda.memory_allocated()
|
|
self.peak = torch.cuda.max_memory_allocated()
|
|
self.used = bytes2megabytes(self.end - self.begin)
|
|
self.peaked = bytes2megabytes(self.peak - self.begin)
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class RegressionTrainingArguments(TrainingArguments):
|
|
a: float = 0.0
|
|
b: float = 0.0
|
|
keep_report_to: bool = False
|
|
|
|
def __post_init__(self):
|
|
super().__post_init__()
|
|
# save resources not dealing with reporting unless specified (also avoids the warning when it's not set)
|
|
# can be explicitly disabled via `keep_report_to`
|
|
if not self.keep_report_to:
|
|
self.report_to = []
|
|
|
|
|
|
class RepeatDataset:
|
|
def __init__(self, x, length=64):
|
|
self.x = x
|
|
self.length = length
|
|
|
|
def __len__(self):
|
|
return self.length
|
|
|
|
def __getitem__(self, i):
|
|
return {"input_ids": self.x, "labels": self.x}
|
|
|
|
|
|
class DynamicShapesDataset:
|
|
def __init__(self, length=64, seed=42, batch_size=8):
|
|
self.length = length
|
|
np.random.seed(seed)
|
|
sizes = np.random.randint(1, 20, (length // batch_size,))
|
|
# For easy batching, we make every batch_size consecutive samples the same size.
|
|
self.xs = [np.random.normal(size=(s,)).astype(np.float32) for s in sizes.repeat(batch_size)]
|
|
self.ys = [np.random.normal(size=(s,)).astype(np.float32) for s in sizes.repeat(batch_size)]
|
|
|
|
def __len__(self):
|
|
return self.length
|
|
|
|
def __getitem__(self, i):
|
|
return {"input_x": self.xs[i], "labels": self.ys[i]}
|
|
|
|
|
|
class AlmostAccuracy:
|
|
def __init__(self, thresh=0.25):
|
|
self.thresh = thresh
|
|
|
|
def __call__(self, eval_pred):
|
|
predictions, labels = eval_pred
|
|
true = np.abs(predictions - labels) <= self.thresh
|
|
return {"accuracy": true.astype(np.float32).mean().item()}
|
|
|
|
|
|
class RegressionModelConfig(PretrainedConfig):
|
|
def __init__(self, a=0, b=0, double_output=False, random_torch=True, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.a = a
|
|
self.b = b
|
|
self.double_output = double_output
|
|
self.random_torch = random_torch
|
|
self.hidden_size = 1
|
|
|
|
|
|
if is_torch_available():
|
|
|
|
class SampleIterableDataset(IterableDataset):
|
|
def __init__(self, a=2, b=3, length=64, seed=42, label_names=None):
|
|
self.dataset = RegressionDataset(a=a, b=b, length=length, seed=seed, label_names=label_names)
|
|
|
|
def __iter__(self):
|
|
for i in range(len(self.dataset)):
|
|
yield self.dataset[i]
|
|
|
|
class FiniteIterableDataset(SampleIterableDataset):
|
|
def __init__(self, a=2, b=3, length=64, seed=42, label_names=None):
|
|
super().__init__(a, b, length, seed, label_names)
|
|
self.current_sample = 0
|
|
|
|
def __iter__(self):
|
|
while self.current_sample < len(self.dataset):
|
|
yield self.dataset[self.current_sample]
|
|
self.current_sample += 1
|
|
|
|
class MultiLoader:
|
|
def __init__(self, loaders):
|
|
self.loaders = loaders
|
|
|
|
def __len__(self):
|
|
return sum(len(loader) for loader in self.loaders)
|
|
|
|
def __iter__(self):
|
|
for loader in self.loaders:
|
|
yield from loader
|
|
|
|
class CustomDataloaderTrainer(Trainer):
|
|
def get_train_dataloader(self):
|
|
dataloaders = [super().get_train_dataloader(), super().get_train_dataloader()]
|
|
return MultiLoader(dataloaders)
|
|
|
|
def get_eval_dataloader(self, eval_dataset):
|
|
dataloaders = [super().get_eval_dataloader(eval_dataset), super().get_eval_dataloader(eval_dataset)]
|
|
return MultiLoader(dataloaders)
|
|
|
|
class RegressionModel(nn.Module):
|
|
def __init__(self, a=0, b=0, double_output=False):
|
|
super().__init__()
|
|
self.a = nn.Parameter(torch.tensor(a).float())
|
|
self.b = nn.Parameter(torch.tensor(b).float())
|
|
self.double_output = double_output
|
|
self.config = None
|
|
|
|
def forward(self, input_x, labels=None, **kwargs):
|
|
y = input_x * self.a + self.b
|
|
if labels is None:
|
|
return (y, y) if self.double_output else (y,)
|
|
loss = nn.functional.mse_loss(y, labels)
|
|
return (loss, y, y) if self.double_output else (loss, y)
|
|
|
|
class RegressionDictModel(nn.Module):
|
|
def __init__(self, a=0, b=0):
|
|
super().__init__()
|
|
self.a = nn.Parameter(torch.tensor(a).float())
|
|
self.b = nn.Parameter(torch.tensor(b).float())
|
|
self.config = None
|
|
|
|
def forward(self, input_x, labels=None, **kwargs):
|
|
y = input_x * self.a + self.b
|
|
result = {"output": y}
|
|
if labels is not None:
|
|
result["loss"] = nn.functional.mse_loss(y, labels)
|
|
return result
|
|
|
|
class RegressionPreTrainedModel(PreTrainedModel):
|
|
config_class = RegressionModelConfig
|
|
base_model_prefix = "regression"
|
|
|
|
def __init__(self, config):
|
|
super().__init__(config)
|
|
self.a = nn.Parameter(torch.tensor(config.a).float())
|
|
self.b = nn.Parameter(torch.tensor(config.b).float())
|
|
self.double_output = config.double_output
|
|
|
|
def forward(self, input_x, labels=None, **kwargs):
|
|
y = input_x * self.a + self.b
|
|
if labels is None:
|
|
return (y, y) if self.double_output else (y,)
|
|
loss = nn.functional.mse_loss(y, labels)
|
|
return (loss, y, y) if self.double_output else (loss, y)
|
|
|
|
class RegressionPreTrainedModelWithGradientCheckpointing(PreTrainedModel):
|
|
config_class = RegressionModelConfig
|
|
base_model_prefix = "regression"
|
|
supports_gradient_checkpointing = True
|
|
|
|
def __init__(self, config):
|
|
super().__init__(config)
|
|
self.layers = nn.ModuleList([nn.Linear(config.hidden_size, config.hidden_size) for _ in range(4)])
|
|
self.head = nn.Linear(config.hidden_size, 1)
|
|
self.gradient_checkpointing = False
|
|
self.double_output = config.double_output
|
|
|
|
def forward(self, input_x, labels=None, **kwargs):
|
|
y = input_x.unsqueeze(0)
|
|
|
|
for layer in self.layers:
|
|
if self.training and self.gradient_checkpointing:
|
|
outputs = self._gradient_checkpointing_func(layer.__call__, y)
|
|
else:
|
|
outputs = layer(y)
|
|
|
|
y = outputs * 3
|
|
|
|
logits = self.head(y)
|
|
|
|
if labels is None:
|
|
return (logits, logits) if self.double_output else (logits,)
|
|
|
|
loss = nn.functional.mse_loss(logits, labels)
|
|
|
|
return (loss, y, y) if self.double_output else (loss, y)
|
|
|
|
class RegressionRandomPreTrainedModel(PreTrainedModel):
|
|
config_class = RegressionModelConfig
|
|
base_model_prefix = "regression"
|
|
|
|
def __init__(self, config):
|
|
super().__init__(config)
|
|
self.a = nn.Parameter(torch.tensor(config.a).float())
|
|
self.b = nn.Parameter(torch.tensor(config.b).float())
|
|
self.random_torch = config.random_torch
|
|
|
|
def forward(self, input_x, labels=None, **kwargs):
|
|
y = input_x * self.a + self.b
|
|
if self.random_torch:
|
|
torch_rand = torch.randn(1).squeeze()
|
|
np_rand = np.random.rand()
|
|
rand_rand = random.random()
|
|
|
|
if self.random_torch:
|
|
y += 0.05 * torch_rand
|
|
y += 0.05 * torch.tensor(np_rand + rand_rand)
|
|
|
|
if labels is None:
|
|
return (y,)
|
|
loss = nn.functional.mse_loss(y, labels)
|
|
return (loss, y)
|
|
|
|
class TstLayer(nn.Module):
|
|
def __init__(self, hidden_size):
|
|
super().__init__()
|
|
self.linear1 = nn.Linear(hidden_size, hidden_size)
|
|
self.ln1 = nn.LayerNorm(hidden_size)
|
|
self.linear2 = nn.Linear(hidden_size, hidden_size)
|
|
self.ln2 = nn.LayerNorm(hidden_size)
|
|
self.bias = nn.Parameter(torch.zeros(hidden_size))
|
|
|
|
def forward(self, x):
|
|
h = self.ln1(nn.functional.relu(self.linear1(x)))
|
|
h = nn.functional.relu(self.linear2(x))
|
|
return self.ln2(x + h + self.bias)
|
|
|
|
def get_regression_trainer(
|
|
a=0, b=0, double_output=False, train_len=64, eval_len=64, pretrained=True, keep_report_to=False, **kwargs
|
|
):
|
|
label_names = kwargs.get("label_names", None)
|
|
gradient_checkpointing = kwargs.get("gradient_checkpointing", False)
|
|
train_dataset = RegressionDataset(length=train_len, label_names=label_names)
|
|
eval_dataset = RegressionDataset(length=eval_len, label_names=label_names)
|
|
|
|
model_init = kwargs.pop("model_init", None)
|
|
if model_init is not None:
|
|
model = None
|
|
else:
|
|
if pretrained:
|
|
config = RegressionModelConfig(a=a, b=b, double_output=double_output)
|
|
# We infer the correct model class if one uses gradient_checkpointing or not
|
|
target_cls = (
|
|
RegressionPreTrainedModel
|
|
if not gradient_checkpointing
|
|
else RegressionPreTrainedModelWithGradientCheckpointing
|
|
)
|
|
model = target_cls(config)
|
|
else:
|
|
model = RegressionModel(a=a, b=b, double_output=double_output)
|
|
|
|
compute_metrics = kwargs.pop("compute_metrics", None)
|
|
data_collator = kwargs.pop("data_collator", None)
|
|
optimizers = kwargs.pop("optimizers", (None, None))
|
|
output_dir = kwargs.pop("output_dir", "./regression")
|
|
preprocess_logits_for_metrics = kwargs.pop("preprocess_logits_for_metrics", None)
|
|
|
|
args = RegressionTrainingArguments(output_dir, a=a, b=b, keep_report_to=keep_report_to, **kwargs)
|
|
return Trainer(
|
|
model,
|
|
args,
|
|
data_collator=data_collator,
|
|
train_dataset=train_dataset,
|
|
eval_dataset=eval_dataset,
|
|
compute_metrics=compute_metrics,
|
|
optimizers=optimizers,
|
|
model_init=model_init,
|
|
preprocess_logits_for_metrics=preprocess_logits_for_metrics,
|
|
)
|
|
|
|
|
|
class TrainerIntegrationCommon:
|
|
def check_saved_checkpoints(self, output_dir, freq, total, is_pretrained=True, safe_weights=True):
|
|
weights_file = WEIGHTS_NAME if not safe_weights else SAFE_WEIGHTS_NAME
|
|
file_list = [weights_file, "training_args.bin", "optimizer.pt", "scheduler.pt", "trainer_state.json"]
|
|
if is_pretrained:
|
|
file_list.append("config.json")
|
|
for step in range(freq, total, freq):
|
|
checkpoint = os.path.join(output_dir, f"checkpoint-{step}")
|
|
self.assertTrue(os.path.isdir(checkpoint))
|
|
for filename in file_list:
|
|
self.assertTrue(os.path.isfile(os.path.join(checkpoint, filename)))
|
|
|
|
def check_best_model_has_been_loaded(
|
|
self, output_dir, freq, total, trainer, metric, greater_is_better=False, is_pretrained=True, safe_weights=True
|
|
):
|
|
checkpoint = os.path.join(output_dir, f"checkpoint-{(total // freq) * freq}")
|
|
log_history = TrainerState.load_from_json(os.path.join(checkpoint, "trainer_state.json")).log_history
|
|
|
|
values = [d[metric] for d in log_history]
|
|
best_value = max(values) if greater_is_better else min(values)
|
|
best_checkpoint = (values.index(best_value) + 1) * freq
|
|
checkpoint = os.path.join(output_dir, f"checkpoint-{best_checkpoint}")
|
|
if is_pretrained:
|
|
best_model = RegressionPreTrainedModel.from_pretrained(checkpoint)
|
|
best_model.to(trainer.args.device)
|
|
else:
|
|
best_model = RegressionModel()
|
|
if not safe_weights:
|
|
state_dict = torch.load(os.path.join(checkpoint, WEIGHTS_NAME))
|
|
else:
|
|
state_dict = safetensors.torch.load_file(os.path.join(checkpoint, SAFE_WEIGHTS_NAME))
|
|
best_model.load_state_dict(state_dict)
|
|
best_model.to(trainer.args.device)
|
|
self.assertTrue(torch.allclose(best_model.a, trainer.model.a))
|
|
self.assertTrue(torch.allclose(best_model.b, trainer.model.b))
|
|
|
|
metrics = trainer.evaluate()
|
|
self.assertEqual(metrics[metric], best_value)
|
|
|
|
def check_trainer_state_are_the_same(self, trainer_state, trainer_state1):
|
|
# We'll pop things so operate on copies.
|
|
state = trainer_state.copy()
|
|
state1 = trainer_state1.copy()
|
|
# Log history main contain different logs for the time metrics (after resuming a training).
|
|
log_history = state.pop("log_history", None)
|
|
log_history1 = state1.pop("log_history", None)
|
|
self.assertEqual(state, state1)
|
|
skip_log_keys = ["train_runtime", "train_samples_per_second", "train_steps_per_second", "train_loss"]
|
|
for log, log1 in zip(log_history, log_history1):
|
|
for key in skip_log_keys:
|
|
_ = log.pop(key, None)
|
|
_ = log1.pop(key, None)
|
|
self.assertEqual(log, log1)
|
|
|
|
def convert_to_sharded_checkpoint(self, folder, save_safe=True, load_safe=True):
|
|
# Converts a checkpoint of a regression model to a sharded checkpoint.
|
|
if load_safe:
|
|
loader = safetensors.torch.load_file
|
|
weights_file = os.path.join(folder, SAFE_WEIGHTS_NAME)
|
|
else:
|
|
loader = torch.load
|
|
weights_file = os.path.join(folder, WEIGHTS_NAME)
|
|
|
|
if save_safe:
|
|
extension = "safetensors"
|
|
saver = safetensors.torch.save_file
|
|
index_file = os.path.join(folder, SAFE_WEIGHTS_INDEX_NAME)
|
|
shard_name = SAFE_WEIGHTS_NAME
|
|
else:
|
|
extension = "bin"
|
|
saver = torch.save
|
|
index_file = os.path.join(folder, WEIGHTS_INDEX_NAME)
|
|
shard_name = WEIGHTS_NAME
|
|
|
|
state_dict = loader(weights_file)
|
|
|
|
os.remove(weights_file)
|
|
keys = list(state_dict.keys())
|
|
|
|
shard_files = [
|
|
shard_name.replace(f".{extension}", f"-{idx+1:05d}-of-{len(keys):05d}.{extension}")
|
|
for idx in range(len(keys))
|
|
]
|
|
index = {"metadata": {}, "weight_map": {key: shard_files[i] for i, key in enumerate(keys)}}
|
|
|
|
with open(index_file, "w", encoding="utf-8") as f:
|
|
content = json.dumps(index, indent=2, sort_keys=True) + "\n"
|
|
f.write(content)
|
|
|
|
for param_name, shard_file in zip(keys, shard_files):
|
|
saver({param_name: state_dict[param_name]}, os.path.join(folder, shard_file))
|
|
|
|
|
|
@require_torch
|
|
@require_sentencepiece
|
|
@require_tokenizers
|
|
class TrainerIntegrationPrerunTest(TestCasePlus, TrainerIntegrationCommon):
|
|
"""
|
|
Only tests that want to tap into the auto-pre-run 2 trainings:
|
|
- self.default_trained_model
|
|
- self.alternate_trained_model
|
|
directly, or via check_trained_model
|
|
"""
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
args = TrainingArguments("..")
|
|
self.n_epochs = args.num_train_epochs
|
|
self.batch_size = args.train_batch_size
|
|
trainer = get_regression_trainer(learning_rate=0.1)
|
|
trainer.train()
|
|
self.default_trained_model = (trainer.model.a, trainer.model.b)
|
|
|
|
trainer = get_regression_trainer(learning_rate=0.1, seed=314)
|
|
trainer.train()
|
|
self.alternate_trained_model = (trainer.model.a, trainer.model.b)
|
|
|
|
def check_trained_model(self, model, alternate_seed=False):
|
|
# Checks a training seeded with learning_rate = 0.1
|
|
(a, b) = self.alternate_trained_model if alternate_seed else self.default_trained_model
|
|
self.assertTrue(torch.allclose(model.a, a))
|
|
self.assertTrue(torch.allclose(model.b, b))
|
|
|
|
def test_reproducible_training(self):
|
|
# Checks that training worked, model trained and seed made a reproducible training.
|
|
trainer = get_regression_trainer(learning_rate=0.1)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
# Checks that a different seed gets different (reproducible) results.
|
|
trainer = get_regression_trainer(learning_rate=0.1, seed=314)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model, alternate_seed=True)
|
|
|
|
def test_trainer_with_datasets(self):
|
|
import datasets
|
|
|
|
np.random.seed(42)
|
|
x = np.random.normal(size=(64,)).astype(np.float32)
|
|
y = 2.0 * x + 3.0 + np.random.normal(scale=0.1, size=(64,)).astype(np.float32)
|
|
train_dataset = datasets.Dataset.from_dict({"input_x": x, "label": y})
|
|
|
|
# Base training. Should have the same results as test_reproducible_training
|
|
model = RegressionModel()
|
|
args = TrainingArguments("./regression", learning_rate=0.1)
|
|
trainer = Trainer(model, args, train_dataset=train_dataset)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
# Can return tensors.
|
|
train_dataset.set_format(type="torch", dtype=torch.float32)
|
|
model = RegressionModel()
|
|
trainer = Trainer(model, args, train_dataset=train_dataset)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
# Adding one column not used by the model should have no impact
|
|
z = np.random.normal(size=(64,)).astype(np.float32)
|
|
train_dataset = datasets.Dataset.from_dict({"input_x": x, "label": y, "extra": z})
|
|
model = RegressionModel()
|
|
trainer = Trainer(model, args, train_dataset=train_dataset)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
def test_model_init(self):
|
|
train_dataset = RegressionDataset()
|
|
args = TrainingArguments("./regression", learning_rate=0.1)
|
|
trainer = Trainer(args=args, train_dataset=train_dataset, model_init=lambda: RegressionModel())
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
# Re-training should restart from scratch, thus lead the same results.
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
# Re-training should restart from scratch, thus lead the same results and new seed should be used.
|
|
trainer.args.seed = 314
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model, alternate_seed=True)
|
|
|
|
def test_gradient_accumulation(self):
|
|
# Training with half the batch size but accumulation steps as 2 should give the same results.
|
|
trainer = get_regression_trainer(
|
|
gradient_accumulation_steps=2, per_device_train_batch_size=4, learning_rate=0.1
|
|
)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
def test_gradient_checkpointing(self):
|
|
trainer = get_regression_trainer(
|
|
per_device_train_batch_size=1,
|
|
learning_rate=0.1,
|
|
gradient_checkpointing=True,
|
|
gradient_checkpointing_kwargs={"use_reentrant": False},
|
|
)
|
|
previous_params = {k: v.detach().clone() for k, v in trainer.model.named_parameters()}
|
|
|
|
trainer.train()
|
|
|
|
# Check if model weights have been updated
|
|
for k, v in trainer.model.named_parameters():
|
|
self.assertFalse(
|
|
torch.allclose(previous_params[k], v, rtol=1e-4, atol=1e-4),
|
|
f"Model weights for {k} have not been updated",
|
|
)
|
|
|
|
def test_training_loss(self):
|
|
n_gpus = max(1, backend_device_count(torch_device))
|
|
|
|
# With even logs
|
|
trainer = get_regression_trainer(logging_steps=64 / (8 * n_gpus))
|
|
trainer.train()
|
|
log_history = trainer.state.log_history
|
|
|
|
losses = [log["loss"] for log in log_history if "loss" in log]
|
|
train_loss = log_history[-1]["train_loss"]
|
|
self.assertAlmostEqual(sum(losses) / len(losses), train_loss, places=4)
|
|
|
|
# With uneven logs
|
|
trainer = get_regression_trainer(logging_steps=5)
|
|
trainer.train()
|
|
log_history = trainer.state.log_history
|
|
|
|
# Training loss should be the same as before
|
|
new_train_loss = log_history[-1]["train_loss"]
|
|
self.assertAlmostEqual(train_loss, new_train_loss, places=4)
|
|
|
|
def test_custom_optimizer(self):
|
|
train_dataset = RegressionDataset()
|
|
args = TrainingArguments("./regression")
|
|
model = RegressionModel()
|
|
optimizer = torch.optim.SGD(model.parameters(), lr=1.0)
|
|
lr_scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda x: 1.0)
|
|
trainer = Trainer(model, args, train_dataset=train_dataset, optimizers=(optimizer, lr_scheduler))
|
|
trainer.train()
|
|
|
|
(a, b) = self.default_trained_model
|
|
self.assertFalse(torch.allclose(trainer.model.a, a))
|
|
self.assertFalse(torch.allclose(trainer.model.b, b))
|
|
self.assertEqual(trainer.optimizer.state_dict()["param_groups"][0]["lr"], 1.0)
|
|
|
|
def test_lr_scheduler_kwargs(self):
|
|
# test scheduler kwargs passed via TrainingArguments
|
|
train_dataset = RegressionDataset()
|
|
model = RegressionModel()
|
|
num_steps, num_warmup_steps = 10, 2
|
|
extra_kwargs = {"power": 5.0, "lr_end": 1e-5} # Non-default arguments
|
|
args = TrainingArguments(
|
|
"./regression",
|
|
lr_scheduler_type="polynomial",
|
|
lr_scheduler_kwargs=extra_kwargs,
|
|
learning_rate=0.2,
|
|
warmup_steps=num_warmup_steps,
|
|
)
|
|
trainer = Trainer(model, args, train_dataset=train_dataset)
|
|
trainer.create_optimizer_and_scheduler(num_training_steps=num_steps)
|
|
|
|
# Checking that the scheduler was created
|
|
self.assertIsNotNone(trainer.lr_scheduler)
|
|
|
|
# Checking that the correct args were passed
|
|
sched1 = trainer.lr_scheduler
|
|
sched2 = get_polynomial_decay_schedule_with_warmup(
|
|
trainer.optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_steps, **extra_kwargs
|
|
)
|
|
self.assertEqual(sched1.lr_lambdas[0].args, sched2.lr_lambdas[0].args)
|
|
self.assertEqual(sched1.lr_lambdas[0].keywords, sched2.lr_lambdas[0].keywords)
|
|
|
|
def test_cosine_with_min_lr_scheduler(self):
|
|
train_dataset = RegressionDataset()
|
|
model = RegressionModel()
|
|
num_steps, num_warmup_steps = 10, 2
|
|
extra_kwargs = {"min_lr": 1e-5} # Non-default arguments
|
|
args = TrainingArguments(
|
|
"./regression",
|
|
lr_scheduler_type="cosine_with_min_lr",
|
|
lr_scheduler_kwargs=extra_kwargs,
|
|
learning_rate=0.2,
|
|
warmup_steps=num_warmup_steps,
|
|
)
|
|
trainer = Trainer(model, args, train_dataset=train_dataset)
|
|
trainer.create_optimizer_and_scheduler(num_training_steps=num_steps)
|
|
|
|
# Checking that the scheduler was created
|
|
self.assertIsNotNone(trainer.lr_scheduler)
|
|
|
|
# Check the last learning rate
|
|
for _ in range(num_steps):
|
|
trainer.lr_scheduler.step()
|
|
self.assertEqual(trainer.lr_scheduler.get_last_lr()[0], 1e-5)
|
|
|
|
def test_reduce_lr_on_plateau_args(self):
|
|
# test passed arguments for a custom ReduceLROnPlateau scheduler
|
|
train_dataset = RegressionDataset(length=64)
|
|
eval_dataset = RegressionDataset(length=64)
|
|
args = TrainingArguments(
|
|
"./regression",
|
|
eval_strategy="epoch",
|
|
metric_for_best_model="eval_loss",
|
|
)
|
|
model = RegressionModel()
|
|
optimizer = torch.optim.SGD(model.parameters(), lr=1.0)
|
|
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.2, patience=5, cooldown=2)
|
|
trainer = Trainer(
|
|
model, args, train_dataset=train_dataset, eval_dataset=eval_dataset, optimizers=(optimizer, lr_scheduler)
|
|
)
|
|
trainer.train()
|
|
|
|
self.assertIsInstance(trainer.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)
|
|
self.assertEqual(trainer.lr_scheduler.factor, 0.2)
|
|
self.assertEqual(trainer.lr_scheduler.patience, 5)
|
|
self.assertEqual(trainer.lr_scheduler.cooldown, 2)
|
|
|
|
def test_reduce_lr_on_plateau(self):
|
|
# test the ReduceLROnPlateau scheduler
|
|
|
|
class TrainerWithLRLogs(Trainer):
|
|
def log(self, logs):
|
|
# the LR is computed after metrics and does not exist for the first epoch
|
|
if hasattr(self.lr_scheduler, "_last_lr"):
|
|
logs["learning_rate"] = self.lr_scheduler._last_lr[0]
|
|
super().log(logs)
|
|
|
|
train_dataset = RegressionDataset(length=64)
|
|
eval_dataset = RegressionDataset(length=64)
|
|
|
|
args = TrainingArguments(
|
|
"./regression",
|
|
lr_scheduler_type="reduce_lr_on_plateau",
|
|
eval_strategy="epoch",
|
|
metric_for_best_model="eval_loss",
|
|
num_train_epochs=10,
|
|
learning_rate=0.2,
|
|
)
|
|
model = RegressionModel()
|
|
trainer = TrainerWithLRLogs(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
|
|
trainer.train()
|
|
|
|
self.assertIsInstance(trainer.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau)
|
|
patience = trainer.lr_scheduler.patience
|
|
|
|
logs = trainer.state.log_history[1:]
|
|
best_loss = logs[0]["eval_loss"]
|
|
bad_epochs = 0
|
|
for i, log in enumerate(logs[:-1]): # Compare learning rate to next epoch's
|
|
loss = log["eval_loss"]
|
|
just_decreased = False
|
|
if loss > best_loss:
|
|
bad_epochs += 1
|
|
if bad_epochs > patience:
|
|
self.assertLess(logs[i + 1]["learning_rate"], log["learning_rate"])
|
|
just_decreased = True
|
|
bad_epochs = 0
|
|
else:
|
|
best_loss = loss
|
|
bad_epochs = 0
|
|
if not just_decreased:
|
|
self.assertEqual(logs[i + 1]["learning_rate"], log["learning_rate"])
|
|
|
|
def test_adafactor_lr_none(self):
|
|
# test the special case where lr=None, since Trainer can't not have lr_scheduler
|
|
|
|
from transformers.optimization import Adafactor, AdafactorSchedule
|
|
|
|
train_dataset = RegressionDataset()
|
|
args = TrainingArguments("./regression")
|
|
model = RegressionModel()
|
|
optimizer = Adafactor(model.parameters(), scale_parameter=True, relative_step=True, warmup_init=True, lr=None)
|
|
lr_scheduler = AdafactorSchedule(optimizer)
|
|
trainer = Trainer(model, args, train_dataset=train_dataset, optimizers=(optimizer, lr_scheduler))
|
|
trainer.train()
|
|
|
|
(a, b) = self.default_trained_model
|
|
self.assertFalse(torch.allclose(trainer.model.a, a))
|
|
self.assertFalse(torch.allclose(trainer.model.b, b))
|
|
self.assertGreater(trainer.optimizer.state_dict()["param_groups"][0]["lr"], 0)
|
|
|
|
@require_torch_accelerator
|
|
@require_torch_bf16
|
|
def test_mixed_bf16(self):
|
|
# very basic test
|
|
trainer = get_regression_trainer(learning_rate=0.1, bf16=True)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
# --bf16 --half_precision_backend apex can't be used together
|
|
with self.assertRaises(ValueError):
|
|
trainer = get_regression_trainer(learning_rate=0.1, bf16=True, half_precision_backend="apex")
|
|
|
|
# will add more specific tests once there are some bugs to fix
|
|
|
|
@require_torch_gpu
|
|
@require_torch_tf32
|
|
def test_tf32(self):
|
|
# very basic test
|
|
trainer = get_regression_trainer(learning_rate=0.1, tf32=True)
|
|
trainer.train()
|
|
self.check_trained_model(trainer.model)
|
|
|
|
|
|
@require_torch
|
|
@require_sentencepiece
|
|
@require_tokenizers
|
|
class TrainerIntegrationTest(TestCasePlus, TrainerIntegrationCommon):
|
|
def setUp(self):
|
|
super().setUp()
|
|
args = TrainingArguments("..")
|
|
self.n_epochs = args.num_train_epochs
|
|
self.batch_size = args.train_batch_size
|
|
|
|
def test_trainer_works_with_dict(self):
|
|
# Edge case because Apex with mode O2 will change our models to return dicts. This test checks it doesn't break
|
|
# anything.
|
|
train_dataset = RegressionDataset()
|
|
eval_dataset = RegressionDataset()
|
|
model = RegressionDictModel()
|
|
args = TrainingArguments("./regression")
|
|
trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
|
|
trainer.train()
|
|
_ = trainer.evaluate()
|
|
_ = trainer.predict(eval_dataset)
|
|
|
|
def test_evaluation_with_keys_to_drop(self):
|
|
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
eval_dataset = RepeatDataset(x)
|
|
args = TrainingArguments("./test")
|
|
trainer = Trainer(tiny_gpt2, args, eval_dataset=eval_dataset)
|
|
# By default the past_key_values are removed
|
|
result = trainer.predict(eval_dataset)
|
|
self.assertTrue(isinstance(result.predictions, np.ndarray))
|
|
# We can still get them by setting ignore_keys to []
|
|
result = trainer.predict(eval_dataset, ignore_keys=[])
|
|
self.assertTrue(isinstance(result.predictions, tuple))
|
|
self.assertEqual(len(result.predictions), 2)
|
|
|
|
def test_training_arguments_are_left_untouched(self):
|
|
trainer = get_regression_trainer()
|
|
trainer.train()
|
|
args = TrainingArguments("./regression", report_to=[])
|
|
dict1, dict2 = args.to_dict(), trainer.args.to_dict()
|
|
for key in dict1.keys():
|
|
# Logging dir can be slightly different as they default to something with the time.
|
|
if key != "logging_dir":
|
|
self.assertEqual(dict1[key], dict2[key])
|
|
|
|
def test_number_of_steps_in_training(self):
|
|
# Regular training has n_epochs * len(train_dl) steps
|
|
trainer = get_regression_trainer(learning_rate=0.1)
|
|
train_output = trainer.train()
|
|
self.assertEqual(train_output.global_step, self.n_epochs * 64 / self.batch_size)
|
|
|
|
# Check passing num_train_epochs works (and a float version too):
|
|
trainer = get_regression_trainer(learning_rate=0.1, num_train_epochs=1.5)
|
|
train_output = trainer.train()
|
|
self.assertEqual(train_output.global_step, int(1.5 * 64 / self.batch_size))
|
|
|
|
# If we pass a max_steps, num_train_epochs is ignored
|
|
trainer = get_regression_trainer(learning_rate=0.1, max_steps=10)
|
|
train_output = trainer.train()
|
|
self.assertEqual(train_output.global_step, 10)
|
|
|
|
@require_torch_bf16
|
|
@require_intel_extension_for_pytorch
|
|
def test_number_of_steps_in_training_with_ipex(self):
|
|
for mix_bf16 in [True, False]:
|
|
# Regular training has n_epochs * len(train_dl) steps
|
|
trainer = get_regression_trainer(learning_rate=0.1, use_ipex=True, bf16=mix_bf16, use_cpu=True)
|
|
train_output = trainer.train()
|
|
self.assertEqual(train_output.global_step, self.n_epochs * 64 / trainer.args.train_batch_size)
|
|
|
|
# Check passing num_train_epochs works (and a float version too):
|
|
trainer = get_regression_trainer(
|
|
learning_rate=0.1, num_train_epochs=1.5, use_ipex=True, bf16=mix_bf16, use_cpu=True
|
|
)
|
|
train_output = trainer.train()
|
|
self.assertEqual(train_output.global_step, int(1.5 * 64 / trainer.args.train_batch_size))
|
|
|
|
# If we pass a max_steps, num_train_epochs is ignored
|
|
trainer = get_regression_trainer(
|
|
learning_rate=0.1, max_steps=10, use_ipex=True, bf16=mix_bf16, use_cpu=True
|
|
)
|
|
train_output = trainer.train()
|
|
self.assertEqual(train_output.global_step, 10)
|
|
|
|
@require_peft
|
|
@require_bitsandbytes
|
|
def test_bnb_compile(self):
|
|
from peft import LoraConfig, get_peft_model
|
|
|
|
# Simply tests if initializing a Trainer with a PEFT + compiled model works out of the box
|
|
# QLoRA + torch compile is not really supported yet, but we should at least support the model
|
|
# loading and let torch throw the
|
|
tiny_model = AutoModelForCausalLM.from_pretrained(
|
|
"hf-internal-testing/tiny-random-LlamaForCausalLM", load_in_4bit=True
|
|
)
|
|
|
|
peft_config = LoraConfig(
|
|
r=8,
|
|
lora_alpha=32,
|
|
target_modules=["q_proj", "k_proj", "v_proj"],
|
|
lora_dropout=0.05,
|
|
bias="none",
|
|
task_type="CAUSAL_LM",
|
|
)
|
|
tiny_model = get_peft_model(tiny_model, peft_config)
|
|
|
|
tiny_model = torch.compile(tiny_model)
|
|
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
args = TrainingArguments(
|
|
tmp_dir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
)
|
|
with self.assertRaises(ValueError):
|
|
_ = Trainer(tiny_model, args, train_dataset=train_dataset) # noqa
|
|
|
|
@require_bitsandbytes
|
|
def test_rmsprop_bnb(self):
|
|
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir, learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, optim="rmsprop_bnb"
|
|
)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
|
|
# Check that it trains without errors
|
|
trainer.train()
|
|
|
|
@require_bitsandbytes
|
|
def test_rmsprop_bnb_8bit(self):
|
|
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir, learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, optim="rmsprop_bnb_8bit"
|
|
)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
|
|
# Check that it trains without errors
|
|
trainer.train()
|
|
|
|
@require_bitsandbytes
|
|
def test_rmsprop_bnb_32bit(self):
|
|
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir, learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, optim="rmsprop_bnb_32bit"
|
|
)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
|
|
# Check that it trains without errors
|
|
trainer.train()
|
|
|
|
def test_neftune(self):
|
|
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
"./test", learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, neftune_noise_alpha=0.4
|
|
)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
|
|
trainer.model = trainer._activate_neftune(trainer.model)
|
|
|
|
dummy_input = torch.LongTensor([[1, 0, 1]]).to(torch_device)
|
|
|
|
emb1 = trainer.model.get_input_embeddings()(dummy_input)
|
|
emb2 = trainer.model.get_input_embeddings()(dummy_input)
|
|
|
|
self.assertFalse(torch.allclose(emb1, emb2), "Neftune noise is not applied!")
|
|
|
|
# redefine the model
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
"./test", learning_rate=1e-9, logging_steps=5, logging_nan_inf_filter=False, neftune_noise_alpha=0.4
|
|
)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
|
|
# Check that it trains without errors
|
|
trainer.train()
|
|
|
|
# Make sure forward pass works fine
|
|
_ = trainer.model(dummy_input)
|
|
self.assertTrue(len(trainer.model.get_input_embeddings()._forward_hooks) == 0)
|
|
|
|
trainer.model.eval()
|
|
|
|
# Check that we get identical embeddings just in case
|
|
emb1 = trainer.model.get_input_embeddings()(dummy_input)
|
|
emb2 = trainer.model.get_input_embeddings()(dummy_input)
|
|
|
|
self.assertTrue(torch.allclose(emb1, emb2), "Neftune noise is still applied!")
|
|
|
|
def test_logging_inf_nan_filter(self):
|
|
config = GPT2Config(vocab_size=100, n_positions=128, n_embd=32, n_layer=3, n_head=4)
|
|
tiny_gpt2 = GPT2LMHeadModel(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments("./test", learning_rate=1e9, logging_steps=5, logging_nan_inf_filter=False)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
trainer.train()
|
|
log_history_no_filter = trainer.state.log_history
|
|
|
|
# Trainer with inf/nan filter
|
|
args = TrainingArguments("./test", learning_rate=1e9, logging_steps=5, logging_nan_inf_filter=True)
|
|
trainer = Trainer(tiny_gpt2, args, train_dataset=train_dataset)
|
|
trainer.train()
|
|
log_history_filter = trainer.state.log_history
|
|
|
|
def is_any_loss_nan_or_inf(log_history):
|
|
losses = [l["loss"] for l in log_history[:-1]]
|
|
return any(math.isnan(x) for x in losses) or any(math.isinf(x) for x in losses)
|
|
|
|
self.assertTrue(is_any_loss_nan_or_inf(log_history_no_filter))
|
|
self.assertFalse(is_any_loss_nan_or_inf(log_history_filter))
|
|
|
|
def test_train_and_eval_dataloaders(self):
|
|
if torch_device == "cuda":
|
|
n_gpu = max(1, backend_device_count(torch_device))
|
|
else:
|
|
n_gpu = 1
|
|
trainer = get_regression_trainer(learning_rate=0.1, per_device_train_batch_size=16)
|
|
self.assertEqual(trainer.get_train_dataloader().total_batch_size, 16 * n_gpu)
|
|
trainer = get_regression_trainer(learning_rate=0.1, per_device_eval_batch_size=16)
|
|
self.assertEqual(trainer.get_eval_dataloader().total_batch_size, 16 * n_gpu)
|
|
|
|
# Check drop_last works
|
|
trainer = get_regression_trainer(
|
|
train_len=66, eval_len=74, learning_rate=0.1, per_device_train_batch_size=16, per_device_eval_batch_size=32
|
|
)
|
|
self.assertEqual(len(trainer.get_train_dataloader()), 66 // (16 * n_gpu) + 1)
|
|
self.assertEqual(len(trainer.get_eval_dataloader()), 74 // (32 * n_gpu) + 1)
|
|
|
|
trainer = get_regression_trainer(
|
|
train_len=66,
|
|
eval_len=74,
|
|
learning_rate=0.1,
|
|
per_device_train_batch_size=16,
|
|
per_device_eval_batch_size=32,
|
|
dataloader_drop_last=True,
|
|
)
|
|
self.assertEqual(len(trainer.get_train_dataloader()), 66 // (16 * n_gpu))
|
|
self.assertEqual(len(trainer.get_eval_dataloader()), 74 // (32 * n_gpu))
|
|
|
|
# Check passing a new dataset for evaluation works
|
|
new_eval_dataset = RegressionDataset(length=128)
|
|
self.assertEqual(len(trainer.get_eval_dataloader(new_eval_dataset)), 128 // (32 * n_gpu))
|
|
|
|
# tests that we do not require dataloader to have a .dataset attribute
|
|
def test_dataloader_without_dataset(self):
|
|
train_dataset = RegressionDataset(length=128)
|
|
trainer = CustomDataloaderTrainer(
|
|
model=RegressionModel(), train_dataset=train_dataset, eval_dataset=train_dataset
|
|
)
|
|
trainer.train()
|
|
trainer.evaluate()
|
|
|
|
def test_galore_matched_modules(self):
|
|
regex_patterns = [r".*.attn.*", r".*.mlp.*"]
|
|
|
|
module_names = [
|
|
"model.transformer.h.0.ln_1",
|
|
"model.transformer.h.0.attn.q_proj",
|
|
"model.lm_head",
|
|
"model.transformer.h.0.mlp.up_proj",
|
|
]
|
|
expected_values = [False, True, False, True]
|
|
|
|
for expected_value, module_name in zip(expected_values, module_names):
|
|
is_module_matched, is_regex = check_target_module_exists(regex_patterns, module_name, return_is_regex=True)
|
|
self.assertTrue(is_module_matched == expected_value)
|
|
if is_module_matched:
|
|
self.assertTrue(is_regex)
|
|
|
|
exact_patterns = ["q_proj", "up_proj"]
|
|
|
|
module_names = [
|
|
"model.transformer.h.0.ln_1",
|
|
"model.transformer.h.0.attn.q_proj",
|
|
"model.lm_head",
|
|
"model.transformer.h.0.mlp.up_proj",
|
|
]
|
|
expected_values = [False, True, False, True]
|
|
|
|
for expected_value, module_name in zip(expected_values, module_names):
|
|
is_module_matched, is_regex = check_target_module_exists(exact_patterns, module_name, return_is_regex=True)
|
|
self.assertTrue(is_module_matched == expected_value)
|
|
if is_module_matched:
|
|
self.assertFalse(is_regex)
|
|
|
|
simple_regex = r".*.attn.*"
|
|
|
|
module_names = [
|
|
"model.transformer.h.0.ln_1",
|
|
"model.transformer.h.0.attn.q_proj",
|
|
"model.lm_head",
|
|
"model.transformer.h.0.mlp.up_proj",
|
|
]
|
|
expected_values = [False, True, False, False]
|
|
|
|
for expected_value, module_name in zip(expected_values, module_names):
|
|
is_module_matched, is_regex = check_target_module_exists(simple_regex, module_name, return_is_regex=True)
|
|
self.assertTrue(is_module_matched == expected_value)
|
|
if is_module_matched:
|
|
self.assertTrue(is_regex)
|
|
|
|
simple_regex = "model.transformer.h.0.attn.q_proj"
|
|
|
|
module_names = [
|
|
"model.transformer.h.0.ln_1",
|
|
"model.transformer.h.0.attn.q_proj",
|
|
"model.lm_head",
|
|
"model.transformer.h.0.mlp.up_proj",
|
|
]
|
|
expected_values = [False, True, False, False]
|
|
|
|
for expected_value, module_name in zip(expected_values, module_names):
|
|
is_module_matched, is_regex = check_target_module_exists(simple_regex, module_name, return_is_regex=True)
|
|
self.assertTrue(is_module_matched == expected_value)
|
|
if is_module_matched:
|
|
self.assertFalse(is_regex)
|
|
|
|
target_modules = ["attn", "mlp"]
|
|
|
|
module_names = [
|
|
"model.transformer.h.0.ln_1",
|
|
"model.transformer.h.0.attn.q_proj",
|
|
"model.lm_head",
|
|
"model.transformer.h.0.mlp.up_proj",
|
|
]
|
|
expected_values = [False, True, False, True]
|
|
|
|
for expected_value, module_name in zip(expected_values, module_names):
|
|
is_module_matched, is_regex = check_target_module_exists(target_modules, module_name, return_is_regex=True)
|
|
self.assertTrue(is_module_matched == expected_value)
|
|
if is_module_matched:
|
|
self.assertFalse(is_regex)
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore(self):
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adamw",
|
|
optim_target_modules=[r".*attn.*", r".*mlp.*"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_extra_args(self):
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adamw",
|
|
optim_args="rank=64, update_proj_gap=100, scale=0.10",
|
|
optim_target_modules=[r".*attn.*", r".*mlp.*"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_layerwise(self):
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adamw_layerwise",
|
|
optim_target_modules=[r".*attn.*", r".*mlp.*"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_layerwise_with_scheduler(self):
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adamw_layerwise",
|
|
lr_scheduler_type="cosine",
|
|
optim_target_modules=[r".*attn.*", r".*mlp.*"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_adamw_8bit(self):
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adamw_8bit",
|
|
optim_target_modules=[r".*attn.*", r".*mlp.*"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_adafactor(self):
|
|
# These are the intervals of the peak memory usage of training such a tiny model
|
|
# if the peak memory goes outside that range, then we know there might be a bug somewhere
|
|
upper_bound_pm = 700
|
|
lower_bound_pm = 650
|
|
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir, TorchTracemalloc() as tracemalloc:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adafactor",
|
|
optim_target_modules=[r".*attn.*", r".*mlp.*"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
galore_peak_memory = tracemalloc.peaked + bytes2megabytes(tracemalloc.begin)
|
|
|
|
self.assertTrue(galore_peak_memory < upper_bound_pm)
|
|
self.assertTrue(lower_bound_pm < galore_peak_memory)
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_adafactor_attention_only(self):
|
|
# These are the intervals of the peak memory usage of training such a tiny model
|
|
# if the peak memory goes outside that range, then we know there might be a bug somewhere
|
|
upper_bound_pm = 700
|
|
lower_bound_pm = 650
|
|
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir, TorchTracemalloc() as tracemalloc:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adafactor",
|
|
optim_target_modules=["q_proj", "k_proj", "v_proj"],
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
galore_peak_memory = tracemalloc.peaked + bytes2megabytes(tracemalloc.begin)
|
|
self.assertTrue(galore_peak_memory < upper_bound_pm)
|
|
self.assertTrue(lower_bound_pm < galore_peak_memory)
|
|
|
|
@require_galore_torch
|
|
@require_torch_gpu
|
|
def test_galore_adafactor_all_linear(self):
|
|
# These are the intervals of the peak memory usage of training such a tiny model
|
|
# if the peak memory goes outside that range, then we know there might be a bug somewhere
|
|
upper_bound_pm = 700
|
|
lower_bound_pm = 650
|
|
|
|
config = LlamaConfig(vocab_size=100, hidden_size=32, num_hidden_layers=3, num_attention_heads=4)
|
|
tiny_llama = LlamaForCausalLM(config)
|
|
x = torch.randint(0, 100, (128,))
|
|
train_dataset = RepeatDataset(x)
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir, TorchTracemalloc() as tracemalloc:
|
|
# Trainer without inf/nan filter
|
|
args = TrainingArguments(
|
|
tmpdir,
|
|
learning_rate=1e-9,
|
|
logging_steps=5,
|
|
optim="galore_adafactor",
|
|
optim_target_modules="all-linear",
|
|
)
|
|
trainer = Trainer(tiny_llama, args, train_dataset=train_dataset)
|
|
|
|
# Check this works
|
|
_ = trainer.train()
|
|
|
|
galore_peak_memory = tracemalloc.peaked + bytes2megabytes(tracemalloc.begin)
|
|
self.assertTrue(galore_peak_memory < upper_bound_pm)
|
|
self.assertTrue(lower_bound_pm < galore_peak_memory)
|
|
|
|
@require_torch_multi_accelerator
|
|
def test_data_is_not_parallelized_when_model_is_parallel(self):
|
|
model = RegressionModel()
|
|
# Make the Trainer believe it's a parallelized model
|
|
model.is_parallelizable = True
|
|
model.model_parallel = True
|
|
args = TrainingArguments("./regression", per_device_train_batch_size=16, per_device_eval_batch_size=16)
|
|
trainer = Trainer(model, args, train_dataset=RegressionDataset(), eval_dataset=RegressionDataset())
|
|
# Check the Trainer was fooled
|
|
self.assertTrue(trainer.is_model_parallel)
|
|
self.assertEqual(trainer.args.n_gpu, 1)
|
|
|
|
# The batch size of the training and evaluation dataloaders should be 16, not 16 * n_gpu
|
|
self.assertEqual(trainer.get_train_dataloader().total_batch_size, 16)
|
|
self.assertEqual(len(trainer.get_train_dataloader()), 64 // 16)
|
|
self.assertEqual(trainer.get_eval_dataloader().total_batch_size, 16)
|
|
self.assertEqual(len(trainer.get_eval_dataloader()), 64 // 16)
|
|
|
|
def test_evaluate(self):
|
|
trainer = get_regression_trainer(a=1.5, b=2.5, compute_metrics=AlmostAccuracy())
|
|
results = trainer.evaluate()
|
|
|
|
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
|
|
pred = 1.5 * x + 2.5
|
|
expected_loss = ((pred - y) ** 2).mean()
|
|
self.assertAlmostEqual(results["eval_loss"], expected_loss)
|
|
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
|
|
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
|
|
|
|
# With a number of elements not a round multiple of the batch size
|
|
trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, compute_metrics=AlmostAccuracy())
|
|
results = trainer.evaluate()
|
|
|
|
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
|
|
pred = 1.5 * x + 2.5
|
|
expected_loss = ((pred - y) ** 2).mean()
|
|
self.assertAlmostEqual(results["eval_loss"], expected_loss)
|
|
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
|
|
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
|
|
|
|
# With logits preprocess
|
|
trainer = get_regression_trainer(
|
|
a=1.5,
|
|
b=2.5,
|
|
compute_metrics=AlmostAccuracy(),
|
|
preprocess_logits_for_metrics=lambda logits, labels: logits + 1,
|
|
)
|
|
results = trainer.evaluate()
|
|
|
|
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
|
|
pred = 1.5 * x + 2.5
|
|
expected_loss = ((pred - y) ** 2).mean()
|
|
self.assertAlmostEqual(results["eval_loss"], expected_loss)
|
|
expected_acc = AlmostAccuracy()((pred + 1, y))["accuracy"]
|
|
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
|
|
|
|
def test_evaluate_with_jit(self):
|
|
trainer = get_regression_trainer(a=1.5, b=2.5, compute_metrics=AlmostAccuracy(), jit_mode_eval=True)
|
|
results = trainer.evaluate()
|
|
|
|
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
|
|
pred = 1.5 * x + 2.5
|
|
expected_loss = ((pred - y) ** 2).mean()
|
|
self.assertAlmostEqual(results["eval_loss"], expected_loss)
|
|
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
|
|
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
|
|
|
|
# With a number of elements not a round multiple of the batch size
|
|
trainer = get_regression_trainer(
|
|
a=1.5, b=2.5, eval_len=66, compute_metrics=AlmostAccuracy(), jit_mode_eval=True
|
|
)
|
|
results = trainer.evaluate()
|
|
|
|
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
|
|
pred = 1.5 * x + 2.5
|
|
expected_loss = ((pred - y) ** 2).mean()
|
|
self.assertAlmostEqual(results["eval_loss"], expected_loss)
|
|
expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
|
|
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
|
|
|
|
# With logits preprocess
|
|
trainer = get_regression_trainer(
|
|
a=1.5,
|
|
b=2.5,
|
|
compute_metrics=AlmostAccuracy(),
|
|
preprocess_logits_for_metrics=lambda logits, labels: logits + 1,
|
|
jit_mode_eval=True,
|
|
)
|
|
results = trainer.evaluate()
|
|
|
|
x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
|
|
pred = 1.5 * x + 2.5
|
|
expected_loss = ((pred - y) ** 2).mean()
|
|
self.assertAlmostEqual(results["eval_loss"], expected_loss)
|
|
expected_acc = AlmostAccuracy()((pred + 1, y))["accuracy"]
|
|
self.assertAlmostEqual(results["eval_accuracy"], expected_acc)
|
|
|
|
    @require_torch_bf16
    @require_intel_extension_for_pytorch
    def test_evaluate_with_ipex(self):
        for mix_bf16 in [True, False]:
            trainer = get_regression_trainer(
                a=1.5, b=2.5, use_ipex=True, compute_metrics=AlmostAccuracy(), bf16=mix_bf16, use_cpu=True
            )
            results = trainer.evaluate()

            x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
            pred = 1.5 * x + 2.5
            expected_loss = ((pred - y) ** 2).mean()
            self.assertAlmostEqual(results["eval_loss"], expected_loss)
            expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
            self.assertAlmostEqual(results["eval_accuracy"], expected_acc)

            # With a number of elements not a round multiple of the batch size
            trainer = get_regression_trainer(
                a=1.5,
                b=2.5,
                use_ipex=True,
                eval_len=66,
                compute_metrics=AlmostAccuracy(),
                bf16=mix_bf16,
                use_cpu=True,
            )
            results = trainer.evaluate()

            x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
            pred = 1.5 * x + 2.5
            expected_loss = ((pred - y) ** 2).mean()
            self.assertAlmostEqual(results["eval_loss"], expected_loss)
            expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
            self.assertAlmostEqual(results["eval_accuracy"], expected_acc)

            # With logits preprocess
            trainer = get_regression_trainer(
                a=1.5,
                b=2.5,
                use_ipex=True,
                compute_metrics=AlmostAccuracy(),
                preprocess_logits_for_metrics=lambda logits, labels: logits + 1,
                bf16=mix_bf16,
                use_cpu=True,
            )
            results = trainer.evaluate()

            x, y = trainer.eval_dataset.x, trainer.eval_dataset.ys[0]
            pred = 1.5 * x + 2.5
            expected_loss = ((pred - y) ** 2).mean()
            self.assertAlmostEqual(results["eval_loss"], expected_loss)
            expected_acc = AlmostAccuracy()((pred + 1, y))["accuracy"]
            self.assertAlmostEqual(results["eval_accuracy"], expected_acc)

    def test_predict(self):
        trainer = get_regression_trainer(a=1.5, b=2.5)
        preds = trainer.predict(trainer.eval_dataset).predictions
        x = trainer.eval_dataset.x
        self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

        # With a number of elements not a round multiple of the batch size
        trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66)
        preds = trainer.predict(trainer.eval_dataset).predictions
        x = trainer.eval_dataset.x
        self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

        # With more than one output of the model
        trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True)
        preds = trainer.predict(trainer.eval_dataset).predictions
        x = trainer.eval_dataset.x
        self.assertEqual(len(preds), 2)
        self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
        self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))

        # With more than one output/label of the model
        trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True, label_names=["labels", "labels_2"])
        outputs = trainer.predict(trainer.eval_dataset)
        preds = outputs.predictions
        labels = outputs.label_ids
        x = trainer.eval_dataset.x
        self.assertEqual(len(preds), 2)
        self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
        self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
        self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0]))
        self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1]))

    def test_predict_with_jit(self):
        trainer = get_regression_trainer(a=1.5, b=2.5, jit_mode_eval=True)
        preds = trainer.predict(trainer.eval_dataset).predictions
        x = trainer.eval_dataset.x
        self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

        # With a number of elements not a round multiple of the batch size
        trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, jit_mode_eval=True)
        preds = trainer.predict(trainer.eval_dataset).predictions
        x = trainer.eval_dataset.x
        self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

        # With more than one output of the model
        trainer = get_regression_trainer(a=1.5, b=2.5, double_output=True, jit_mode_eval=True)
        preds = trainer.predict(trainer.eval_dataset).predictions
        x = trainer.eval_dataset.x
        self.assertEqual(len(preds), 2)
        self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
        self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))

        # With more than one output/label of the model
        trainer = get_regression_trainer(
            a=1.5, b=2.5, double_output=True, label_names=["labels", "labels_2"], jit_mode_eval=True
        )
        outputs = trainer.predict(trainer.eval_dataset)
        preds = outputs.predictions
        labels = outputs.label_ids
        x = trainer.eval_dataset.x
        self.assertEqual(len(preds), 2)
        self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
        self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
        self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0]))
        self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1]))

    @require_torch_bf16
    @require_intel_extension_for_pytorch
    def test_predict_with_ipex(self):
        for mix_bf16 in [True, False]:
            trainer = get_regression_trainer(a=1.5, b=2.5, use_ipex=True, bf16=mix_bf16, use_cpu=True)
            preds = trainer.predict(trainer.eval_dataset).predictions
            x = trainer.eval_dataset.x
            self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

            # With a number of elements not a round multiple of the batch size
            trainer = get_regression_trainer(a=1.5, b=2.5, eval_len=66, use_ipex=True, bf16=mix_bf16, use_cpu=True)
            preds = trainer.predict(trainer.eval_dataset).predictions
            x = trainer.eval_dataset.x
            self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

            # With more than one output of the model
            trainer = get_regression_trainer(
                a=1.5, b=2.5, double_output=True, use_ipex=True, bf16=mix_bf16, use_cpu=True
            )
            preds = trainer.predict(trainer.eval_dataset).predictions
            x = trainer.eval_dataset.x
            self.assertEqual(len(preds), 2)
            self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
            self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))

            # With more than one output/label of the model
            trainer = get_regression_trainer(
                a=1.5,
                b=2.5,
                double_output=True,
                label_names=["labels", "labels_2"],
                use_ipex=True,
                bf16=mix_bf16,
                use_cpu=True,
            )
            outputs = trainer.predict(trainer.eval_dataset)
            preds = outputs.predictions
            labels = outputs.label_ids
            x = trainer.eval_dataset.x
            self.assertEqual(len(preds), 2)
            self.assertTrue(np.allclose(preds[0], 1.5 * x + 2.5))
            self.assertTrue(np.allclose(preds[1], 1.5 * x + 2.5))
            self.assertTrue(np.array_equal(labels[0], trainer.eval_dataset.ys[0]))
            self.assertTrue(np.array_equal(labels[1], trainer.eval_dataset.ys[1]))

    def test_dynamic_shapes(self):
        eval_dataset = DynamicShapesDataset(batch_size=self.batch_size)
        model = RegressionModel(a=2, b=1)
        args = TrainingArguments("./regression")
        trainer = Trainer(model, args, eval_dataset=eval_dataset)

        # Check evaluation can run to completion
        _ = trainer.evaluate()

        # Check predictions
        preds = trainer.predict(eval_dataset)
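        # Predictions and labels are padded with -100 past each batch's true length so that
        # variable-shaped batches can be concatenated; the checks below rely on that padding.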
        for expected, seen in zip(eval_dataset.ys, preds.label_ids):
            self.assertTrue(np.array_equal(expected, seen[: expected.shape[0]]))
            self.assertTrue(np.all(seen[expected.shape[0] :] == -100))

        for expected, seen in zip(eval_dataset.xs, preds.predictions):
            self.assertTrue(np.array_equal(2 * expected + 1, seen[: expected.shape[0]]))
            self.assertTrue(np.all(seen[expected.shape[0] :] == -100))

        # Same tests with eval accumulation
        args = TrainingArguments("./regression", eval_accumulation_steps=2)
        trainer = Trainer(model, args, eval_dataset=eval_dataset)

        # Check evaluation can run to completion
        _ = trainer.evaluate()

        # Check predictions
        preds = trainer.predict(eval_dataset)
        for expected, seen in zip(eval_dataset.ys, preds.label_ids):
            self.assertTrue(np.array_equal(expected, seen[: expected.shape[0]]))
            self.assertTrue(np.all(seen[expected.shape[0] :] == -100))

        for expected, seen in zip(eval_dataset.xs, preds.predictions):
            self.assertTrue(np.array_equal(2 * expected + 1, seen[: expected.shape[0]]))
            self.assertTrue(np.all(seen[expected.shape[0] :] == -100))

    def test_log_level(self):
        # testing only --log_level (--log_level_replica requires multiple gpus and DDP and is tested elsewhere)
        logger = logging.get_logger()
        log_info_string = "Running training"

        # test with the default log_level - should be the same as before and thus we test depending on is_info
        is_info = logging.get_verbosity() <= 20
        with CaptureLogger(logger) as cl:
            trainer = get_regression_trainer()
            trainer.train()
        if is_info:
            self.assertIn(log_info_string, cl.out)
        else:
            self.assertNotIn(log_info_string, cl.out)

        with LoggingLevel(logging.INFO):
            # test with low log_level - lower than info
            with CaptureLogger(logger) as cl:
                trainer = get_regression_trainer(log_level="debug")
                trainer.train()
            self.assertIn(log_info_string, cl.out)

        with LoggingLevel(logging.INFO):
            # test with high log_level - should be quiet
            with CaptureLogger(logger) as cl:
                trainer = get_regression_trainer(log_level="error")
                trainer.train()
            self.assertNotIn(log_info_string, cl.out)

    def test_save_checkpoints(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5)
            trainer.train()
            self.check_saved_checkpoints(tmpdir, 5, int(self.n_epochs * 64 / self.batch_size))

        # With a regular model that is not a PreTrainedModel
        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5, pretrained=False)
            trainer.train()
            self.check_saved_checkpoints(tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), False)

    @require_safetensors
    def test_safe_checkpoints(self):
        for save_safetensors in [True, False]:
            with tempfile.TemporaryDirectory() as tmpdir:
                trainer = get_regression_trainer(output_dir=tmpdir, save_steps=5, save_safetensors=save_safetensors)
                trainer.train()
                self.check_saved_checkpoints(
                    tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), safe_weights=save_safetensors
                )

            # With a regular model that is not a PreTrainedModel
            with tempfile.TemporaryDirectory() as tmpdir:
                trainer = get_regression_trainer(
                    output_dir=tmpdir, save_steps=5, pretrained=False, save_safetensors=save_safetensors
                )
                trainer.train()
                self.check_saved_checkpoints(
                    tmpdir, 5, int(self.n_epochs * 64 / self.batch_size), False, safe_weights=save_safetensors
                )

    @require_torch_multi_accelerator
    def test_run_seq2seq_double_train_wrap_once(self):
        # test that we don't wrap the model more than once
        # since wrapping primarily happens on multi-gpu setup we want multiple gpus to test for
        # example DataParallel(DataParallel(model))

        trainer = get_regression_trainer()
        trainer.train()
        model_wrapped_before = trainer.model_wrapped
        trainer.train()
        model_wrapped_after = trainer.model_wrapped
        self.assertIs(model_wrapped_before, model_wrapped_after, "should not be wrapped twice")

    @require_torch_up_to_2_accelerators
    def test_can_resume_training(self):
        # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
        # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
        # won't be the same since the training dataloader is shuffled).

        with tempfile.TemporaryDirectory() as tmpdir:
            kwargs = {
                "output_dir": tmpdir,
                "train_len": 128,
                "save_steps": 5,
                "learning_rate": 0.1,
                "logging_steps": 5,
            }
            trainer = get_regression_trainer(**kwargs)
            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()
            state = dataclasses.asdict(trainer.state)

            checkpoint = os.path.join(tmpdir, "checkpoint-5")

            # Reinitialize trainer
            trainer = get_regression_trainer(**kwargs)

            trainer.train(resume_from_checkpoint=checkpoint)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

            # Now check with a later checkpoint that it also works when we span over one epoch
            checkpoint = os.path.join(tmpdir, "checkpoint-15")

            # Reinitialize trainer and load model
            trainer = get_regression_trainer(**kwargs)

            trainer.train(resume_from_checkpoint=checkpoint)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

        # With a regular model that is not a PreTrainedModel
        with tempfile.TemporaryDirectory() as tmpdir:
            kwargs = {
                "output_dir": tmpdir,
                "train_len": 128,
                "save_steps": 5,
                "learning_rate": 0.1,
                "pretrained": False,
            }

            trainer = get_regression_trainer(**kwargs)
            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()
            state = dataclasses.asdict(trainer.state)

            checkpoint = os.path.join(tmpdir, "checkpoint-5")

            # Reinitialize trainer and load model
            trainer = get_regression_trainer(**kwargs)

            trainer.train(resume_from_checkpoint=checkpoint)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

            # Now check with a later checkpoint that it also works when we span over one epoch
            checkpoint = os.path.join(tmpdir, "checkpoint-15")

            # Reinitialize trainer and load model
            trainer = get_regression_trainer(**kwargs)

            trainer.train(resume_from_checkpoint=checkpoint)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

        # Now check failures

        # 1. fail to find a bogus checkpoint
        trainer = get_regression_trainer()
        with self.assertRaises(Exception) as context:
            trainer.train(resume_from_checkpoint=f"{checkpoint}-bogus")
        self.assertTrue("Can't find a valid checkpoint at" in str(context.exception))

        # 2. fail to find any checkpoint - due to a fresh output_dir
        output_dir2 = self.get_auto_remove_tmp_dir()
        trainer = get_regression_trainer(output_dir=output_dir2)
        with self.assertRaises(Exception) as context:
            trainer.train(resume_from_checkpoint=True)
        self.assertTrue("No valid checkpoint found in output directory" in str(context.exception))

    @unittest.skip(
        reason="@muellerzr: Fix once Trainer can take an accelerate configuration. Need to set `seedable_sampler=True`."
    )
    def test_resume_training_with_randomness(self):
        # For more than 1 GPUs, since the randomness is introduced in the model and with DataParallel (which is used
        # in this test for more than 2 GPUs), the calls to the torch RNG will happen in a random order (sometimes
        # GPU 0 will call first and sometimes GPU 1).
        random_torch = not torch.cuda.is_available() or torch.cuda.device_count() <= 1

        if torch.cuda.is_available():
            torch.backends.cudnn.deterministic = True
        train_dataset = RegressionDataset(length=128)
        eval_dataset = RegressionDataset()

        with self.subTest("Test every step"):
            config = RegressionModelConfig(a=0, b=2, random_torch=random_torch)
            model = RegressionRandomPreTrainedModel(config)

            tmp_dir = self.get_auto_remove_tmp_dir()
            args = RegressionTrainingArguments(tmp_dir, save_steps=5, learning_rate=0.1)
            trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)

            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()

            model = RegressionRandomPreTrainedModel(config)
            trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)
            trainer.train(resume_from_checkpoint=os.path.join(tmp_dir, "checkpoint-15"))
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()

            self.assertAlmostEqual(a, a1, delta=1e-5)
            self.assertAlmostEqual(b, b1, delta=1e-5)

        with self.subTest("Test every epoch"):
            config = RegressionModelConfig(a=0, b=2, random_torch=random_torch)
            model = RegressionRandomPreTrainedModel(config)

            tmp_dir = self.get_auto_remove_tmp_dir()
            args = RegressionTrainingArguments(tmp_dir, save_strategy="epoch", learning_rate=0.1)
            trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)

            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()

            model = RegressionRandomPreTrainedModel(config)
            trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)

            checkpoints = [d for d in os.listdir(tmp_dir) if d.startswith("checkpoint-")]
            # There should be one checkpoint per epoch.
            self.assertEqual(len(checkpoints), 3)
            checkpoint_dir = sorted(checkpoints, key=lambda x: int(x.replace("checkpoint-", "")))[0]

            trainer.train(resume_from_checkpoint=os.path.join(tmp_dir, checkpoint_dir))
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()

            self.assertAlmostEqual(a, a1, delta=1e-5)
            self.assertAlmostEqual(b, b1, delta=1e-5)

    @slow
    @require_accelerate
    @require_torch_non_multi_accelerator
    def test_auto_batch_size_finder(self):
        if torch.cuda.is_available():
            torch.backends.cudnn.deterministic = True

        SRC_DIR = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..", "examples", "pytorch", "text-classification")
        )
        sys.path.append(SRC_DIR)
        import run_glue

        with tempfile.TemporaryDirectory() as tmpdir:
            testargs = f"""
                run_glue.py
                --model_name_or_path distilbert/distilbert-base-uncased
                --task_name mrpc
                --do_train
                --do_eval
                --max_seq_len 128
                --per_device_train_batch_size 4096
                --learning_rate 2e-5
                --num_train_epochs 1
                --output_dir {tmpdir}
                --auto_find_batch_size 0
                """.split()
            with self.assertRaises(RuntimeError):
                with patch.object(sys, "argv", testargs):
                    run_glue.main()

            testargs[-1] = "1"
            with patch.object(sys, "argv", testargs):
                run_glue.main()

    @require_deepspeed
    def test_auto_batch_size_with_resume_from_checkpoint_with_deepspeed(self):
        train_dataset = RegressionDataset(length=128)

        config = RegressionModelConfig(a=0, b=2)
        model = RegressionRandomPreTrainedModel(config)

        tmp_dir = self.get_auto_remove_tmp_dir()

        class MockCudaOOMCallback(TrainerCallback):
            def on_step_end(self, args, state, control, **kwargs):
                # simulate OOM on the first step
                if state.train_batch_size >= 16:
                    raise RuntimeError("CUDA out of memory.")

        deepspeed = {
            "zero_optimization": {
                "stage": 1,
            },
            "train_batch_size": "auto",
            "train_micro_batch_size_per_gpu": "auto",
        }

        args = RegressionTrainingArguments(
            tmp_dir,
            do_train=True,
            max_steps=2,
            save_steps=1,
            per_device_train_batch_size=16,
            auto_find_batch_size=True,
            deepspeed=deepspeed,
        )
        # Note: This can have issues, for now we don't support this functionality
        # ref: https://github.com/huggingface/transformers/pull/29057
        with self.assertRaises(NotImplementedError):
            _ = Trainer(model, args, train_dataset=train_dataset, callbacks=[MockCudaOOMCallback()])

    def test_auto_batch_size_with_resume_from_checkpoint(self):
        train_dataset = RegressionDataset(length=128)

        config = RegressionModelConfig(a=0, b=2)
        model = RegressionRandomPreTrainedModel(config)

        tmp_dir = self.get_auto_remove_tmp_dir()

        class MockCudaOOMCallback(TrainerCallback):
            def on_step_end(self, args, state, control, **kwargs):
                # simulate OOM on the first step
                if state.train_batch_size >= 16:
                    raise RuntimeError("CUDA out of memory.")

        args = RegressionTrainingArguments(
            tmp_dir,
            do_train=True,
            max_steps=2,
            save_steps=1,
            per_device_train_batch_size=16,
            auto_find_batch_size=True,
        )
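        # The callback above raises a fake OOM whenever the batch size is still 16, so
        # `auto_find_batch_size` has to retry with a smaller batch; the test expects it to land on 8.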
        trainer = Trainer(model, args, train_dataset=train_dataset, callbacks=[MockCudaOOMCallback()])
        trainer.train()
        # After `auto_find_batch_size` has run we should now be at 8
        self.assertEqual(trainer._train_batch_size, 8)

        # We can then make a new Trainer
        trainer = Trainer(model, args, train_dataset=train_dataset)
        # Check we are at 16 to start
        self.assertEqual(trainer._train_batch_size, 16 * max(trainer.args.n_gpu, 1))
        trainer.train(resume_from_checkpoint=True)
        # We should be back to 8 again, picking up based upon the last run Trainer
        self.assertEqual(trainer._train_batch_size, 8)

    # regression for this issue: https://github.com/huggingface/transformers/issues/12970
    def test_training_with_resume_from_checkpoint_false(self):
        train_dataset = RegressionDataset(length=128)
        eval_dataset = RegressionDataset()

        config = RegressionModelConfig(a=0, b=2)
        model = RegressionRandomPreTrainedModel(config)

        tmp_dir = self.get_auto_remove_tmp_dir()
        args = RegressionTrainingArguments(tmp_dir, save_steps=5, learning_rate=0.1)
        trainer = Trainer(model, args, train_dataset=train_dataset, eval_dataset=eval_dataset)

        trainer.train(resume_from_checkpoint=False)

    @require_torch_up_to_2_accelerators
    def test_resume_training_with_shard_checkpoint(self):
        # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
        # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
        # won't be the same since the training dataloader is shuffled).

        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1)
            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()
            state = dataclasses.asdict(trainer.state)

            checkpoint = os.path.join(tmpdir, "checkpoint-5")
            self.convert_to_sharded_checkpoint(checkpoint)

            # Reinitialize trainer
            trainer = get_regression_trainer(output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1)

            trainer.train(resume_from_checkpoint=checkpoint)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

    @require_safetensors
    @require_torch_up_to_2_accelerators
    def test_resume_training_with_safe_checkpoint(self):
        # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
        # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
        # won't be the same since the training dataloader is shuffled).

        for initial_safe in [False, True]:
            for loaded_safe in [False, True]:
                with tempfile.TemporaryDirectory() as tmpdir:
                    trainer = get_regression_trainer(
                        output_dir=tmpdir,
                        train_len=128,
                        save_steps=5,
                        learning_rate=0.1,
                        save_safetensors=initial_safe,
                    )
                    trainer.train()
                    (a, b) = trainer.model.a.item(), trainer.model.b.item()
                    state = dataclasses.asdict(trainer.state)

                    checkpoint = os.path.join(tmpdir, "checkpoint-5")
                    self.convert_to_sharded_checkpoint(checkpoint, load_safe=initial_safe, save_safe=loaded_safe)

                    # Reinitialize trainer
                    trainer = get_regression_trainer(
                        output_dir=tmpdir, train_len=128, save_steps=5, learning_rate=0.1, save_safetensors=loaded_safe
                    )

                    trainer.train(resume_from_checkpoint=checkpoint)
                    (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
                    state1 = dataclasses.asdict(trainer.state)
                    self.assertEqual(a, a1)
                    self.assertEqual(b, b1)
                    self.check_trainer_state_are_the_same(state, state1)

    @require_torch_up_to_2_accelerators
    def test_resume_training_with_gradient_accumulation(self):
        # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
        # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
        # won't be the same since the training dataloader is shuffled).

        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(
                output_dir=tmpdir,
                train_len=128,
                gradient_accumulation_steps=2,
                per_device_train_batch_size=4,
                save_steps=5,
                learning_rate=0.1,
            )
            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()
            state = dataclasses.asdict(trainer.state)

            checkpoint = os.path.join(tmpdir, "checkpoint-5")

            # Reinitialize trainer
            trainer = get_regression_trainer(
                output_dir=tmpdir,
                train_len=128,
                gradient_accumulation_steps=2,
                per_device_train_batch_size=4,
                save_steps=5,
                learning_rate=0.1,
            )

            trainer.train(resume_from_checkpoint=checkpoint)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

    @require_torch_up_to_2_accelerators
    def test_resume_training_with_frozen_params(self):
        # This test will fail for more than 2 GPUs since the batch size will get bigger and with the number of
        # save_steps, the checkpoint will resume training at epoch 2 or more (so the data seen by the model
        # won't be the same since the training dataloader is shuffled).

        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(
                output_dir=tmpdir,
                train_len=128,
                per_device_train_batch_size=4,
                save_steps=5,
                learning_rate=0.1,
            )
            trainer.model.a.requires_grad_(False)
            trainer.train()
            (a, b) = trainer.model.a.item(), trainer.model.b.item()
            state = dataclasses.asdict(trainer.state)

            checkpoint = os.path.join(tmpdir, "checkpoint-5")

            # Reinitialize trainer
            trainer = get_regression_trainer(
                output_dir=tmpdir,
                train_len=128,
                per_device_train_batch_size=4,
                save_steps=5,
                learning_rate=0.1,
            )
            trainer.model.a.requires_grad_(False)

            trainer.train(resume_from_checkpoint=checkpoint)

            self.assertFalse(trainer.model.a.requires_grad)
            (a1, b1) = trainer.model.a.item(), trainer.model.b.item()
            state1 = dataclasses.asdict(trainer.state)
            self.assertEqual(a, a1)
            self.assertEqual(b, b1)
            self.check_trainer_state_are_the_same(state, state1)

    def test_load_best_model_at_end(self):
        total = int(self.n_epochs * 64 / self.batch_size)
        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(
                a=1.5,
                b=2.5,
                output_dir=tmpdir,
                learning_rate=0.1,
                eval_steps=5,
                eval_strategy="steps",
                save_steps=5,
                load_best_model_at_end=True,
            )
            self.assertFalse(trainer.args.greater_is_better)
            trainer.train()
            self.check_saved_checkpoints(tmpdir, 5, total)
            self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_loss")

        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(
                a=1.5,
                b=2.5,
                output_dir=tmpdir,
                learning_rate=0.1,
                eval_steps=5,
                eval_strategy="steps",
                save_steps=5,
                load_best_model_at_end=True,
                metric_for_best_model="accuracy",
                compute_metrics=AlmostAccuracy(),
            )
            self.assertTrue(trainer.args.greater_is_better)
            trainer.train()
            self.check_saved_checkpoints(tmpdir, 5, total)
            self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_accuracy", greater_is_better=True)

        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(
                a=1.5,
                b=2.5,
                output_dir=tmpdir,
                learning_rate=0.1,
                eval_strategy="epoch",
                save_strategy="epoch",
                load_best_model_at_end=True,
                metric_for_best_model="accuracy",
                compute_metrics=AlmostAccuracy(),
            )
            self.assertTrue(trainer.args.greater_is_better)
            trainer.train()
            self.check_saved_checkpoints(tmpdir, 64 // self.batch_size, total)
            self.check_best_model_has_been_loaded(
                tmpdir, 64 // self.batch_size, total, trainer, "eval_accuracy", greater_is_better=True
            )

        # Test this works with a non PreTrainedModel
        with tempfile.TemporaryDirectory() as tmpdir:
            trainer = get_regression_trainer(
                output_dir=tmpdir,
                learning_rate=0.1,
                eval_steps=5,
                eval_strategy="steps",
                save_steps=5,
                load_best_model_at_end=True,
                pretrained=False,
            )
            self.assertFalse(trainer.args.greater_is_better)
            trainer.train()
            self.check_saved_checkpoints(tmpdir, 5, total, is_pretrained=False)
            self.check_best_model_has_been_loaded(tmpdir, 5, total, trainer, "eval_loss", is_pretrained=False)

    @require_safetensors
    def test_load_best_model_from_safetensors(self):
        total = int(self.n_epochs * 64 / self.batch_size)
        for save_safetensors, pretrained in product([False, True], [False, True]):
            with tempfile.TemporaryDirectory() as tmpdir:
                trainer = get_regression_trainer(
                    a=1.5,
                    b=2.5,
                    output_dir=tmpdir,
                    learning_rate=0.1,
                    eval_steps=5,
                    eval_strategy="steps",
                    save_steps=5,
                    load_best_model_at_end=True,
                    save_safetensors=save_safetensors,
                    pretrained=pretrained,
                )
                self.assertFalse(trainer.args.greater_is_better)
                trainer.train()
                self.check_saved_checkpoints(tmpdir, 5, total, is_pretrained=pretrained, safe_weights=save_safetensors)
                self.check_best_model_has_been_loaded(
                    tmpdir, 5, total, trainer, "eval_loss", is_pretrained=pretrained, safe_weights=save_safetensors
                )

    @slow
    def test_trainer_eval_mrpc(self):
        MODEL_ID = "google-bert/bert-base-cased-finetuned-mrpc"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        model = AutoModelForSequenceClassification.from_pretrained(MODEL_ID)
        data_args = GlueDataTrainingArguments(
            task_name="mrpc", data_dir=f"{get_tests_dir()}/fixtures/tests_samples/MRPC", overwrite_cache=True
        )
        eval_dataset = GlueDataset(data_args, tokenizer=tokenizer, mode="dev")

        training_args = TrainingArguments(output_dir="./examples", use_cpu=True)
        trainer = Trainer(model=model, args=training_args, eval_dataset=eval_dataset)
        result = trainer.evaluate()
        self.assertLess(result["eval_loss"], 0.2)

    @slow
    def test_trainer_eval_multiple(self):
        MODEL_ID = "openai-community/gpt2"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        model = AutoModelForCausalLM.from_pretrained(MODEL_ID)
        dataset = LineByLineTextDataset(
            tokenizer=tokenizer,
            file_path=PATH_SAMPLE_TEXT,
            block_size=tokenizer.max_len_single_sentence,
        )
        for example in dataset.examples:
            example["labels"] = example["input_ids"]
        training_args = TrainingArguments(
            output_dir="./examples",
            use_cpu=True,
            per_device_eval_batch_size=1,
        )
        trainer = Trainer(
            model=model,
            args=training_args,
            eval_dataset={
                "data1": dataset,
                "data2": dataset,
            },
        )
        result = trainer.evaluate()
        self.assertIn("eval_data1_loss", result)
        self.assertIn("eval_data2_loss", result)

    @slow
    def test_trainer_eval_lm(self):
        MODEL_ID = "distilbert/distilroberta-base"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        dataset = LineByLineTextDataset(
            tokenizer=tokenizer,
            file_path=PATH_SAMPLE_TEXT,
            block_size=tokenizer.max_len_single_sentence,
        )
        self.assertEqual(len(dataset), 31)

    def test_training_iterable_dataset(self):
        config = RegressionModelConfig()
        model = RegressionPreTrainedModel(config)
        # Adding one column not used by the model should have no impact
        train_dataset = SampleIterableDataset(label_names=["labels", "extra"])

        args = RegressionTrainingArguments(output_dir="./examples", max_steps=4)
        trainer = Trainer(model=model, args=args, train_dataset=train_dataset)
        trainer.train()
        self.assertEqual(trainer.state.global_step, 4)

        loader = trainer.get_train_dataloader()
        self.assertIsInstance(loader, torch.utils.data.DataLoader)
        self.assertIsInstance(loader.sampler, torch.utils.data.dataloader._InfiniteConstantSampler)

    def test_evaluation_iterable_dataset(self):
        config = RegressionModelConfig(a=1.5, b=2.5)
        model = RegressionPreTrainedModel(config)
        # Adding one column not used by the model should have no impact
        eval_dataset = SampleIterableDataset(label_names=["labels", "extra"])

        args = RegressionTrainingArguments(output_dir="./examples")
        trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset, compute_metrics=AlmostAccuracy())
        results = trainer.evaluate()

        x, y = trainer.eval_dataset.dataset.x, trainer.eval_dataset.dataset.ys[0]
        pred = 1.5 * x + 2.5
        expected_loss = ((pred - y) ** 2).mean()
        self.assertAlmostEqual(results["eval_loss"], expected_loss)
        expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
        self.assertAlmostEqual(results["eval_accuracy"], expected_acc)

        # With a number of elements not a round multiple of the batch size
        eval_dataset = SampleIterableDataset(length=66)
        results = trainer.evaluate(eval_dataset)

        x, y = eval_dataset.dataset.x, eval_dataset.dataset.ys[0]
        pred = 1.5 * x + 2.5
        expected_loss = ((pred - y) ** 2).mean()
        self.assertAlmostEqual(results["eval_loss"], expected_loss)
        expected_acc = AlmostAccuracy()((pred, y))["accuracy"]
        self.assertAlmostEqual(results["eval_accuracy"], expected_acc)

    def test_predict_iterable_dataset(self):
        config = RegressionModelConfig(a=1.5, b=2.5)
        model = RegressionPreTrainedModel(config)
        eval_dataset = SampleIterableDataset()

        args = RegressionTrainingArguments(output_dir="./examples")
        trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset, compute_metrics=AlmostAccuracy())

        preds = trainer.predict(trainer.eval_dataset).predictions
        x = eval_dataset.dataset.x
        self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

        # With a number of elements not a round multiple of the batch size
        # Adding one column not used by the model should have no impact
        test_dataset = SampleIterableDataset(length=66, label_names=["labels", "extra"])
        preds = trainer.predict(test_dataset).predictions
        x = test_dataset.dataset.x
        self.assertTrue(np.allclose(preds, 1.5 * x + 2.5))

    def test_num_train_epochs_in_training(self):
        # len(train_dl) < gradient_accumulation_steps shouldn't give ``ZeroDivisionError`` when ``max_steps`` is given.
        # It should give 1 update step for each epoch.
        trainer = get_regression_trainer(
            max_steps=3, train_len=64, per_device_train_batch_size=16, gradient_accumulation_steps=5
        )
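        # 64 samples with a batch size of 16 give 4 batches per epoch, fewer than the 5 accumulation steps.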
        train_output = trainer.train()
        self.assertEqual(train_output.global_step, 3)

        # Even if ``max_steps`` is not specified, we still expect 1 update step for each epoch if
        # len(train_dl) < gradient_accumulation_steps.
        trainer = get_regression_trainer(train_len=64, per_device_train_batch_size=16, gradient_accumulation_steps=5)
        train_output = trainer.train()
        self.assertEqual(train_output.global_step, int(self.n_epochs))

    def test_early_stopping_callback(self):
        # early stopping stops training before num_training_epochs
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                num_train_epochs=20,
                gradient_accumulation_steps=1,
                per_device_train_batch_size=16,
                load_best_model_at_end=True,
                eval_strategy=IntervalStrategy.EPOCH,
                save_strategy=IntervalStrategy.EPOCH,
                compute_metrics=AlmostAccuracy(),
                metric_for_best_model="accuracy",
            )
            trainer.add_callback(EarlyStoppingCallback(1, 0.0001))
            train_output = trainer.train()
            self.assertLess(train_output.global_step, 20 * 64 / 16)

        # Invalid inputs to trainer with early stopping callback result in assertion error
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                num_train_epochs=20,
                gradient_accumulation_steps=1,
                per_device_train_batch_size=16,
                eval_strategy=IntervalStrategy.EPOCH,
                compute_metrics=AlmostAccuracy(),
                metric_for_best_model="accuracy",
            )
            trainer.add_callback(EarlyStoppingCallback(1))
            self.assertEqual(trainer.state.global_step, 0)
            try:
                trainer.train()
            except AssertionError:
                self.assertEqual(trainer.state.global_step, 0)

    def test_flos_extraction(self):
        trainer = get_regression_trainer(learning_rate=0.1)

        def assert_flos_extraction(trainer, wrapped_model_to_check):
            self.assertEqual(trainer.model, trainer.accelerator.unwrap_model(wrapped_model_to_check))
            self.assertGreaterEqual(
                getattr(trainer.accelerator.unwrap_model(wrapped_model_to_check).config, "total_flos", 0), 0
            )

        # with plain model
        assert_flos_extraction(trainer, trainer.model)

        # with enforced DataParallel
        assert_flos_extraction(trainer, nn.DataParallel(trainer.model))

        trainer.train()
        self.assertTrue(isinstance(trainer.state.total_flos, float))

    def check_checkpoint_deletion(self, trainer, output_dir, expected):
        # Make fake checkpoints
        for n in [5, 10, 15, 20, 25]:
            os.makedirs(os.path.join(output_dir, f"{PREFIX_CHECKPOINT_DIR}-{n}"), exist_ok=True)
        trainer._rotate_checkpoints(output_dir=output_dir)
        glob_checkpoints = [str(x) for x in Path(output_dir).glob(f"{PREFIX_CHECKPOINT_DIR}-*")]
        values = [int(re.match(f".*{PREFIX_CHECKPOINT_DIR}-([0-9]+)", d).groups()[0]) for d in glob_checkpoints]
        self.assertSetEqual(set(values), set(expected))

    def test_checkpoint_rotation(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Without best model at end
            trainer = get_regression_trainer(output_dir=tmp_dir, save_total_limit=2)
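            # With save_total_limit=2 only the two most recent fake checkpoints (20 and 25) should survive.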
            self.check_checkpoint_deletion(trainer, tmp_dir, [20, 25])

            # With best model at end
            trainer = get_regression_trainer(
                output_dir=tmp_dir, eval_strategy="steps", load_best_model_at_end=True, save_total_limit=2
            )
            trainer.state.best_model_checkpoint = os.path.join(tmp_dir, "checkpoint-5")
            self.check_checkpoint_deletion(trainer, tmp_dir, [5, 25])

            # Edge case: we don't always honor save_total_limit=1 if load_best_model_at_end=True to be able to resume
            # from checkpoint
            trainer = get_regression_trainer(
                output_dir=tmp_dir, eval_strategy="steps", load_best_model_at_end=True, save_total_limit=1
            )
            trainer.state.best_model_checkpoint = os.path.join(tmp_dir, "checkpoint-25")
            self.check_checkpoint_deletion(trainer, tmp_dir, [25])

            trainer.state.best_model_checkpoint = os.path.join(tmp_dir, "checkpoint-5")
            self.check_checkpoint_deletion(trainer, tmp_dir, [5, 25])

    def test_compare_trainer_and_checkpoint_args_logging(self):
        logger = logging.get_logger()

        with tempfile.TemporaryDirectory() as tmpdir, CaptureLogger(logger) as cl:
            trainer = get_regression_trainer(
                output_dir=tmpdir,
                train_len=128,
                eval_steps=5,
                gradient_accumulation_steps=2,
                per_device_train_batch_size=4,
                save_steps=5,
                learning_rate=0.1,
            )
            trainer.train()

            checkpoint = os.path.join(tmpdir, "checkpoint-5")
            checkpoint_trainer = get_regression_trainer(
                output_dir=tmpdir,
                train_len=256,
                eval_steps=10,
                gradient_accumulation_steps=4,
                per_device_train_batch_size=8,
                save_steps=10,
                learning_rate=0.1,
            )
            checkpoint_trainer.train(resume_from_checkpoint=checkpoint)

        self.assertIn("save_steps: 10 (from args) != 5 (from trainer_state.json)", cl.out)

        self.assertIn(
            "per_device_train_batch_size: 8 (from args) != 4 (from trainer_state.json)",
            cl.out,
        )
        self.assertIn(
            "eval_steps: 10 (from args) != 5 (from trainer_state.json)",
            cl.out,
        )

    def check_mem_metrics(self, trainer, check_func):
        metrics = trainer.train().metrics
        check_func("init_mem_cpu_alloc_delta", metrics)
        check_func("train_mem_cpu_alloc_delta", metrics)
        if backend_device_count(torch_device) > 0:
            check_func("init_mem_gpu_alloc_delta", metrics)
            check_func("train_mem_gpu_alloc_delta", metrics)

        metrics = trainer.evaluate()
        check_func("eval_mem_cpu_alloc_delta", metrics)
        if backend_device_count(torch_device) > 0:
            check_func("eval_mem_gpu_alloc_delta", metrics)

        metrics = trainer.predict(RegressionDataset()).metrics
        check_func("test_mem_cpu_alloc_delta", metrics)
        if backend_device_count(torch_device) > 0:
            check_func("test_mem_gpu_alloc_delta", metrics)

    def test_mem_metrics(self):
        # with mem metrics enabled
        trainer = get_regression_trainer(skip_memory_metrics=False)
        self.check_mem_metrics(trainer, self.assertIn)

        # with mem metrics disabled
        trainer = get_regression_trainer(skip_memory_metrics=True)
        self.check_mem_metrics(trainer, self.assertNotIn)

    @require_torch_accelerator
    def test_fp16_full_eval(self):
        # this is a sensitive test so let's keep debugging printouts in place for quick diagnosis.
        # it's using pretty large safety margins, but small enough to detect broken functionality.
        debug = 0
        n_gpus = backend_device_count(torch_device)

        bs = 8
        eval_len = 16 * n_gpus
        # make the params somewhat big so that there will be enough RAM consumed to be able to
        # measure things. We should get about 64KB for a+b in fp32
        a = torch.ones(1000, bs) + 0.001
        b = torch.ones(1000, bs) - 0.001
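        # a and b each hold 1000 * 8 fp32 values (32_000 bytes), i.e. ~64KB combined.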

        # 1. with fp16_full_eval disabled
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, skip_memory_metrics=False)
        metrics = trainer.evaluate()
        del trainer
        gc.collect()

        fp32_init = metrics["init_mem_gpu_alloc_delta"]
        fp32_eval = metrics["eval_mem_gpu_alloc_delta"]

        if debug:
            print(f"fp32_init {fp32_init}")
            print(f"fp32_eval {fp32_eval}")

        # here we expect the model to be preloaded in trainer.__init__ and consume around 64K gpu ram.
        # perfect world: fp32_init == 64<<10
        self.assertGreater(fp32_init, 59_000)
        # after eval should be no extra memory allocated - with a small margin (other than the peak
        # memory consumption for the forward calculation that gets recovered)
        # perfect world: fp32_eval == close to zero
        self.assertLess(fp32_eval, 5_000)

        # 2. with fp16_full_eval enabled
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, fp16_full_eval=True, skip_memory_metrics=False)
        metrics = trainer.evaluate()
        fp16_init = metrics["init_mem_gpu_alloc_delta"]
        fp16_eval = metrics["eval_mem_gpu_alloc_delta"]

        if debug:
            print(f"fp16_init {fp16_init}")
            print(f"fp16_eval {fp16_eval}")

        # here we expect the model to not be preloaded in trainer.__init__, so with a small margin it should be close to 0
        # perfect world: fp16_init == close to zero
        self.assertLess(fp16_init, 5_000)
        # here we put the model on device in eval and only `half()` of it, i.e. about 32K (again we ignore the peak margin which gets returned back)
        # perfect world: fp16_eval == 32<<10
        self.assertGreater(fp16_eval, 27_000)

        # 3. relative comparison fp32 vs full fp16
        # fp16_eval should be about half of fp32_init
        # perfect world: fp32_init/2 == fp16_eval
        self.assertAlmostEqual(fp16_eval, fp32_init / 2, delta=5_000)

    @require_torch_non_multi_gpu
    @require_torchdynamo
    @require_torch_tensorrt_fx
    def test_torchdynamo_full_eval(self):
        import torchdynamo

        # torchdynamo at the moment doesn't support DP/DDP, therefore require a single gpu
        n_gpus = get_gpu_count()

        bs = 8
        eval_len = 16 * n_gpus
        # make the params somewhat big so that there will be enough RAM consumed to be able to
        # measure things. We should get about 64KB for a+b in fp32
        a = torch.ones(1000, bs) + 0.001
        b = torch.ones(1000, bs) - 0.001

        # 1. Default - without TorchDynamo
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len)
        metrics = trainer.evaluate()
        original_eval_loss = metrics["eval_loss"]
        del trainer

        # 2. TorchDynamo eager
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, torchdynamo="eager")
        metrics = trainer.evaluate()
        self.assertAlmostEqual(metrics["eval_loss"], original_eval_loss)
        del trainer
        torchdynamo.reset()

        # 3. TorchDynamo nvfuser
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, torchdynamo="nvfuser")
        metrics = trainer.evaluate()
        self.assertAlmostEqual(metrics["eval_loss"], original_eval_loss)
        torchdynamo.reset()

        # 4. TorchDynamo fx2trt
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, torchdynamo="fx2trt")
        metrics = trainer.evaluate()
        self.assertAlmostEqual(metrics["eval_loss"], original_eval_loss)
        torchdynamo.reset()

@unittest.skip("torch 2.0.0 gives `ModuleNotFoundError: No module named 'torchdynamo'`.")
|
|
@require_torch_non_multi_gpu
|
|
@require_torchdynamo
|
|
def test_torchdynamo_memory(self):
|
|
# torchdynamo at the moment doesn't support DP/DDP, therefore require a single gpu
|
|
import torchdynamo
|
|
|
|
class CustomTrainer(Trainer):
|
|
def compute_loss(self, model, inputs, return_outputs=False):
|
|
x = inputs["x"]
|
|
output = model(x)
|
|
if self.args.n_gpu == 1:
|
|
return output.mean()
|
|
return output
|
|
|
|
class MyModule(torch.nn.Module):
|
|
"""Simple module that does aggressive fusion"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
def forward(self, x):
|
|
for _ in range(20):
|
|
x = torch.cos(x)
|
|
return x
|
|
|
|
mod = MyModule()
|
|
|
|
# 1. without TorchDynamo (eager baseline)
|
|
a = torch.ones(1024, 1024, device="cuda", requires_grad=True)
|
|
a.grad = None
|
|
trainer = CustomTrainer(model=mod)
|
|
# warmup
|
|
for _ in range(10):
|
|
orig_loss = trainer.training_step(mod, {"x": a})
|
|
|
|
# resets
|
|
gc.collect()
|
|
torch.cuda.empty_cache()
|
|
torch.cuda.reset_peak_memory_stats()
|
|
|
|
orig_loss = trainer.training_step(mod, {"x": a})
|
|
orig_peak_mem = torch.cuda.max_memory_allocated()
|
|
torchdynamo.reset()
|
|
del trainer
|
|
|
|
# 2. TorchDynamo nvfuser
|
|
a = torch.ones(1024, 1024, device="cuda", requires_grad=True)
|
|
a.grad = None
|
|
args = TrainingArguments(output_dir="None", torchdynamo="nvfuser")
|
|
trainer = CustomTrainer(model=mod, args=args)
|
|
# warmup
|
|
for _ in range(10):
|
|
loss = trainer.training_step(mod, {"x": a})
|
|
|
|
# resets
|
|
gc.collect()
|
|
torch.cuda.empty_cache()
|
|
torch.cuda.reset_peak_memory_stats()
|
|
|
|
loss = trainer.training_step(mod, {"x": a})
|
|
peak_mem = torch.cuda.max_memory_allocated()
|
|
torchdynamo.reset()
|
|
del trainer
|
|
|
|
# Functional check
|
|
self.assertAlmostEqual(loss, orig_loss)
|
|
|
|
# AOT Autograd recomputaion and nvfuser recomputation optimization
|
|
# aggressively fuses the operations and reduce the memory footprint.
|
|
self.assertGreater(orig_peak_mem, peak_mem * 2)
|
|
|
|
    @require_torch_accelerator
    @require_torch_bf16
    def test_bf16_full_eval(self):
        # note: most of the logic is the same as test_fp16_full_eval

        # this is a sensitive test so let's keep debugging printouts in place for quick diagnosis.
        # it's using pretty large safety margins, but small enough to detect broken functionality.
        debug = 0
        n_gpus = backend_device_count(torch_device)

        bs = 8
        eval_len = 16 * n_gpus
        # make the params somewhat big so that there will be enough RAM consumed to be able to
        # measure things. We should get about 64KB for a+b in fp32
        a = torch.ones(1000, bs) + 0.001
        b = torch.ones(1000, bs) - 0.001
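        # a and b each hold 1000 * 8 fp32 values (32_000 bytes), i.e. ~64KB combined.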

        # 1. with bf16_full_eval disabled
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, skip_memory_metrics=False)
        metrics = trainer.evaluate()
        del trainer
        gc.collect()

        fp32_init = metrics["init_mem_gpu_alloc_delta"]
        fp32_eval = metrics["eval_mem_gpu_alloc_delta"]

        if debug:
            print(f"fp32_init {fp32_init}")
            print(f"fp32_eval {fp32_eval}")

        # here we expect the model to be preloaded in trainer.__init__ and consume around 64K gpu ram.
        # perfect world: fp32_init == 64<<10
        self.assertGreater(fp32_init, 59_000)
        # after eval should be no extra memory allocated - with a small margin (other than the peak
        # memory consumption for the forward calculation that gets recovered)
        # perfect world: fp32_eval == close to zero
        self.assertLess(fp32_eval, 5_000)

        # 2. with bf16_full_eval enabled
        trainer = get_regression_trainer(a=a, b=b, eval_len=eval_len, bf16_full_eval=True, skip_memory_metrics=False)
        metrics = trainer.evaluate()
        bf16_init = metrics["init_mem_gpu_alloc_delta"]
        bf16_eval = metrics["eval_mem_gpu_alloc_delta"]

        if debug:
            print(f"bf16_init {bf16_init}")
            print(f"bf16_eval {bf16_eval}")

        # here we expect the model to not be preloaded in trainer.__init__, so with a small margin it should be close to 0
        # perfect world: bf16_init == close to zero
        self.assertLess(bf16_init, 5_000)
        # here we put the model on device in eval and only `half()` of it, i.e. about 32K (again we ignore the peak margin which gets returned back)
        # perfect world: bf16_eval == 32<<10
        self.assertGreater(bf16_eval, 27_000)

        # 3. relative comparison fp32 vs full bf16
        # bf16_eval should be about half of fp32_init
        # perfect world: fp32_init/2 == bf16_eval
        self.assertAlmostEqual(bf16_eval, fp32_init / 2, delta=5_000)

    def test_no_wd_param_group(self):
        model = nn.Sequential(TstLayer(128), nn.ModuleList([TstLayer(128), TstLayer(128)]))
        trainer = Trainer(model=model)
        trainer.create_optimizer_and_scheduler(10)
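        # Only the linear weights listed below should get weight decay; every other parameter
        # is expected in the second, no-decay param group.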
        wd_names = ['0.linear1.weight', '0.linear2.weight', '1.0.linear1.weight', '1.0.linear2.weight', '1.1.linear1.weight', '1.1.linear2.weight']  # fmt: skip
        wd_params = [p for n, p in model.named_parameters() if n in wd_names]
        no_wd_params = [p for n, p in model.named_parameters() if n not in wd_names]
        self.assertListEqual(trainer.optimizer.param_groups[0]["params"], wd_params)
        self.assertListEqual(trainer.optimizer.param_groups[1]["params"], no_wd_params)

    @slow
    @require_torch_multi_accelerator
    def test_end_to_end_example(self):
        # Tests that `run_translation.py` will run without issues
        script_path = os.path.abspath(
            os.path.join(
                os.path.dirname(__file__), "..", "..", "examples", "pytorch", "translation", "run_translation.py"
            )
        )

        with tempfile.TemporaryDirectory() as tmpdir:
            command = [
                "accelerate",
                "launch",
                script_path,
                "--model_name_or_path",
                "google-t5/t5-small",
                "--per_device_train_batch_size",
                "1",
                "--output_dir",
                tmpdir,
                "--overwrite_output_dir",
                "--do_train",
                "--max_train_samples",
                "64",
                "--num_train_epochs",
                "1",
                "--dataset_name",
                "wmt16",
                "--dataset_config",
                "ro-en",
                "--source_lang",
                "en",
                "--target_lang",
                "ro",
                "--do_predict",
                "--max_predict_samples",
                "64",
                "--predict_with_generate",
                "--ddp_timeout",
                "60",
            ]
            execute_subprocess_async(command)
            # successful return here == success - any errors would have caused an error or a timeout in the sub-call

    def test_accelerator_config_empty(self):
        # Checks that a config can be made with the defaults if not passed
        with tempfile.TemporaryDirectory() as tmp_dir:
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()

            # Leaves one option as something *not* basic
            args = RegressionTrainingArguments(
                output_dir=tmp_dir,
            )
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, False)
            self.assertEqual(trainer.accelerator.dispatch_batches, None)
            self.assertEqual(trainer.accelerator.even_batches, True)
            self.assertEqual(trainer.accelerator.use_seedable_sampler, True)

            if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
                # gradient accumulation kwargs configures gradient_state
                self.assertNotIn("sync_each_batch", trainer.accelerator.gradient_state.plugin_kwargs)

    def test_accelerator_config_from_dict(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly
        with tempfile.TemporaryDirectory() as tmp_dir:
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()

            accelerator_config = {
                "split_batches": True,
                "dispatch_batches": True,
                "even_batches": False,
                "use_seedable_sampler": True,
            }
            if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
                accelerator_config["gradient_accumulation_kwargs"] = {"sync_each_batch": True}

            # Leaves all options as something *not* basic
            args = RegressionTrainingArguments(
                output_dir=tmp_dir,
                accelerator_config=accelerator_config,
            )
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, True)
            self.assertEqual(trainer.accelerator.dispatch_batches, True)
            self.assertEqual(trainer.accelerator.even_batches, False)
            self.assertEqual(trainer.accelerator.use_seedable_sampler, True)

            if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
                self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_each_batch"], True)

    def test_accelerator_config_from_yaml(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly
        with tempfile.TemporaryDirectory() as tmp_dir:
            path_file = Path(tmp_dir) / "accelerator_config.json"
            with open(path_file, "w") as f:
                accelerator_config = {
                    "split_batches": True,
                    "dispatch_batches": True,
                    "even_batches": False,
                    "use_seedable_sampler": False,
                }
                if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
                    accelerator_config["gradient_accumulation_kwargs"] = {"sync_each_batch": True}
                json.dump(accelerator_config, f)
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()

            # Leaves all options as something *not* basic
            args = RegressionTrainingArguments(output_dir=tmp_dir, accelerator_config=path_file)
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, True)
            self.assertEqual(trainer.accelerator.dispatch_batches, True)
            self.assertEqual(trainer.accelerator.even_batches, False)
            self.assertEqual(trainer.accelerator.use_seedable_sampler, False)

            if GRAD_ACCUM_KWARGS_VERSION_AVAILABLE:
                self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_each_batch"], True)

    def test_accelerator_config_from_dataclass(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly

        accelerator_config = AcceleratorConfig(
            split_batches=True,
            dispatch_batches=True,
            even_batches=False,
            use_seedable_sampler=False,
        )
        config = RegressionModelConfig(a=1.5, b=2.5)
        model = RegressionPreTrainedModel(config)
        eval_dataset = SampleIterableDataset()
        with tempfile.TemporaryDirectory() as tmp_dir:
            args = RegressionTrainingArguments(output_dir=tmp_dir, accelerator_config=accelerator_config)
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, True)
            self.assertEqual(trainer.accelerator.dispatch_batches, True)
            self.assertEqual(trainer.accelerator.even_batches, False)
            self.assertEqual(trainer.accelerator.use_seedable_sampler, False)

    @require_accelerate_version_min_0_28
    def test_accelerate_config_from_dataclass_grad_accum(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly

        grad_acc_kwargs = {
            "num_steps": 10,
            "adjust_scheduler": False,
            "sync_with_dataloader": False,
            "sync_each_batch": True,
        }
        accelerator_config = AcceleratorConfig(
            split_batches=True,
            dispatch_batches=True,
            even_batches=False,
            use_seedable_sampler=False,
            gradient_accumulation_kwargs=grad_acc_kwargs,
        )
        config = RegressionModelConfig(a=1.5, b=2.5)
        model = RegressionPreTrainedModel(config)
        eval_dataset = SampleIterableDataset()
        with tempfile.TemporaryDirectory() as tmp_dir:
            args = RegressionTrainingArguments(output_dir=tmp_dir, accelerator_config=accelerator_config)
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["num_steps"], 10)
            self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["adjust_scheduler"], False)
            self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_with_dataloader"], False)
            self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["sync_each_batch"], True)

    def test_accelerator_config_from_partial(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly
        with tempfile.TemporaryDirectory() as tmp_dir:
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()

            # Leaves one option as something *not* basic
            args = RegressionTrainingArguments(
                output_dir=tmp_dir,
                accelerator_config={
                    "split_batches": True,
                },
            )
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, True)
            self.assertEqual(trainer.accelerator.dispatch_batches, None)
            self.assertEqual(trainer.accelerator.even_batches, True)
            self.assertEqual(trainer.accelerator.use_seedable_sampler, True)

    def test_accelerator_config_from_dict_with_deprecated_args(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly
        # and maintains the deprecated args if passed in
        with tempfile.TemporaryDirectory() as tmp_dir:
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()

            # Leaves all options as something *not* basic
            with self.assertWarns(FutureWarning) as cm:
                args = RegressionTrainingArguments(
                    output_dir=tmp_dir,
                    accelerator_config={
                        "split_batches": True,
                    },
                    dispatch_batches=False,
                )
                self.assertIn("dispatch_batches", str(cm.warnings[0].message))
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.dispatch_batches, False)
            self.assertEqual(trainer.accelerator.split_batches, True)
            with self.assertWarns(FutureWarning) as cm:
                args = RegressionTrainingArguments(
                    output_dir=tmp_dir,
                    accelerator_config={
                        "even_batches": False,
                    },
                    split_batches=True,
                )
                self.assertIn("split_batches", str(cm.warnings[0].message))
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, True)
            self.assertEqual(trainer.accelerator.even_batches, False)
            self.assertEqual(trainer.accelerator.dispatch_batches, None)

    def test_accelerator_config_only_deprecated_args(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            with self.assertWarns(FutureWarning) as cm:
                args = RegressionTrainingArguments(
                    output_dir=tmp_dir,
                    split_batches=True,
                )
                self.assertIn("split_batches", str(cm.warnings[0].message))
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.split_batches, True)

    @require_accelerate_version_min_0_28
    def test_accelerator_config_from_dict_grad_accum_num_steps(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            config = RegressionModelConfig(a=1.5, b=2.5)
            model = RegressionPreTrainedModel(config)
            eval_dataset = SampleIterableDataset()

            # case - TrainingArguments.gradient_accumulation_steps == 1
            #      - gradient_accumulation_kwargs["num_steps"] == 1
            # results in grad accum set to 1
            args = RegressionTrainingArguments(
                output_dir=tmp_dir,
                gradient_accumulation_steps=1,
                accelerator_config={
                    "gradient_accumulation_kwargs": {
                        "num_steps": 1,
                    }
                },
            )
            trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertEqual(trainer.accelerator.gradient_state.plugin_kwargs["num_steps"], 1)

            # case - TrainingArguments.gradient_accumulation_steps > 1
            #      - gradient_accumulation_kwargs["num_steps"] specified
            # results in exception raised
            args = RegressionTrainingArguments(
                output_dir=tmp_dir,
                gradient_accumulation_steps=2,
                accelerator_config={
                    "gradient_accumulation_kwargs": {
                        "num_steps": 10,
                    }
                },
            )
            with self.assertRaises(Exception) as context:
                trainer = Trainer(model=model, args=args, eval_dataset=eval_dataset)
            self.assertTrue("The `AcceleratorConfig`'s `num_steps` is set but" in str(context.exception))

    def test_accelerator_config_not_instantiated(self):
        # Checks that accelerator kwargs can be passed through
        # and the accelerator is initialized accordingly
        with tempfile.TemporaryDirectory() as tmp_dir:
            with self.assertRaises(NotImplementedError) as context:
                _ = RegressionTrainingArguments(
                    output_dir=tmp_dir,
                    accelerator_config=AcceleratorConfig,
                )
            self.assertTrue("Tried passing in a callable to `accelerator_config`" in str(context.exception))

        # Now test with a custom subclass
        @dataclasses.dataclass
        class CustomAcceleratorConfig(AcceleratorConfig):
            pass

        @dataclasses.dataclass
        class CustomTrainingArguments(TrainingArguments):
            accelerator_config: dict = dataclasses.field(
                default=CustomAcceleratorConfig,
            )

        with tempfile.TemporaryDirectory() as tmp_dir:
            with self.assertRaises(NotImplementedError) as context:
                _ = CustomTrainingArguments(
                    output_dir=tmp_dir,
                )
            self.assertTrue("Tried passing in a callable to `accelerator_config`" in str(context.exception))


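# Illustrative sketch, not part of the upstream test suite: the three equivalent forms of
# `accelerator_config` exercised by the tests above -- a plain dict, an `AcceleratorConfig`
# dataclass, or a path to a JSON file holding the same keys. `_example_accelerator_configs`
# is a hypothetical helper added only for illustration; nothing in the tests calls it.
def _example_accelerator_configs(tmp_dir):
    path_file = Path(tmp_dir) / "accelerator_config.json"
    with open(path_file, "w") as f:
        json.dump({"split_batches": True, "even_batches": False}, f)
    return [
        {"split_batches": True, "even_batches": False},  # dict form
        AcceleratorConfig(split_batches=True, even_batches=False),  # dataclass form
        path_file,  # JSON file form, as in test_accelerator_config_from_yaml
    ]

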
@require_torch
@is_staging_test
class TrainerIntegrationWithHubTester(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls._token = TOKEN
        HfFolder.save_token(TOKEN)

    @classmethod
    def tearDownClass(cls):
        for model in [
            "test-trainer",
            "test-trainer-epoch",
            "test-trainer-step",
            "test-trainer-tensorboard",
            "test-trainer-tags",
        ]:
            try:
                delete_repo(token=cls._token, repo_id=model)
            except HTTPError:
                pass

        try:
            delete_repo(token=cls._token, repo_id="valid_org/test-trainer-org")
        except HTTPError:
            pass

    def test_push_to_hub(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=os.path.join(tmp_dir, "test-trainer"),
                push_to_hub=True,
                hub_token=self._token,
            )
            url = trainer.push_to_hub()

        # Extract repo_name from the url
        re_search = re.search(ENDPOINT_STAGING + r"/([^/]+/[^/]+)/", url)
        self.assertTrue(re_search is not None)
        repo_name = re_search.groups()[0]

        self.assertEqual(repo_name, f"{USER}/test-trainer")

        model = RegressionPreTrainedModel.from_pretrained(repo_name)
        self.assertEqual(model.a.item(), trainer.model.a.item())
        self.assertEqual(model.b.item(), trainer.model.b.item())

    def test_push_to_hub_in_organization(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(output_dir=tmp_dir)
            trainer.save_model()
            trainer = get_regression_trainer(
                output_dir=os.path.join(tmp_dir, "test-trainer-org"),
                push_to_hub=True,
                hub_model_id="valid_org/test-trainer-org",
                hub_token=self._token,
            )
            url = trainer.push_to_hub()

        # Extract repo_name from the url
        re_search = re.search(ENDPOINT_STAGING + r"/([^/]+/[^/]+)/", url)
        self.assertTrue(re_search is not None)
        repo_name = re_search.groups()[0]
        self.assertEqual(repo_name, "valid_org/test-trainer-org")

        model = RegressionPreTrainedModel.from_pretrained("valid_org/test-trainer-org")
        self.assertEqual(model.a.item(), trainer.model.a.item())
        self.assertEqual(model.b.item(), trainer.model.b.item())

    def get_commit_history(self, repo):
        commit_logs = subprocess.run(
            "git log".split(),
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            check=True,
            encoding="utf-8",
            cwd=repo,
        ).stdout
        commits = commit_logs.split("\n\n")[1::2]
        return [commit.strip() for commit in commits]

    def test_push_to_hub_with_saves_each_epoch(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=os.path.join(tmp_dir, "test-trainer-epoch"),
                push_to_hub=True,
                hub_token=self._token,
                # To avoid any flakiness if the training goes faster than the uploads.
                hub_always_push=True,
                save_strategy="epoch",
            )
            trainer.train()

        commits = list_repo_commits(f"{USER}/test-trainer-epoch", token=self._token)
        commits = [c.title for c in commits]
        self.assertIn("initial commit", commits)
        for i in range(1, 4):
            self.assertIn(f"Training in progress, epoch {i}", commits)

    def test_push_to_hub_with_saves_each_n_steps(self):
        num_gpus = max(1, backend_device_count(torch_device))
        if num_gpus > 2:
            return

        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=os.path.join(tmp_dir, "test-trainer-step"),
                push_to_hub=True,
                hub_token=self._token,
                # To avoid any flakiness if the training goes faster than the uploads.
                hub_always_push=True,
                save_strategy="steps",
                save_steps=5,
            )
            trainer.train()

        commits = list_repo_commits(f"{USER}/test-trainer-step", token=self._token)
        commits = [c.title for c in commits]
        self.assertIn("initial commit", commits)

        # max_steps depends on the number of available GPUs
        max_steps = math.ceil(trainer.args.num_train_epochs * len(trainer.get_train_dataloader()))
        for i in range(5, max_steps, 5):
            self.assertIn(f"Training in progress, step {i}", commits)

    @require_tensorboard
    def test_push_to_hub_with_tensorboard_logs(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=os.path.join(tmp_dir, "test-trainer-tensorboard"),
                hub_token=self._token,
                save_strategy="epoch",
                report_to=["tensorboard"],
                keep_report_to=True,
            )
            trainer.train()
            # Push the runs via `push_to_hub()`
            trainer.push_to_hub()

        files = list_repo_files(f"{USER}/test-trainer-tensorboard", token=self._token)
        found_log = False
        for f in files:
            if len(f.split("runs")) > 1 and "events.out.tfevents" in f:
                found_log = True

        assert found_log is True, "No tensorboard log found in repo"

    def test_push_to_hub_tags(self):
        # Checks if `trainer.push_to_hub()` works correctly by adding the desired
        # tag without having to pass `tags` in `push_to_hub`
        # see:
        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=os.path.join(tmp_dir, "test-trainer-tags"),
                push_to_hub=True,
                hub_token=self._token,
            )

            trainer.model.add_model_tags(["test-trainer-tags"])

            url = trainer.push_to_hub()

        # Extract repo_name from the url
        re_search = re.search(ENDPOINT_STAGING + r"/([^/]+/[^/]+)/", url)
        self.assertTrue(re_search is not None)
        repo_name = re_search.groups()[0]

        self.assertEqual(repo_name, f"{USER}/test-trainer-tags")

        model_card = ModelCard.load(repo_name)
        self.assertTrue("test-trainer-tags" in model_card.data.tags)


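# Illustrative sketch, not part of the upstream test suite: how the hub tests above recover the
# repo id from the URL returned by `trainer.push_to_hub()`. `_example_extract_repo_name` is a
# hypothetical helper and assumes a URL of the form f"{ENDPOINT_STAGING}/<user>/<repo>/..." as
# produced against the staging endpoint; nothing in the tests calls it.
def _example_extract_repo_name(url, endpoint=ENDPOINT_STAGING):
    re_search = re.search(endpoint + r"/([^/]+/[^/]+)/", url)
    return None if re_search is None else re_search.groups()[0]

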
@require_torch
@require_optuna
class TrainerHyperParameterOptunaIntegrationTest(unittest.TestCase):
    def setUp(self):
        args = TrainingArguments("..")
        self.n_epochs = args.num_train_epochs
        self.batch_size = args.train_batch_size

    def test_hyperparameter_search(self):
        class MyTrialShortNamer(TrialShortNamer):
            DEFAULTS = {"a": 0, "b": 0}

        def hp_space(trial):
            return {}

        def model_init(trial):
            if trial is not None:
                a = trial.suggest_int("a", -4, 4)
                b = trial.suggest_int("b", -4, 4)
            else:
                a = 0
                b = 0
            config = RegressionModelConfig(a=a, b=b, double_output=False)

            return RegressionPreTrainedModel(config)

        def hp_name(trial):
            return MyTrialShortNamer.shortname(trial.params)

        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                learning_rate=0.1,
                logging_steps=1,
                eval_strategy=IntervalStrategy.EPOCH,
                save_strategy=IntervalStrategy.EPOCH,
                num_train_epochs=4,
                disable_tqdm=True,
                load_best_model_at_end=True,
                logging_dir="runs",
                run_name="test",
                model_init=model_init,
            )
            trainer.hyperparameter_search(direction="minimize", hp_space=hp_space, hp_name=hp_name, n_trials=4)


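# Illustrative sketch, not part of the upstream test suite: the Optuna test above returns an empty
# `hp_space` and samples the toy model's `a`/`b` inside `model_init` instead. A more typical Optuna
# space would suggest `TrainingArguments` fields directly, as below. `_example_optuna_hp_space` is a
# hypothetical helper with arbitrary ranges; nothing in the tests calls it.
def _example_optuna_hp_space(trial):
    return {
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True),
        "seed": trial.suggest_int("seed", 1, 40),
    }

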
@require_torch
@require_optuna
class TrainerHyperParameterMultiObjectOptunaIntegrationTest(unittest.TestCase):
    def setUp(self):
        args = TrainingArguments("..")
        self.n_epochs = args.num_train_epochs
        self.batch_size = args.train_batch_size

    def test_hyperparameter_search(self):
        class MyTrialShortNamer(TrialShortNamer):
            DEFAULTS = {"a": 0, "b": 0}

        def hp_space(trial):
            return {}

        def model_init(trial):
            if trial is not None:
                a = trial.suggest_int("a", -4, 4)
                b = trial.suggest_int("b", -4, 4)
            else:
                a = 0
                b = 0
            config = RegressionModelConfig(a=a, b=b, double_output=False)

            return RegressionPreTrainedModel(config)

        def hp_name(trial):
            return MyTrialShortNamer.shortname(trial.params)

        def compute_objective(metrics: Dict[str, float]) -> List[float]:
            return metrics["eval_loss"], metrics["eval_accuracy"]

        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                learning_rate=0.1,
                logging_steps=1,
                eval_strategy=IntervalStrategy.EPOCH,
                save_strategy=IntervalStrategy.EPOCH,
                num_train_epochs=10,
                disable_tqdm=True,
                load_best_model_at_end=True,
                logging_dir="runs",
                run_name="test",
                model_init=model_init,
                compute_metrics=AlmostAccuracy(),
            )
            trainer.hyperparameter_search(
                direction=["minimize", "maximize"],
                hp_space=hp_space,
                hp_name=hp_name,
                n_trials=4,
                compute_objective=compute_objective,
            )


@require_torch
@require_ray
class TrainerHyperParameterRayIntegrationTest(unittest.TestCase):
    def setUp(self):
        args = TrainingArguments("..")
        self.n_epochs = args.num_train_epochs
        self.batch_size = args.train_batch_size

    def ray_hyperparameter_search(self):
        class MyTrialShortNamer(TrialShortNamer):
            DEFAULTS = {"a": 0, "b": 0}

        def hp_space(trial):
            from ray import tune

            return {
                "a": tune.randint(-4, 4),
                "b": tune.randint(-4, 4),
            }

        def model_init(config):
            if config is None:
                a = 0
                b = 0
            else:
                a = config["a"]
                b = config["b"]
            model_config = RegressionModelConfig(a=a, b=b, double_output=False)

            return RegressionPreTrainedModel(model_config)

        def hp_name(params):
            return MyTrialShortNamer.shortname(params)

        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                learning_rate=0.1,
                logging_steps=1,
                eval_strategy=IntervalStrategy.EPOCH,
                save_strategy=IntervalStrategy.EPOCH,
                num_train_epochs=4,
                disable_tqdm=True,
                load_best_model_at_end=True,
                logging_dir="runs",
                run_name="test",
                model_init=model_init,
            )
            trainer.hyperparameter_search(
                direction="minimize", hp_space=hp_space, hp_name=hp_name, backend="ray", n_trials=4
            )

    def test_hyperparameter_search(self):
        self.ray_hyperparameter_search()

    def test_hyperparameter_search_ray_client(self):
        import ray
        from ray.util.client.ray_client_helpers import ray_start_client_server

        with ray_start_client_server():
            assert ray.util.client.ray.is_connected()
            self.ray_hyperparameter_search()


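# Illustrative sketch, not part of the upstream test suite: the Ray test above tunes the toy
# model's "a"/"b"; a more typical Ray Tune space would target `TrainingArguments` fields instead.
# `_example_ray_hp_space` is a hypothetical helper with arbitrary ranges, never called by the tests.
def _example_ray_hp_space(trial):
    from ray import tune

    return {
        "learning_rate": tune.loguniform(1e-5, 1e-1),
        "num_train_epochs": tune.choice([2, 3, 4]),
    }

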
@slow
@require_torch
@require_sigopt
class TrainerHyperParameterSigOptIntegrationTest(unittest.TestCase):
    def setUp(self):
        args = TrainingArguments("..")
        self.n_epochs = args.num_train_epochs
        self.batch_size = args.train_batch_size

    def test_hyperparameter_search(self):
        class MyTrialShortNamer(TrialShortNamer):
            DEFAULTS = {"a": 0, "b": 0}

        def hp_space(trial):
            return [
                {"bounds": {"min": -4, "max": 4}, "name": "a", "type": "int"},
                {"bounds": {"min": -4, "max": 4}, "name": "b", "type": "int"},
            ]

        def model_init(trial):
            if trial is not None:
                a = trial.assignments["a"]
                b = trial.assignments["b"]
            else:
                a = 0
                b = 0
            config = RegressionModelConfig(a=a, b=b, double_output=False)

            return RegressionPreTrainedModel(config)

        def hp_name(trial):
            return MyTrialShortNamer.shortname(trial.assignments)

        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                learning_rate=0.1,
                logging_steps=1,
                eval_strategy=IntervalStrategy.EPOCH,
                save_strategy=IntervalStrategy.EPOCH,
                num_train_epochs=4,
                disable_tqdm=True,
                load_best_model_at_end=True,
                logging_dir="runs",
                run_name="test",
                model_init=model_init,
            )
            trainer.hyperparameter_search(
                direction="minimize", hp_space=hp_space, hp_name=hp_name, backend="sigopt", n_trials=4
            )


optim_test_params = []
if is_torch_available():
    default_adam_kwargs = {
        "betas": (TrainingArguments.adam_beta1, TrainingArguments.adam_beta2),
        "eps": TrainingArguments.adam_epsilon,
        "lr": TrainingArguments.learning_rate,
    }

    default_lion_kwargs = {
        "betas": (TrainingArguments.adam_beta1, TrainingArguments.adam_beta2),
        "lr": TrainingArguments.learning_rate,
    }

    default_anyprecision_kwargs = {
        "use_kahan_summation": False,
        "momentum_dtype": torch.float32,
        "variance_dtype": torch.float32,
        "compensation_buffer_dtype": torch.bfloat16,
    }

    optim_test_params = [
        (
            TrainingArguments(optim=OptimizerNames.ADAMW_HF, output_dir="None"),
            transformers.optimization.AdamW,
            default_adam_kwargs,
        ),
        (
            TrainingArguments(optim=OptimizerNames.ADAMW_HF.value, output_dir="None"),
            transformers.optimization.AdamW,
            default_adam_kwargs,
        ),
        (
            TrainingArguments(optim=OptimizerNames.ADAMW_TORCH, output_dir="None"),
            torch.optim.AdamW,
            default_adam_kwargs,
        ),
        (
            TrainingArguments(optim=OptimizerNames.ADAFACTOR, output_dir="None"),
            transformers.optimization.Adafactor,
            {
                "scale_parameter": False,
                "relative_step": False,
                "lr": TrainingArguments.learning_rate,
            },
        ),
    ]

    if is_apex_available():
        import apex

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.ADAMW_APEX_FUSED, output_dir="None"),
                apex.optimizers.FusedAdam,
                default_adam_kwargs,
            )
        )

    if is_bitsandbytes_available():
        import bitsandbytes as bnb

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.ADAMW_BNB, output_dir="None"),
                bnb.optim.AdamW,
                default_adam_kwargs,
            )
        )

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.ADAMW_8BIT, output_dir="None"),
                bnb.optim.AdamW,
                default_adam_kwargs,
            )
        )

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.PAGED_ADAMW, output_dir="None"),
                bnb.optim.AdamW,
                default_adam_kwargs,
            )
        )

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.PAGED_ADAMW_8BIT, output_dir="None"),
                bnb.optim.AdamW,
                default_adam_kwargs,
            )
        )

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.LION, output_dir="None"),
                bnb.optim.Lion,
                default_lion_kwargs,
            )
        )

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.LION_8BIT, output_dir="None"),
                bnb.optim.Lion,
                default_lion_kwargs,
            )
        )

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.PAGED_LION_8BIT, output_dir="None"),
                bnb.optim.Lion,
                default_lion_kwargs,
            )
        )

    if is_torchdistx_available():
        import torchdistx

        optim_test_params.append(
            (
                TrainingArguments(optim=OptimizerNames.ADAMW_ANYPRECISION, output_dir="None"),
                torchdistx.optimizers.AnyPrecisionAdamW,
                dict(default_adam_kwargs, **default_anyprecision_kwargs),
            )
        )


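# Illustrative sketch, not part of the upstream test suite: any additional optimizer backend would
# be registered the same way as the blocks above -- guard on its availability check, import it, and
# append a (TrainingArguments, expected_class, expected_kwargs) triple. The names
# `is_some_backend_available`, `some_backend` and "some_optimizer" below are placeholders, not real
# transformers or third-party APIs, so the sketch is left as a comment.
#
#     if is_some_backend_available():
#         import some_backend
#
#         optim_test_params.append(
#             (
#                 TrainingArguments(optim="some_optimizer", output_dir="None"),
#                 some_backend.SomeOptimizer,
#                 default_adam_kwargs,
#             )
#         )

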
@require_torch
class TrainerOptimizerChoiceTest(unittest.TestCase):
    def check_optim_and_kwargs(self, training_args: TrainingArguments, expected_cls, expected_kwargs):
        actual_cls, optim_kwargs = Trainer.get_optimizer_cls_and_kwargs(training_args)
        self.assertEqual(expected_cls, actual_cls)
        self.assertIsNotNone(optim_kwargs)

        for p, v in expected_kwargs.items():
            self.assertTrue(p in optim_kwargs)
            actual_v = optim_kwargs[p]
            self.assertTrue(actual_v == v, f"Failed check for {p}. Expected {v}, but got {actual_v}.")

    @parameterized.expand(optim_test_params, skip_on_empty=True)
    def test_optim_supported(self, training_args: TrainingArguments, expected_cls, expected_kwargs):
        # exercises all the valid --optim options
        self.check_optim_and_kwargs(training_args, expected_cls, expected_kwargs)

        trainer = get_regression_trainer(**training_args.to_dict())
        trainer.train()

    def test_fused_adam(self):
        # Pretend that apex is installed and mock apex.optimizers.FusedAdam exists.
        # Trainer.get_optimizer_cls_and_kwargs does not use FusedAdam. It only has to return the
        # class given, so mocking apex.optimizers.FusedAdam should be fine for testing and allow
        # the test to run without requiring an apex installation.
        mock = Mock()
        modules = {
            "apex": mock,
            "apex.optimizers": mock.optimizers,
            "apex.optimizers.FusedAdam": mock.optimizers.FusedAdam,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.ADAMW_APEX_FUSED, output_dir="None"),
                mock.optimizers.FusedAdam,
                default_adam_kwargs,
            )

    def test_fused_adam_no_apex(self):
        args = TrainingArguments(optim=OptimizerNames.ADAMW_APEX_FUSED, output_dir="None")

        # Pretend that apex does not exist, even if installed. By setting apex to None, importing
        # apex will fail even if apex is installed.
        with patch.dict("sys.modules", {"apex.optimizers": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)

    def test_bnb_adam8bit(self):
        # Pretend that bitsandbytes is installed and mock bnb.optim.AdamW exists.
        # Trainer.get_optimizer_cls_and_kwargs does not use AdamW. It only has to return the
        # class given, so mocking bnb.optim.AdamW should be fine for testing and allow
        # the test to run without requiring a bitsandbytes installation.
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.AdamW": mock.optim.AdamW,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.ADAMW_BNB, output_dir="None"),
                mock.optim.AdamW,
                default_adam_kwargs,
            )

    def test_bnb_paged_adam8bit_alias(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.AdamW": mock.optim.AdamW,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.ADAMW_8BIT, output_dir="None"),
                mock.optim.AdamW,
                default_adam_kwargs,
            )

    def test_bnb_paged_adam(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.AdamW": mock.optim.AdamW,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.PAGED_ADAMW, output_dir="None"),
                mock.optim.AdamW,
                default_adam_kwargs,
            )

    def test_bnb_paged_adam8bit(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.AdamW": mock.optim.AdamW,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.PAGED_ADAMW_8BIT, output_dir="None"),
                mock.optim.AdamW,
                default_adam_kwargs,
            )

    def test_bnb_lion(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.Lion": mock.optim.Lion,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.LION, output_dir="None"),
                mock.optim.Lion,
                default_lion_kwargs,
            )

    def test_bnb_lion8bit(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.Lion": mock.optim.Lion,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.LION_8BIT, output_dir="None"),
                mock.optim.Lion,
                default_lion_kwargs,
            )

    def test_bnb_paged_lion8bit(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.Lion": mock.optim.Lion,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.PAGED_LION_8BIT, output_dir="None"),
                mock.optim.Lion,
                default_lion_kwargs,
            )

    def test_bnb_paged_lion(self):
        mock = Mock()
        modules = {
            "bitsandbytes": mock,
            "bitsandbytes.optim": mock.optim,
            "bitsandbytes.optim.Lion": mock.optim.Lion,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.PAGED_LION, output_dir="None"),
                mock.optim.Lion,
                default_lion_kwargs,
            )

    def test_bnb_adam8bit_no_bnb(self):
        args = TrainingArguments(optim=OptimizerNames.ADAMW_BNB, output_dir="None")

        # Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
        # bnb will fail even if bnb is installed.
        with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)

    def test_bnb_paged_adam_no_bnb(self):
        args = TrainingArguments(optim=OptimizerNames.PAGED_ADAMW, output_dir="None")

        # Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
        # bnb will fail even if bnb is installed.
        with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)

    def test_bnb_paged_adam8bit_no_bnb(self):
        args = TrainingArguments(optim=OptimizerNames.PAGED_ADAMW_8BIT, output_dir="None")

        # Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
        # bnb will fail even if bnb is installed.
        with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)

    def test_bnb_paged_lion_no_bnb(self):
        args = TrainingArguments(optim=OptimizerNames.PAGED_LION, output_dir="None")

        # Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
        # bnb will fail even if bnb is installed.
        with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)

    def test_bnb_paged_lion8bit_no_bnb(self):
        args = TrainingArguments(optim=OptimizerNames.PAGED_LION_8BIT, output_dir="None")

        # Pretend that bnb does not exist, even if installed. By setting bnb to None, importing
        # bnb will fail even if bnb is installed.
        with patch.dict("sys.modules", {"bitsandbytes.optim": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)

    def test_anyprecision_adamw(self):
        # Pretend that torchdistx is installed and mock torchdistx.optimizers.AnyPrecisionAdamW exists.
        # Trainer.get_optimizer_cls_and_kwargs does not use AnyPrecisionAdamW. It only has to return the
        # class given, so mocking torchdistx.optimizers.AnyPrecisionAdamW should be fine for testing and allow
        # the test to run without requiring a torchdistx installation.
        mock = Mock()
        modules = {
            "torchdistx": mock,
            "torchdistx.optimizers": mock.optimizers,
            "torchdistx.optimizers.AnyPrecisionAdamW": mock.optimizers.AnyPrecisionAdamW,
        }
        with patch.dict("sys.modules", modules):
            self.check_optim_and_kwargs(
                TrainingArguments(optim=OptimizerNames.ADAMW_ANYPRECISION, output_dir="None"),
                mock.optimizers.AnyPrecisionAdamW,
                dict(default_adam_kwargs, **default_anyprecision_kwargs),
            )

    def test_no_torchdistx_anyprecision_adamw(self):
        args = TrainingArguments(optim=OptimizerNames.ADAMW_ANYPRECISION, output_dir="None")

        # Pretend that torchdistx does not exist, even if installed. By setting torchdistx to None, importing
        # torchdistx.optimizers will fail even if torchdistx is installed.
        with patch.dict("sys.modules", {"torchdistx.optimizers": None}):
            with self.assertRaises(ValueError):
                Trainer.get_optimizer_cls_and_kwargs(args)


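# Illustrative sketch, not part of the upstream test suite: the `sys.modules` patching pattern used
# by the optimizer tests above. Patching an entry with a Mock makes the import resolve to the mock
# without the real package; patching it with None makes the import fail even when the package is
# installed. `_example_sys_modules_patching` is a hypothetical helper, never called by the tests.
def _example_sys_modules_patching():
    fake_apex = Mock()
    with patch.dict("sys.modules", {"apex": fake_apex, "apex.optimizers": fake_apex.optimizers}):
        import apex  # resolves to the Mock placed in sys.modules

        assert apex is fake_apex
    with patch.dict("sys.modules", {"apex.optimizers": None}):
        try:
            from apex.optimizers import FusedAdam  # noqa: F401
        except ImportError:
            pass  # expected: the None entry blocks the import

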
@require_torch
@require_wandb
class TrainerHyperParameterWandbIntegrationTest(unittest.TestCase):
    def setUp(self):
        args = TrainingArguments("..")
        self.n_epochs = args.num_train_epochs
        self.batch_size = args.train_batch_size

    def test_hyperparameter_search(self):
        class MyTrialShortNamer(TrialShortNamer):
            DEFAULTS = {"a": 0, "b": 0}

        def hp_space(trial):
            return {
                "method": "random",
                "metric": {},
                "parameters": {
                    "a": {"distribution": "uniform", "min": 1e-6, "max": 1e-4},
                    "b": {"distribution": "int_uniform", "min": 1, "max": 6},
                },
            }

        def model_init(config):
            if config is None:
                a = 0
                b = 0
            else:
                a = config["a"]
                b = config["b"]
            model_config = RegressionModelConfig(a=a, b=b, double_output=False)

            return RegressionPreTrainedModel(model_config)

        def hp_name(params):
            return MyTrialShortNamer.shortname(params)

        with tempfile.TemporaryDirectory() as tmp_dir:
            trainer = get_regression_trainer(
                output_dir=tmp_dir,
                learning_rate=0.1,
                logging_steps=1,
                eval_strategy=IntervalStrategy.EPOCH,
                save_strategy=IntervalStrategy.EPOCH,
                num_train_epochs=4,
                disable_tqdm=True,
                load_best_model_at_end=True,
                logging_dir="runs",
                run_name="test",
                model_init=model_init,
            )
            trainer.hyperparameter_search(
                direction="minimize", hp_space=hp_space, hp_name=hp_name, backend="wandb", n_trials=4, anonymous="must"
            )


class HyperParameterSearchBackendsTest(unittest.TestCase):
    def test_hyperparameter_search_backends(self):
        self.assertEqual(
            list(ALL_HYPERPARAMETER_SEARCH_BACKENDS.keys()),
            list(HPSearchBackend),
        )


@require_torch
class OptimizerAndModelInspectionTest(unittest.TestCase):
    def test_get_num_trainable_parameters(self):
        model = nn.Sequential(nn.Linear(128, 64), nn.Linear(64, 32))
        # in_features * out_features + bias
        layer_1 = 128 * 64 + 64
        layer_2 = 64 * 32 + 32
        trainer = Trainer(model=model)
        self.assertEqual(trainer.get_num_trainable_parameters(), layer_1 + layer_2)
        # Freeze the last layer
        for param in model[-1].parameters():
            param.requires_grad = False
        self.assertEqual(trainer.get_num_trainable_parameters(), layer_1)

    def test_get_learning_rates(self):
        model = nn.Sequential(nn.Linear(128, 64))
        trainer = Trainer(model=model)
        with self.assertRaises(ValueError):
            trainer.get_learning_rates()
        trainer.create_optimizer()
        self.assertEqual(trainer.get_learning_rates(), [5e-05, 5e-05])

    def test_get_optimizer_group(self):
        model = nn.Sequential(nn.Linear(128, 64))
        trainer = Trainer(model=model)
        # ValueError is raised if optimizer is None
        with self.assertRaises(ValueError):
            trainer.get_optimizer_group()
        trainer.create_optimizer()
        # Get groups
        num_groups = len(trainer.get_optimizer_group())
        self.assertEqual(num_groups, 2)
        # Get group of parameter
        param = next(model.parameters())
        group = trainer.get_optimizer_group(param)
        self.assertIn(param, group["params"])