# coding=utf-8
# Copyright 2021 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Testing suite for the PyTorch M2M100 model. """
import copy
import tempfile
import unittest
from transformers import is_torch_available
from transformers.file_utils import cached_property
from transformers.testing_utils import require_sentencepiece, require_tokenizers, require_torch, slow, torch_device

from .test_configuration_common import ConfigTester
from .test_generation_utils import GenerationTesterMixin
from .test_modeling_common import ModelTesterMixin, ids_tensor


if is_torch_available():
    import torch

    from transformers import M2M100Config, M2M100ForConditionalGeneration, M2M100Model, M2M100Tokenizer
    from transformers.models.m2m_100.modeling_m2m_100 import M2M100Decoder, M2M100Encoder


def prepare_m2m_100_inputs_dict(
    config,
    input_ids,
    decoder_input_ids,
    attention_mask=None,
    decoder_attention_mask=None,
):
    if attention_mask is None:
        attention_mask = input_ids.ne(config.pad_token_id)
    if decoder_attention_mask is None:
        decoder_attention_mask = decoder_input_ids.ne(config.pad_token_id)
    return {
        "input_ids": input_ids,
        "decoder_input_ids": decoder_input_ids,
        "attention_mask": attention_mask,
        "decoder_attention_mask": decoder_attention_mask,
    }
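

# M2M100ModelTester builds a tiny random-weight configuration (2 layers, hidden
# size 16) so that the common model tests below run quickly on CPU.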
@require_torch
class M2M100ModelTester:
    def __init__(
        self,
        parent,
        batch_size=13,
        seq_length=7,
        is_training=True,
        use_labels=False,
        vocab_size=99,
        hidden_size=16,
        num_hidden_layers=2,
        num_attention_heads=4,
        intermediate_size=4,
        hidden_act="relu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        encoder_layerdrop=0.0,
        decoder_layerdrop=0.0,
        max_position_embeddings=20,
        eos_token_id=2,
        pad_token_id=1,
        bos_token_id=0,
    ):
        self.parent = parent
        self.batch_size = batch_size
        self.seq_length = seq_length
        self.is_training = is_training
        self.use_labels = use_labels
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.intermediate_size = intermediate_size
        self.hidden_act = hidden_act
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob
        self.encoder_layerdrop = encoder_layerdrop
        self.decoder_layerdrop = decoder_layerdrop
        self.max_position_embeddings = max_position_embeddings
        self.eos_token_id = eos_token_id
        self.pad_token_id = pad_token_id
        self.bos_token_id = bos_token_id

    def prepare_config_and_inputs(self):
        input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)
        input_ids[:, -1] = self.eos_token_id  # Eos Token
        decoder_input_ids = ids_tensor([self.batch_size, self.seq_length], self.vocab_size)

        # we need to clamp the input ids here to avoid having pad tokens in between:
        # for M2M100 the position_ids are prepared such that all pad tokens have
        # pos id = 2 and the rest are numbered 2..seq_length, where seq_length is
        # the number of non-pad tokens. When using past, there is no way of knowing
        # whether the past input ids contained pad tokens, which results in an
        # incorrect seq_length and, in turn, position_ids that are off by
        # num_pad_tokens for the past input.
        input_ids = input_ids.clamp(self.pad_token_id + 1)
        decoder_input_ids = decoder_input_ids.clamp(self.pad_token_id + 1)

        config = M2M100Config(
            vocab_size=self.vocab_size,
            d_model=self.hidden_size,
            encoder_layers=self.num_hidden_layers,
            decoder_layers=self.num_hidden_layers,
            encoder_attention_heads=self.num_attention_heads,
            decoder_attention_heads=self.num_attention_heads,
            encoder_ffn_dim=self.intermediate_size,
            decoder_ffn_dim=self.intermediate_size,
            dropout=self.hidden_dropout_prob,
            attention_dropout=self.attention_probs_dropout_prob,
            encoder_layerdrop=self.encoder_layerdrop,
            decoder_layerdrop=self.decoder_layerdrop,
            max_position_embeddings=self.max_position_embeddings,
            eos_token_id=self.eos_token_id,
            bos_token_id=self.bos_token_id,
            pad_token_id=self.pad_token_id,
        )
        inputs_dict = prepare_m2m_100_inputs_dict(config, input_ids, decoder_input_ids)
        return config, inputs_dict

    def prepare_config_and_inputs_for_common(self):
        config, inputs_dict = self.prepare_config_and_inputs()
        return config, inputs_dict

    def create_and_check_decoder_model_past_large_inputs(self, config, inputs_dict):
        model = M2M100Model(config=config).get_decoder().to(torch_device).eval()
        input_ids = inputs_dict["input_ids"]
        attention_mask = inputs_dict["attention_mask"]

        # first forward pass
        outputs = model(input_ids, attention_mask=attention_mask, use_cache=True)
        output, past_key_values = outputs.to_tuple()

        # create hypothetical multiple next tokens and extend to next_input_ids
        next_tokens = ids_tensor((self.batch_size, 3), config.vocab_size)
        next_attn_mask = ids_tensor((self.batch_size, 3), 2)

        # append to next input_ids and attention_mask
        next_input_ids = torch.cat([input_ids, next_tokens], dim=-1)
        next_attention_mask = torch.cat([attention_mask, next_attn_mask], dim=-1)
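        # running the extended sequence without cache and running only the new
        # tokens with the cached past_key_values must yield the same hidden states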
        output_from_no_past = model(next_input_ids, attention_mask=next_attention_mask)["last_hidden_state"]
        output_from_past = model(next_tokens, attention_mask=next_attention_mask, past_key_values=past_key_values)[
            "last_hidden_state"
        ]

        # select random slice
        random_slice_idx = ids_tensor((1,), output_from_past.shape[-1]).item()
        output_from_no_past_slice = output_from_no_past[:, -3:, random_slice_idx].detach()
        output_from_past_slice = output_from_past[:, :, random_slice_idx].detach()

        self.parent.assertTrue(output_from_past_slice.shape[1] == next_tokens.shape[1])

        # test that outputs are equal for slice
        self.parent.assertTrue(torch.allclose(output_from_past_slice, output_from_no_past_slice, atol=1e-2))

    def check_encoder_decoder_model_standalone(self, config, inputs_dict):
        model = M2M100Model(config=config).to(torch_device).eval()
        outputs = model(**inputs_dict)

        encoder_last_hidden_state = outputs.encoder_last_hidden_state
        last_hidden_state = outputs.last_hidden_state
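        # save the encoder and decoder separately, reload them as standalone
        # modules, and check that they reproduce the full model's hidden states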
        with tempfile.TemporaryDirectory() as tmpdirname:
            encoder = model.get_encoder()
            encoder.save_pretrained(tmpdirname)
            encoder = M2M100Encoder.from_pretrained(tmpdirname).to(torch_device)

        encoder_last_hidden_state_2 = encoder(inputs_dict["input_ids"], attention_mask=inputs_dict["attention_mask"])[
            0
        ]

        self.parent.assertTrue((encoder_last_hidden_state_2 - encoder_last_hidden_state).abs().max().item() < 1e-3)

        with tempfile.TemporaryDirectory() as tmpdirname:
            decoder = model.get_decoder()
            decoder.save_pretrained(tmpdirname)
            decoder = M2M100Decoder.from_pretrained(tmpdirname).to(torch_device)

        last_hidden_state_2 = decoder(
            input_ids=inputs_dict["decoder_input_ids"],
            attention_mask=inputs_dict["decoder_attention_mask"],
            encoder_hidden_states=encoder_last_hidden_state,
            encoder_attention_mask=inputs_dict["attention_mask"],
        )[0]

        self.parent.assertTrue((last_hidden_state_2 - last_hidden_state).abs().max().item() < 1e-3)


@require_torch
class M2M100ModelTest(ModelTesterMixin, GenerationTesterMixin, unittest.TestCase):
    all_model_classes = (
        (
            M2M100Model,
            M2M100ForConditionalGeneration,
        )
        if is_torch_available()
        else ()
    )
    all_generative_model_classes = (M2M100ForConditionalGeneration,) if is_torch_available() else ()
    is_encoder_decoder = True
    test_pruning = False
    test_head_masking = False
    test_missing_keys = False

    def setUp(self):
        self.model_tester = M2M100ModelTester(self)
        self.config_tester = ConfigTester(self, config_class=M2M100Config)

    def test_config(self):
        self.config_tester.run_common_tests()

    def test_save_load_strict(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs()
        for model_class in self.all_model_classes:
            model = model_class(config)

            with tempfile.TemporaryDirectory() as tmpdirname:
                model.save_pretrained(tmpdirname)
                model2, info = model_class.from_pretrained(tmpdirname, output_loading_info=True)
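            # a strict round-trip through save_pretrained/from_pretrained must
            # not report any missing weights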
            self.assertEqual(info["missing_keys"], [])

    def test_decoder_model_past_with_large_inputs(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs()
        self.model_tester.create_and_check_decoder_model_past_large_inputs(*config_and_inputs)

    def test_encoder_decoder_model_standalone(self):
        config_and_inputs = self.model_tester.prepare_config_and_inputs_for_common()
        self.model_tester.check_encoder_decoder_model_standalone(*config_and_inputs)

    def test_inputs_embeds(self):
        config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()

        for model_class in (M2M100Model, M2M100ForConditionalGeneration):
            model = model_class(config)
            model.to(torch_device)
            model.eval()
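            # swap input_ids (and decoder_input_ids) for their embedded
            # equivalents to check the model accepts inputs_embeds directly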
            inputs = copy.deepcopy(self._prepare_for_class(inputs_dict, model_class))

            if not self.is_encoder_decoder:
                input_ids = inputs["input_ids"]
                del inputs["input_ids"]
            else:
                encoder_input_ids = inputs["input_ids"]
                decoder_input_ids = inputs.get("decoder_input_ids", encoder_input_ids)
                del inputs["input_ids"]
                inputs.pop("decoder_input_ids", None)

            wte = model.get_input_embeddings()
            if not self.is_encoder_decoder:
                inputs["inputs_embeds"] = wte(input_ids)
            else:
                inputs["inputs_embeds"] = wte(encoder_input_ids)
                inputs["decoder_inputs_embeds"] = wte(decoder_input_ids)

            with torch.no_grad():
                model(**inputs)[0]

    def test_generate_fp16(self):
        config, input_dict = self.model_tester.prepare_config_and_inputs()
        input_ids = input_dict["input_ids"]
        attention_mask = input_ids.ne(1).to(torch_device)
        model = M2M100ForConditionalGeneration(config).eval().to(torch_device)
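        # half precision is only exercised on CUDA, since fp16 kernels are not
        # generally available on CPU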
        if torch_device == "cuda":
            model.half()
        model.generate(input_ids, attention_mask=attention_mask)
        model.generate(num_beams=4, do_sample=True, early_stopping=False, num_return_sequences=3)


def _long_tensor(tok_lst):
    return torch.tensor(tok_lst, dtype=torch.long, device=torch_device)


TOLERANCE = 1e-4


@require_torch
@require_sentencepiece
@require_tokenizers
@slow
class M2M100ModelIntegrationTests(unittest.TestCase):
    @cached_property
    def default_tokenizer(self):
        return M2M100Tokenizer.from_pretrained("facebook/m2m100_418M")

    def test_inference_no_head(self):
        model = M2M100Model.from_pretrained("facebook/m2m100_418M").to(torch_device)
        input_ids = _long_tensor([[128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38, 2]])
        decoder_input_ids = _long_tensor([[2, 128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38]])
        inputs_dict = prepare_m2m_100_inputs_dict(model.config, input_ids, decoder_input_ids)
        with torch.no_grad():
            output = model(**inputs_dict)[0]
        expected_shape = torch.Size((1, 11, 1024))
        self.assertEqual(output.shape, expected_shape)
        # expected output slice
        expected_slice = torch.tensor(
            [[-0.7780, -0.1676, 0.1038], [-6.7556, -1.3992, 0.0567], [-7.5383, -0.5920, -0.2779]], device=torch_device
        )
        self.assertTrue(torch.allclose(output[:, :3, :3], expected_slice, atol=TOLERANCE))

    def test_inference_head(self):
        model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M").to(torch_device)
        input_ids = _long_tensor([[128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38, 2]])
        decoder_input_ids = _long_tensor([[2, 128028, 98, 12, 30527, 2732, 159, 7755, 61904, 39144, 38]])
        inputs_dict = prepare_m2m_100_inputs_dict(model.config, input_ids, decoder_input_ids)
        with torch.no_grad():
            output = model(**inputs_dict)[0]
        expected_shape = torch.Size((1, 11, model.config.vocab_size))
        self.assertEqual(output.shape, expected_shape)
        # expected logits slice
        expected_slice = torch.tensor(
            [[-1.0448, -1.0411, 3.7992], [-3.2191, -3.2386, -1.3451], [-3.6210, -3.5993, 0.4925]], device=torch_device
        )
        self.assertTrue(torch.allclose(output[:, :3, :3], expected_slice, atol=TOLERANCE))

    def test_seq_to_seq_generation(self):
        model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_418M").to(torch_device)
        tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_418M", src_lang="fr", tgt_lang="en")

        src_fr = [
            "L'affaire NSA souligne l'absence totale de débat sur le renseignement",
            "Selon moi, il y a deux niveaux de réponse de la part du gouvernement français.",
            "Lorsque François Hollande téléphone à Barack Obama ou quand le ministre des affaires étrangères Laurent"
            " Fabius convoque l'ambassadeur des Etats-Unis, ils réagissent à une vraie découverte, qui est celle de"
            " l'ampleur de la surveillance américaine sur l'ensemble des communications en France.",
        ]

        # The below article tests that we don't add any hypotheses outside of the top n_beams
        dct = tokenizer(src_fr, padding=True, return_tensors="pt")
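        # forced_bos_token_id forces the first generated token to the target
        # language id, which is how M2M100 selects the translation direction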
        hypotheses_batch = model.generate(
            input_ids=dct["input_ids"].to(torch_device),
            attention_mask=dct["attention_mask"].to(torch_device),
            num_beams=5,
            forced_bos_token_id=tokenizer.get_lang_id("en"),
        )

        expected_en = [
            "The NSA case highlights the total absence of intelligence debate",
            "I think there are two levels of response from the French government.",
            "When François Hollande calls Barack Obama or when Foreign Minister Laurent Fabius calls the U.S."
            " Ambassador, they respond to a real discovery, which is that of the scale of U.S. surveillance on all"
            " communications in France.",
        ]

        generated = tokenizer.batch_decode(
            hypotheses_batch.tolist(), clean_up_tokenization_spaces=True, skip_special_tokens=True
        )
        assert generated == expected_en