# coding=utf-8
# Copyright 2021, The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Testing suite for the PyTorch MVP model."""

import copy
import tempfile
import unittest

import timeout_decorator  # noqa

from transformers import MvpConfig, is_torch_available
from transformers.testing_utils import (
    require_sentencepiece,
    require_tokenizers,
    require_torch,
    require_torch_fp16,
    slow,
    torch_device,
)
from transformers.utils import cached_property

from ...generation.test_utils import GenerationTesterMixin
from ...test_configuration_common import ConfigTester
from ...test_modeling_common import ModelTesterMixin, floats_tensor, ids_tensor
from ...test_pipeline_mixin import PipelineTesterMixin


if is_torch_available():
    import torch

    from transformers import (
        MvpForCausalLM,
        MvpForConditionalGeneration,
        MvpForQuestionAnswering,
        MvpForSequenceClassification,
        MvpModel,
        MvpTokenizer,
    )
    from transformers.models.mvp.modeling_mvp import MvpDecoder, MvpEncoder, shift_tokens_right
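

# Helper shared by the testers below: builds the full kwargs dict expected by the
# encoder-decoder models. Missing padding masks are derived from the token ids, and
# head masks default to all-ones (no attention heads masked out).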
def prepare_mvp_inputs_dict(
    config,
    input_ids,
    decoder_input_ids=None,
    attention_mask=None,
    decoder_attention_mask=None,
    head_mask=None,
    decoder_head_mask=None,
    cross_attn_head_mask=None,
):
    if attention_mask is None:
        attention_mask = input_ids.ne(config.pad_token_id)
    if decoder_attention_mask is None:
        decoder_attention_mask = decoder_input_ids.ne(config.pad_token_id)
    if head_mask is None:
        head_mask = torch.ones(config.encoder_layers, config.encoder_attention_heads, device=torch_device)
    if decoder_head_mask is None:
        decoder_head_mask = torch.ones(config.decoder_layers, config.decoder_attention_heads, device=torch_device)
    if cross_attn_head_mask is None:
        cross_attn_head_mask = torch.ones(config.decoder_layers, config.decoder_attention_heads, device=torch_device)
    return {
        "input_ids": input_ids,
        "decoder_input_ids": decoder_input_ids,
        "attention_mask": attention_mask,
        "decoder_attention_mask": decoder_attention_mask,
        "head_mask": head_mask,
        "decoder_head_mask": decoder_head_mask,
        "cross_attn_head_mask": cross_attn_head_mask,
    }
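

# MvpModelTester builds a tiny random MvpConfig plus matching encoder/decoder inputs;
# the test classes below feed its outputs to the common model and generation tests.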
class MvpModelTester:
    def __init__(
        self,
        parent,
        batch_size=13,
        seq_length=7,
        is_training=True,
        use_labels=False,
        vocab_size=99,
        hidden_size=16,
        num_hidden_layers=2,
        num_attention_heads=4,
        intermediate_size=4,
        hidden_act="gelu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        max_position_embeddings=20,
        eos_token_id=2,
        pad_token_id=1,
        bos_token_id=0,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.seq_length = seq_length
        self.is_training = is_training
        self.use_labels = use_labels
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.hidden_act = hidden_act
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.max_position_embeddings = max_position_embeddings
        self.eos_token_id = eos_token_id
        self.pad_token_id = pad_token_id
        self.bos_token_id = bos_token_id

    def prepare_config_and_inputs(self):
        input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size).clamp(3)
        input_ids[:, -1] = self.eos_token_id  # Eos Token

        decoder_input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)

        config = self.get_config()
        inputs_dict = prepare_mvp_inputs_dict(config, input_ids, decoder_input_ids)
        return config, inputs_dict

    def get_config(self):
        return MvpConfig(
            vocab_size=self.vocab_size,
            d_model=self.hidden_size,
            encoder_layers=self.num_hidden_layers,
            decoder_layers=self.num_hidden_layers,
            encoder_attention_heads=self.num_attention_heads,
            decoder_attention_heads=self.num_attention_heads,
            encoder_ffn_dim=self.intermediate_size,
            decoder_ffn_dim=self.intermediate_size,
            dropout=self.hidden_dropout_prob,
            attention_dropout=self.attention_probs_dropout_prob,
            max_position_embeddings=self.max_position_embeddings,
            eos_token_id=self.eos_token_id,
            bos_token_id=self.bos_token_id,
            pad_token_id=self.pad_token_id,
        )

    def get_pipeline_config(self):
        config = self.get_config()
        config.max_position_embeddings = 100
        config.vocab_size = 300
        return config

    def prepare_config_and_inputs_for_common(self):
        config, inputs_dict = self.prepare_config_and_inputs()
        return config, inputs_dict

    def create_and_check_decoder_model_past_large_inputs(self, config, inputs_dict):
        model = MvpModel(config=config).get_decoder().to(torch_device).eval()
        input_ids = inputs_dict["input_ids"]
        attention_mask = inputs_dict["attention_mask"]
        head_mask = inputs_dict["head_mask"]

        # first forward pass
        outputs = model(input_ids, attention_mask=attention_mask, head_mask=head_mask, use_cache=True)

        output, past_key_values = outputs.to_tuple()

        # create hypothetical next tokens and extend next_input_ids with them
        next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
        next_attn_mask = ids_tensor((self.batch_size, 3), 2)

        # append to next input_ids and attention_mask
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        next_attention_mask = torch.cat([attention_mask, next_attn_mask], dim=-1)

        output_from_no_past = model(next_input_ids, attention_mask=next_attention_mask)["last_hidden_state"]
        output_from_past = model(next_tokens, attention_mask=next_attention_mask, past_key_values=past_key_values)[
            "last_hidden_state"
        ]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()

        self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])

        # test that outputs are equal for slice
        self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3))

    def check_encoder_decoder_model_standalone(self, config, inputs_dict):
        model = MvpModel(config=config).to(torch_device).eval()
        outputs = model(**inputs_dict)

        encoder_last_hidden_state = outputs.encoder_last_hidden_state
        last_hidden_state = outputs.last_hidden_state

        with tempfile.TemporaryDirectory() as tmpdirname:
            encoder = model.get_encoder()
            encoder.save_pretrained(tmpdirname)
            encoder = MvpEncoder.from_pretrained(tmpdirname).to(torch_device)

        encoder_last_hidden_state_2 = encoder(inputs_dict["input_ids"], attention_mask=inputs_dict["attention_mask"])[
            0
        ]

        self.parent.assertTrue((encoder_last_hidden_state_2 - encoder_last_hidden_state).abs().max().item() < 1e-3)

        with tempfile.TemporaryDirectory() as tmpdirname:
            decoder = model.get_decoder()
            decoder.save_pretrained(tmpdirname)
            decoder = MvpDecoder.from_pretrained(tmpdirname).to(torch_device)

        last_hidden_state_2 = decoder(
            input_ids=inputs_dict["decoder_input_ids"],
            attention_mask=inputs_dict["decoder_attention_mask"],
            encoder_hidden_states=encoder_last_hidden_state,
            encoder_attention_mask=inputs_dict["attention_mask"],
        )[0]

        self.parent.assertTrue((last_hidden_state_2 - last_hidden_state).abs().max().item() < 1e-3)
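

# MvpHeadTests exercises each task head (sequence classification, question answering,
# conditional generation) plus a few generation and tokenization utilities on a small
# hand-written config and batch.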
@require_torch
class MvpHeadTests(unittest.TestCase):
    vocab_size = 99

    def _get_config_and_data(self):
        input_ids = torch.tensor(
            [
                [71, 82, 18, 33, 46, 91, 2],
                [68, 34, 26, 58, 30, 82, 2],
                [5, 97, 17, 39, 94, 40, 2],
                [76, 83, 94, 25, 70, 78, 2],
                [87, 59, 41, 35, 48, 66, 2],
                [55, 13, 16, 58, 5, 2, 1],  # note padding
                [64, 27, 31, 51, 12, 75, 2],
                [52, 64, 86, 17, 83, 39, 2],
                [48, 61, 9, 24, 71, 82, 2],
                [26, 1, 60, 48, 22, 13, 2],
                [21, 5, 62, 28, 14, 76, 2],
                [45, 98, 37, 86, 59, 48, 2],
                [70, 70, 50, 9, 28, 0, 2],
            ],
            dtype=torch.long,
            device=torch_device,
        )

        batch_size = input_ids.shape[0]
        config = MvpConfig(
            vocab_size=self.vocab_size,
            d_model=24,
            encoder_layers=2,
            decoder_layers=2,
            encoder_attention_heads=2,
            decoder_attention_heads=2,
            encoder_ffn_dim=32,
            decoder_ffn_dim=32,
            max_position_embeddings=48,
            eos_token_id=2,
            pad_token_id=1,
            bos_token_id=0,
        )
        return config, input_ids, batch_size

    def test_sequence_classification_forward(self):
        config, input_ids, batch_size = self._get_config_and_data()
        labels = _long_tensor([2] * batch_size).to(torch_device)
        config.num_labels = 3
        model = MvpForSequenceClassification(config)
        model.to(torch_device)
        outputs = model(input_ids=input_ids, decoder_input_ids=input_ids, labels=labels)
        expected_shape = torch.Size((batch_size, config.num_labels))
        self.assertEqual(outputs["logits"].shape, expected_shape)
        self.assertIsInstance(outputs["loss"].item(), float)

    def test_question_answering_forward(self):
        config, input_ids, batch_size = self._get_config_and_data()
        sequence_labels = ids_tensor([batch_size], 2).to(torch_device)
        model = MvpForQuestionAnswering(config)
        model.to(torch_device)
        outputs = model(
            input_ids=input_ids,
            start_positions=sequence_labels,
            end_positions=sequence_labels,
        )

        self.assertEqual(outputs["start_logits"].shape, input_ids.shape)
        self.assertEqual(outputs["end_logits"].shape, input_ids.shape)
        self.assertIsInstance(outputs["loss"].item(), float)

    @timeout_decorator.timeout(1)
    def test_lm_forward(self):
        config, input_ids, batch_size = self._get_config_and_data()
        lm_labels = ids_tensor([batch_size, input_ids.shape[1]], self.vocab_size).to(torch_device)
        lm_model = MvpForConditionalGeneration(config)
        lm_model.to(torch_device)
        outputs = lm_model(input_ids=input_ids, labels=lm_labels)
        expected_shape = (batch_size, input_ids.shape[1], config.vocab_size)
        self.assertEqual(outputs["logits"].shape, expected_shape)
        self.assertIsInstance(outputs["loss"].item(), float)

    def test_lm_uneven_forward(self):
        config = MvpConfig(
            vocab_size=self.vocab_size,
            d_model=14,
            encoder_layers=2,
            decoder_layers=2,
            encoder_attention_heads=2,
            decoder_attention_heads=2,
            encoder_ffn_dim=8,
            decoder_ffn_dim=8,
            max_position_embeddings=48,
        )
        lm_model = MvpForConditionalGeneration(config).to(torch_device)
        context = torch.tensor(
            [[71, 82, 18, 33, 46, 91, 2], [68, 34, 26, 58, 30, 2, 1]], device=torch_device, dtype=torch.long
        )
        summary = torch.tensor([[82, 71, 82, 18, 2], [58, 68, 2, 1, 1]], device=torch_device, dtype=torch.long)
        outputs = lm_model(input_ids=context, decoder_input_ids=summary, labels=summary)
        expected_shape = (*summary.shape, config.vocab_size)
        self.assertEqual(outputs["logits"].shape, expected_shape)

    def test_generate_beam_search(self):
        input_ids = torch.tensor([[71, 82, 2], [68, 34, 2]], device=torch_device, dtype=torch.long)
        config = MvpConfig(
            vocab_size=self.vocab_size,
            d_model=24,
            encoder_layers=2,
            decoder_layers=2,
            encoder_attention_heads=2,
            decoder_attention_heads=2,
            encoder_ffn_dim=32,
            decoder_ffn_dim=32,
            max_position_embeddings=48,
            eos_token_id=2,
            pad_token_id=1,
            bos_token_id=0,
        )
        lm_model = MvpForConditionalGeneration(config).to(torch_device)
        lm_model.eval()

        max_length = 5
        generated_ids = lm_model.generate(
            input_ids.clone(),
            do_sample=True,
            num_return_sequences=1,
            num_beams=2,
            no_repeat_ngram_size=3,
            max_length=max_length,
        )
        self.assertEqual(generated_ids.shape, (input_ids.shape[0], max_length))

    def test_shift_tokens_right(self):
        input_ids = torch.tensor([[71, 82, 18, 33, 2, 1, 1], [68, 34, 26, 58, 30, 82, 2]], dtype=torch.long)
        shifted = shift_tokens_right(input_ids, 1, 2)
        n_pad_before = input_ids.eq(1).float().sum()
        n_pad_after = shifted.eq(1).float().sum()
        self.assertEqual(shifted.shape, input_ids.shape)
        self.assertEqual(n_pad_after, n_pad_before - 1)
        self.assertTrue(torch.eq(shifted[:, 0], 2).all())

    @slow
    def test_tokenization(self):
        tokenizer = MvpTokenizer.from_pretrained("RUCAIBox/mvp")
        examples = [" Hello world", " DomDramg"]  # need leading spaces for equality
        fairseq_results = [
            torch.tensor([0, 20920, 232, 2]),
            torch.tensor([0, 11349, 495, 4040, 571, 2]),
        ]
        for ex, desired_result in zip(examples, fairseq_results):
            mvp_toks = tokenizer.encode(ex, return_tensors="pt").squeeze()
            assert_tensors_close(desired_result.long(), mvp_toks, prefix=ex)

    @require_torch_fp16
    def test_generate_fp16(self):
        config, input_ids, batch_size = self._get_config_and_data()
        attention_mask = input_ids.ne(1).to(torch_device)
        model = MvpForConditionalGeneration(config).eval().to(torch_device)
        model.half()
        model.generate(input_ids, attention_mask=attention_mask)
        model.generate(num_beams=4, do_sample=True, early_stopping=False, num_return_sequences=3)

    def test_dummy_inputs(self):
        config, *_ = self._get_config_and_data()
        model = MvpForConditionalGeneration(config).eval().to(torch_device)
        model(**model.dummy_inputs)

    def test_resize_tokens_embeddings_more(self):
        config, input_ids, _ = self._get_config_and_data()

        def _get_embs(m):
            return (m.get_input_embeddings().weight.data.clone(), m.get_output_embeddings().weight.data.clone())

        model = MvpForConditionalGeneration(config).eval().to(torch_device)
        input, output = _get_embs(model)
        self.assertTrue(torch.eq(input, output).all())
        new_vocab_size = 45
        model.resize_token_embeddings(new_vocab_size)
        input_new, output_new = _get_embs(model)
        self.assertEqual(input_new.shape, (new_vocab_size, config.d_model))
        self.assertEqual(output_new.shape, (new_vocab_size, config.d_model))
        self.assertTrue(torch.eq(input_new, output_new).all())
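

# MvpModelTest plugs MvpModelTester into the generic model, generation and pipeline
# test mixins; only the Mvp-specific overrides are defined here.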
@require_torch
class MvpModelTest(ModelTesterMixin, GenerationTesterMixin, PipelineTesterMixin, unittest.TestCase):
    all_model_classes = (
        (MvpModel, MvpForConditionalGeneration, MvpForSequenceClassification, MvpForQuestionAnswering)
        if is_torch_available()
        else ()
    )
    all_generative_model_classes = (MvpForConditionalGeneration,) if is_torch_available() else ()
    pipeline_model_mapping = (
        {
            "conversational": MvpForConditionalGeneration,
            "feature-extraction": MvpModel,
            "fill-mask": MvpForConditionalGeneration,
            "question-answering": MvpForQuestionAnswering,
            "summarization": MvpForConditionalGeneration,
            "text-classification": MvpForSequenceClassification,
            "text-generation": MvpForCausalLM,
            "text2text-generation": MvpForConditionalGeneration,
            "translation": MvpForConditionalGeneration,
            "zero-shot": MvpForSequenceClassification,
        }
        if is_torch_available()
        else {}
    )
    is_encoder_decoder = True
    fx_compatible = False
    test_pruning = False
    test_missing_keys = False

    # TODO: Fix the failed tests
    def is_pipeline_test_to_skip(
        self, pipeline_test_casse_name, config_class, model_architecture, tokenizer_name, processor_name
    ):
        if (
            pipeline_test_casse_name == "QAPipelineTests"
            and tokenizer_name is not None
            and not tokenizer_name.endswith("Fast")
        ):
            # `QAPipelineTests` fails for a few models when the slower tokenizers are used.
            # (The slower tokenizers were never used for pipeline tests before the pipeline testing rework)
            # TODO: check (and possibly fix) the `QAPipelineTests` with slower tokenizers
            return True

        return False

    def setUp(self):
        self.model_tester = MvpModelTester(self)
        self.config_tester = ConfigTester(self, config_class=MvpConfig)

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_save_load_strict(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs()
        for model_class in self.all_model_classes:
            model = model_class(config)

            with tempfile.TemporaryDirectory() as tmpdirname:
                model.save_pretrained(tmpdirname)
                model2, info = model_class.from_pretrained(tmpdirname, output_loading_info=True)
            self.assertEqual(info["missing_keys"], [])

    def test_decoder_model_past_with_large_inputs(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_decoder_model_past_large_inputs(*config_and_inputs)

    def test_encoder_decoder_model_standalone(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs_for_common()
        self.model_tester.check_encoder_decoder_model_standalone(*config_and_inputs)

    # MvpForSequenceClassification does not support inputs_embeds
    def test_inputs_embeds(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in (MvpModel, MvpForConditionalGeneration, MvpForQuestionAnswering):
            model = model_class(config)
            model.to(torch_device)
            model.eval()

            inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))

            if not self.is_encoder_decoder:
                input_ids = inputs["input_ids"]
                del inputs["input_ids"]
            else:
                encoder_input_ids = inputs["input_ids"]
                decoder_input_ids = inputs.get("decoder_input_ids", encoder_input_ids)
                del inputs["input_ids"]
                inputs.pop("decoder_input_ids", None)

            wte = model.get_input_embeddings()
            if not self.is_encoder_decoder:
                inputs["inputs_embeds"] = wte(input_ids)
            else:
                inputs["inputs_embeds"] = wte(encoder_input_ids)
                inputs["decoder_inputs_embeds"] = wte(decoder_input_ids)

            with torch.no_grad():
                model(**inputs)[0]

    @require_torch_fp16
    def test_generate_fp16(self):
        config, input_dict = self.model_tester.prepare_config_and_inputs()
        input_ids = input_dict["input_ids"]
        attention_mask = input_ids.ne(1).to(torch_device)
        model = MvpForConditionalGeneration(config).eval().to(torch_device)
        model.half()
        model.generate(input_ids, attention_mask=attention_mask)
        model.generate(num_beams=4, do_sample=True, early_stopping=False, num_return_sequences=3)
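

# Module-level helpers used by the head tests above and the integration tests below.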
def assert_tensors_close(a, b, atol=1e-12, prefix=""):
    """If tensors have different shapes, different values or a and b are not both tensors, raise a nice AssertionError."""
    if a is None and b is None:
        return True
    try:
        if torch.allclose(a, b, atol=atol):
            return True
        raise
    except Exception:
        pct_different = (torch.gt((a - b).abs(), atol)).float().mean().item()
        if a.numel() > 100:
            msg = f"tensor values are {pct_different:.1%} different."
        else:
            msg = f"{a} != {b}"
        if prefix:
            msg = prefix + ": " + msg
        raise AssertionError(msg)


def _long_tensor(tok_lst):
    return torch.tensor(tok_lst, dtype=torch.long, device=torch_device)


@require_torch
@require_sentencepiece
@require_tokenizers
class MvpModelIntegrationTests(unittest.TestCase):
    @cached_property
    def default_tokenizer(self):
        return MvpTokenizer.from_pretrained("RUCAIBox/mvp")

    @slow
    def test_inference_no_head(self):
        model = MvpModel.from_pretrained("RUCAIBox/mvp").to(torch_device)
        input_ids = _long_tensor([[0, 31414, 232, 328, 740, 1140, 12695, 69, 46078, 1588, 2]])
        attention_mask = input_ids.ne(model.config.pad_token_id)
        with torch.no_grad():
            output = model(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        expected_shape = torch.Size((1, 11, 1024))
        self.assertEqual(output.shape, expected_shape)
        expected_slice = torch.tensor(
            [[0.3461, 0.3624, 0.2689], [0.3461, 0.3624, 0.2689], [-0.1562, 1.1637, -0.3784]], device=torch_device
        )
        self.assertTrue(torch.allclose(output[:, :3, :3], expected_slice, atol=1e-3))

    @slow
    def test_summarization_inference(self):
        model = MvpForConditionalGeneration.from_pretrained("RUCAIBox/mvp").to(torch_device)
        tok = self.default_tokenizer
        PGE_ARTICLE = """ Listen to local radio broadcasts for advertisements that reference casinos in your area.\nIf none are in your area, listen to national radio broadcasts for advertisements of casinos in other areas.\nNote the location that is mentioned in each advertisement that involves a casino.\nIf no locations are mentioned, note any additional contact information, such as a website or phone number. Use that information to find out where the casinos are.;\n,\n\nIf you learn about more than 1 casino on the radio, use the Internet to search the distance between your location and each casino. Sites such as maps.google.com or mapquest.com will help you in this search.'"""  # fmt: skip
        EXPECTED_SUMMARY = "Listen to the radio.\nUse the Internet."
        dct = tok.batch_encode_plus(
            [PGE_ARTICLE],
            return_tensors="pt",
        ).to(torch_device)

        hypotheses_batch = model.generate(**dct)

        decoded = tok.batch_decode(hypotheses_batch, skip_special_tokens=True)
        self.assertEqual(EXPECTED_SUMMARY, decoded[0])
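

# The standalone-decoder tester mirrors MvpModelTester but targets MvpDecoder and
# MvpForCausalLM used without an encoder (decoder-only, is_encoder_decoder=False).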
class MvpStandaloneDecoderModelTester:
    def __init__(
        self,
        parent,
        vocab_size=99,
        batch_size=13,
        d_model=16,
        decoder_seq_length=7,
        is_training=True,
        is_decoder=True,
        use_attention_mask=True,
        use_cache=False,
        use_labels=True,
        decoder_start_token_id=2,
        decoder_ffn_dim=32,
        decoder_layers=2,
        encoder_attention_heads=4,
        decoder_attention_heads=4,
        max_position_embeddings=30,
        is_encoder_decoder=False,
        pad_token_id=0,
        bos_token_id=1,
        eos_token_id=2,
        scope=None,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.decoder_seq_length = decoder_seq_length
        # For common tests
        self.seq_length = self.decoder_seq_length
        self.is_training = is_training
        self.use_attention_mask = use_attention_mask
        self.use_labels = use_labels

        self.vocab_size = vocab_size
        self.d_model = d_model
        self.hidden_size = d_model
        self.num_hidden_layers = decoder_layers
        self.decoder_layers = decoder_layers
        self.decoder_ffn_dim = decoder_ffn_dim
        self.encoder_attention_heads = encoder_attention_heads
        self.decoder_attention_heads = decoder_attention_heads
        self.num_attention_heads = decoder_attention_heads
        self.eos_token_id = eos_token_id
        self.bos_token_id = bos_token_id
        self.pad_token_id = pad_token_id
        self.decoder_start_token_id = decoder_start_token_id
        self.use_cache = use_cache
        self.max_position_embeddings = max_position_embeddings
        self.is_encoder_decoder = is_encoder_decoder

        self.scope = None
        self.decoder_key_length = decoder_seq_length
        self.base_model_out_len = 2
        self.decoder_attention_idx = 1

    def prepare_config_and_inputs(self):
        input_ids = ids_tensor([self.batch_size, self.decoder_seq_length], self.vocab_size)

        attention_mask = None
        if self.use_attention_mask:
            attention_mask = ids_tensor([self.batch_size, self.decoder_seq_length], vocab_size=2)

        lm_labels = None
        if self.use_labels:
            lm_labels = ids_tensor([self.batch_size, self.decoder_seq_length], self.vocab_size)

        config = MvpConfig(
            vocab_size=self.vocab_size,
            d_model=self.d_model,
            encoder_layers=self.decoder_layers,
            decoder_layers=self.decoder_layers,
            decoder_ffn_dim=self.decoder_ffn_dim,
            encoder_attention_heads=self.encoder_attention_heads,
            decoder_attention_heads=self.decoder_attention_heads,
            eos_token_id=self.eos_token_id,
            bos_token_id=self.bos_token_id,
            use_cache=self.use_cache,
            pad_token_id=self.pad_token_id,
            decoder_start_token_id=self.decoder_start_token_id,
            max_position_embeddings=self.max_position_embeddings,
            is_encoder_decoder=self.is_encoder_decoder,
        )

        return (
            config,
            input_ids,
            attention_mask,
            lm_labels,
        )

    def prepare_config_and_inputs_for_decoder(self):
        (
            config,
            input_ids,
            attention_mask,
            lm_labels,
        ) = self.prepare_config_and_inputs()

        encoder_hidden_states = floats_tensor([self.batch_size, self.decoder_seq_length, self.hidden_size])
        encoder_attention_mask = ids_tensor([self.batch_size, self.decoder_seq_length], vocab_size=2)

        return (
            config,
            input_ids,
            attention_mask,
            encoder_hidden_states,
            encoder_attention_mask,
            lm_labels,
        )

    def create_and_check_decoder_model_past(
        self,
        config,
        input_ids,
        attention_mask,
        lm_labels,
    ):
        config.use_cache = True
        model = MvpDecoder(config=config).to(torch_device).eval()
        # first forward pass
        outputs = model(input_ids, use_cache=True)
        outputs_use_cache_conf = model(input_ids)
        outputs_no_past = model(input_ids, use_cache=False)

        self.parent.assertTrue(len(outputs) == len(outputs_use_cache_conf))
        self.parent.assertTrue(len(outputs) == len(outputs_no_past) + 1)

        past_key_values = outputs["past_key_values"]

        # create a hypothetical next token and extend next_input_ids with it
        next_tokens = ids_tensor((self.batch_size, 1), config.vocab_size)

        # append to next input_ids
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)

        output_from_no_past = model(next_input_ids)["last_hidden_state"]
        output_from_past = model(next_tokens, past_key_values=past_key_values)["last_hidden_state"]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, next_input_ids.shape[-1] - 1, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, 0, random_slice_idx].detach()

        # test that outputs are equal for slice
        assert torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3)

    def create_and_check_decoder_model_attention_mask_past(
        self,
        config,
        input_ids,
        attention_mask,
        lm_labels,
    ):
        model = MvpDecoder(config=config).to(torch_device).eval()

        # create attention mask
        attn_mask = torch.ones(input_ids.shape, dtype=torch.long, device=torch_device)

        half_seq_length = input_ids.shape[-1] // 2
        attn_mask[:, half_seq_length:] = 0

        # first forward pass
        past_key_values = model(input_ids, attention_mask=attn_mask, use_cache=True)["past_key_values"]

        # create a hypothetical next token and extend next_input_ids with it
        next_tokens = ids_tensor((self.batch_size, 1), config.vocab_size)

        # change a random masked slice from input_ids
        random_seq_idx_to_change = ids_tensor((1,), half_seq_length).item() + 1
        random_other_next_tokens = ids_tensor((self.batch_size, 1), config.vocab_size).squeeze(-1)
        input_ids[:, -random_seq_idx_to_change] = random_other_next_tokens

        # append to next input_ids and attn_mask
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        attn_mask = torch.cat(
            [attn_mask, torch.ones((attn_mask.shape[0], 1), dtype=torch.long, device=torch_device)],
            dim=1,
        )

        # get two different outputs
        output_from_no_past = model(next_input_ids, attention_mask=attn_mask)["last_hidden_state"]
        output_from_past = model(next_tokens, attention_mask=attn_mask, past_key_values=past_key_values)[
            "last_hidden_state"
        ]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, next_input_ids.shape[-1] - 1, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, 0, random_slice_idx].detach()

        # test that outputs are equal for slice
        assert torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-3)

    def prepare_config_and_inputs_for_common(self):
        config_and_inputs = self.prepare_config_and_inputs()
        (
            config,
            input_ids,
            attention_mask,
            lm_labels,
        ) = config_and_inputs

        inputs_dict = {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
        }
        return config, inputs_dict


@require_torch
class MvpStandaloneDecoderModelTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):
    all_model_classes = (MvpDecoder, MvpForCausalLM) if is_torch_available() else ()
    all_generative_model_classes = (MvpForCausalLM,) if is_torch_available() else ()
    fx_comptatible = True
    test_pruning = False
    is_encoder_decoder = False

    def setUp(
        self,
    ):
        self.model_tester = MvpStandaloneDecoderModelTester(self, is_training=False)
        self.config_tester = ConfigTester(self, config_class=MvpConfig)

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_decoder_model_past(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_decoder_model_past(*config_and_inputs)

    def test_decoder_model_attn_mask_past(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_decoder_model_attention_mask_past(*config_and_inputs)

    def test_retain_grad_hidden_states_attentions(self):
        # decoder cannot keep gradients
        return