# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import os
import re
import sys
from pathlib import Path
from typing import Tuple
from unittest.mock import patch

from parameterized import parameterized

from transformers.testing_utils import (
    CaptureStderr,
    ExtendSysPath,
    TestCasePlus,
    backend_device_count,
    execute_subprocess_async,
    get_torch_dist_unique_port,
    require_apex,
    require_bitsandbytes,
    require_torch,
    require_torch_gpu,
    require_torch_multi_accelerator,
    require_torch_non_multi_accelerator,
    slow,
    torch_device,
)
from transformers.trainer_callback import TrainerState
from transformers.trainer_utils import set_seed


bindir = os.path.abspath(os.path.dirname(__file__))
with ExtendSysPath(f"{bindir}/../../examples/pytorch/translation"):
    from run_translation import main  # noqa


set_seed(42)
MARIAN_MODEL = "sshleifer/student_marian_en_ro_6_1"
MBART_TINY = "sshleifer/tiny-mbart"


@require_torch
class TestTrainerExt(TestCasePlus):
    def run_seq2seq_quick(
        self,
        distributed=False,
        extra_args_str=None,
        predict_with_generate=True,
        do_train=True,
        do_eval=True,
        do_predict=True,
        n_gpus_to_use=None,
    ):
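        """Run a quick MBART_TINY fine-tune via run_translation.py and sanity-check the logged eval metrics."""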
        output_dir = self.run_trainer(
            eval_steps=1,
            max_len=12,
            model_name=MBART_TINY,
            num_train_epochs=1,
            distributed=distributed,
            extra_args_str=extra_args_str,
            predict_with_generate=predict_with_generate,
            do_train=do_train,
            do_eval=do_eval,
            do_predict=do_predict,
            n_gpus_to_use=n_gpus_to_use,
        )
        logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history

        if not do_eval:
            return

        eval_metrics = [log for log in logs if "eval_loss" in log.keys()]

        first_step_stats = eval_metrics[0]
        if predict_with_generate:
            assert "eval_bleu" in first_step_stats

            last_step_stats = eval_metrics[-1]
            assert isinstance(last_step_stats["eval_bleu"], float)
            assert not math.isnan(float(last_step_stats["eval_loss"])), "eval_loss must not be `nan`"

    @require_torch_non_multi_accelerator
    def test_run_seq2seq_no_dist(self):
        self.run_seq2seq_quick()

    # verify that the trainer can handle non-distributed with n_gpu > 1
    @require_torch_multi_accelerator
    def test_run_seq2seq_dp(self):
        self.run_seq2seq_quick(distributed=False)

    # verify that the trainer can handle distributed with n_gpu > 1
    @require_torch_multi_accelerator
    def test_run_seq2seq_ddp(self):
        self.run_seq2seq_quick(distributed=True)

    @require_apex
    @require_torch_gpu
    def test_run_seq2seq_apex(self):
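        """Smoke-test --fp16 with the apex backend, twice in a row (see note below on why it runs distributed)."""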
        # XXX: apex breaks the trainer if run_seq2seq.main() is run twice from the same program,
        # and it breaks other tests that run from the same pytest worker. Until this is sorted out,
        # this test must run in an external program, i.e. with distributed=True, and only on one or
        # more gpus - a cpu-only variant would need a dedicated test.
        #
        # Specifically, the problem was traced to self.optimizer.step() - when it runs a 2nd time
        # via a 2nd main() call, it botches the subsequent eval.
        #
        self.run_seq2seq_quick(distributed=True, extra_args_str="--fp16 --fp16_backend=apex")
        # test a 2nd time - was getting `eval_loss: nan`
        # to reproduce the problem set distributed=False
        self.run_seq2seq_quick(distributed=True, extra_args_str="--fp16 --fp16_backend=apex")

    @parameterized.expand(["base", "low", "high", "mixed"])
    @require_torch_multi_accelerator
    def test_trainer_log_level_replica(self, experiment_id):
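        """Check how often the "Running training" info line appears for each log_level/log_level_replica combo."""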
        # as each sub-test is slow-ish, split into multiple sub-tests to avoid a CI timeout
        experiments = {
            # test with the default log_level - should be info and thus log info once
            "base": {"extra_args_str": "", "n_matches": 1},
            # test with low log_level and log_level_replica - should be noisy on all processes
            # now the info string should appear twice on 2 processes
            "low": {"extra_args_str": "--log_level debug --log_level_replica debug", "n_matches": 2},
            # test with high log_level and low log_level_replica
            # now the info string should appear once only on the replica
            "high": {"extra_args_str": "--log_level error --log_level_replica debug", "n_matches": 1},
            # test with high log_level and log_level_replica - should be quiet on all processes
            "mixed": {"extra_args_str": "--log_level error --log_level_replica error", "n_matches": 0},
        }

        data = experiments[experiment_id]
        kwargs = {
            "distributed": True,
            "predict_with_generate": False,
            "do_eval": False,
            "do_predict": False,
            "n_gpus_to_use": 2,
        }
        log_info_string = "Running training"
        with CaptureStderr() as cl:
            self.run_seq2seq_quick(**kwargs, extra_args_str=data["extra_args_str"])
        n_matches = len(re.findall(log_info_string, cl.err))
        self.assertEqual(n_matches, data["n_matches"])

    @slow
    def test_run_seq2seq(self):
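        """Slow full fine-tune of MARIAN_MODEL: eval_loss must improve and prediction files must be saved."""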
        output_dir = self.run_trainer(
            eval_steps=2,
            max_len=128,
            model_name=MARIAN_MODEL,
            learning_rate=3e-4,
            num_train_epochs=10,
            distributed=False,
        )

        # Check metrics
        logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history
        eval_metrics = [log for log in logs if "eval_loss" in log.keys()]
        first_step_stats = eval_metrics[0]
        last_step_stats = eval_metrics[-1]

        assert first_step_stats["eval_loss"] > last_step_stats["eval_loss"], "model learned nothing"
        assert isinstance(last_step_stats["eval_bleu"], float)

        # test if do_predict saves generations and metrics
        contents = os.listdir(output_dir)
        contents = {os.path.basename(p) for p in contents}
        assert "generated_predictions.txt" in contents
        assert "predict_results.json" in contents

    @slow
    @require_bitsandbytes
    def test_run_seq2seq_bnb(self):
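        """Compare GPU memory usage and training loss between standard torch AdamW and bitsandbytes 8-bit AdamW."""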
        from transformers.training_args import OptimizerNames

        def train_and_return_metrics(optim: str) -> Tuple[int, int, float]:
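            """Train for one epoch with `optim`; return (gpu_peak_mem_mb, gpu_alloc_mem_mb, train_loss)."""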
            extra_args = "--skip_memory_metrics 0"

            output_dir = self.run_trainer(
                max_len=128,
                model_name=MARIAN_MODEL,
                learning_rate=3e-4,
                num_train_epochs=1,
                optim=optim,
                distributed=True,  # force run in a new process
                extra_args_str=extra_args,
                do_eval=False,
                do_predict=False,
                n_gpus_to_use=1,  # to allow deterministic fixed memory usage
            )

            # Check metrics
            logs = TrainerState.load_from_json(Path(output_dir, "trainer_state.json")).log_history
            gpu_peak_mem_mb = int(logs[0]["train_mem_gpu_peaked_delta"] / 2**20)
            gpu_alloc_mem_mb = int(logs[0]["train_mem_gpu_alloc_delta"] / 2**20)

            loss = logs[0]["train_loss"]
            return gpu_peak_mem_mb, gpu_alloc_mem_mb, loss

        gpu_peak_mem_orig, gpu_alloc_mem_orig, loss_orig = train_and_return_metrics(OptimizerNames.ADAMW_TORCH.value)
        gpu_peak_mem_bnb, gpu_alloc_mem_bnb, loss_bnb = train_and_return_metrics(OptimizerNames.ADAMW_BNB.value)

        gpu_alloc_mem_diff = gpu_alloc_mem_orig - gpu_alloc_mem_bnb

        gpu_total_mem_orig = gpu_peak_mem_orig + gpu_alloc_mem_orig
        gpu_total_mem_bnb = gpu_peak_mem_bnb + gpu_alloc_mem_bnb
        gpu_total_mem_diff = gpu_total_mem_orig - gpu_total_mem_bnb

        # sshleifer/student_marian_en_ro_6_1 has 54M parameters, 29M of which are in `nn.Embedding`,
        # which doesn't get quantized and remains in fp32. Therefore only 25M parameters are
        # quantized into 2 bytes, and the diff in optimizer memory usage works out as follows:
        #
        # - normal 25*8=~200MB (8 bytes per param)
        # - bnb    25*2= ~50MB (2 bytes per param)
        #
        # Thus we should expect ~150MB total memory saved.
        #
        # Peak memory should be the same - the total should differ by about that same margin.
        #
        # After leaving a small margin to accommodate for differences between gpus, let's check
        # that we have at least 120MB in savings.
        expected_savings = 120

        # uncomment the following if this test starts failing - requires py38 for a new print feature
        # gpu_peak_mem_diff = gpu_peak_mem_orig - gpu_peak_mem_bnb
        # print(f"{gpu_alloc_mem_orig=}MB {gpu_peak_mem_orig=}MB {gpu_alloc_mem_orig+gpu_peak_mem_orig=}MB")
        # print(f" {gpu_alloc_mem_bnb=}MB {gpu_peak_mem_bnb=}MB {gpu_alloc_mem_bnb+gpu_peak_mem_bnb=}MB")
        # print(f"{gpu_alloc_mem_diff=}MB")
        # print(f"{gpu_peak_mem_diff=}MB")
        # print(f"{gpu_total_mem_orig=}MB, {gpu_total_mem_bnb=}MB")
        # print(f"{gpu_total_mem_diff=}MB")

        self.assertGreater(
            gpu_alloc_mem_diff,
            expected_savings,
            "should use ~150MB less alloc gpu memory with BNB, compared to without it for this model but got"
            f" a difference of {gpu_alloc_mem_diff}MB, with gpu_alloc_mem_orig={gpu_alloc_mem_orig}MB and"
            f" gpu_alloc_mem_bnb={gpu_alloc_mem_bnb}MB",
        )

        self.assertGreater(
            gpu_total_mem_diff,
            expected_savings,
            "should use ~150MB less total gpu memory with BNB, compared to without it for this model but got"
            f" a difference of {gpu_total_mem_diff}MB, with gpu_total_mem_orig={gpu_total_mem_orig}MB and"
            f" gpu_total_mem_bnb={gpu_total_mem_bnb}MB",
        )

        self.assertEqual(
            loss_orig, loss_bnb, f"loss should be the same, but got loss_orig={loss_orig}, loss_bnb={loss_bnb}"
        )

    def run_trainer(
        self,
        max_len: int,
        model_name: str,
        num_train_epochs: int,
        learning_rate: float = 3e-3,
        optim: str = "adafactor",
        distributed: bool = False,
        extra_args_str: str = None,
        eval_steps: int = 0,
        predict_with_generate: bool = True,
        do_train: bool = True,
        do_eval: bool = True,
        do_predict: bool = True,
        n_gpus_to_use: int = None,
    ):
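        """Assemble a run_translation.py command line from the given options, run it, and return the output dir.

        With distributed=True the script is launched in a subprocess via `torch.distributed.run`;
        otherwise `main()` is called in-process with a patched `sys.argv`.
        """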
        data_dir = self.test_file_dir / "../fixtures/tests_samples/wmt_en_ro"
        output_dir = self.get_auto_remove_tmp_dir()
        args_train = f"""
            --model_name_or_path {model_name}
            --train_file {data_dir}/train.json
            --validation_file {data_dir}/val.json
            --test_file {data_dir}/test.json
            --output_dir {output_dir}
            --overwrite_output_dir
            --max_train_samples 8
            --max_source_length {max_len}
            --max_target_length {max_len}
            --do_train
            --num_train_epochs {str(num_train_epochs)}
            --per_device_train_batch_size 4
            --learning_rate {learning_rate}
            --warmup_steps 8
            --logging_steps 0
            --logging_strategy no
            --save_steps {str(eval_steps)}
            --group_by_length
            --label_smoothing_factor 0.1
            --target_lang ro_RO
            --source_lang en_XX
            --report_to none
        """.split()

        args_eval = f"""
            --do_eval
            --per_device_eval_batch_size 4
            --max_eval_samples 8
            --val_max_target_length {max_len}
            --eval_strategy steps
            --eval_steps {str(eval_steps)}
        """.split()

        args_predict = """
            --do_predict
        """.split()

        args = []
        if do_train:
            args += args_train

        if do_eval:
            args += args_eval

        if do_predict:
            args += args_predict

        if predict_with_generate:
            args += "--predict_with_generate".split()

        if do_train:
            if optim == "adafactor":
                args += "--adafactor".split()
            else:
                args += f"--optim {optim}".split()

        if extra_args_str is not None:
            args += extra_args_str.split()

        if distributed:
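            # launch the example script in a fresh set of processes (one per device) via
            # torch.distributed.run, so the run is isolated from the current test process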
            if n_gpus_to_use is None:
                n_gpus_to_use = backend_device_count(torch_device)
            master_port = get_torch_dist_unique_port()
            distributed_args = f"""
                -m torch.distributed.run
                --nproc_per_node={n_gpus_to_use}
                --master_port={master_port}
                {self.examples_dir_str}/pytorch/translation/run_translation.py
            """.split()
            cmd = [sys.executable] + distributed_args + args
            # keep for quick debug
            # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] + cmd)); die
            execute_subprocess_async(cmd, env=self.get_env())
        else:
            testargs = ["run_translation.py"] + args
            with patch.object(sys, "argv", testargs):
                main()

        return output_dir