402 lines
17 KiB
Python
402 lines
17 KiB
Python
# Copyright 2020 The HuggingFace Team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import gc
|
|
import unittest
|
|
|
|
from transformers import (
|
|
MODEL_FOR_CAUSAL_LM_MAPPING,
|
|
MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING,
|
|
TF_MODEL_FOR_CAUSAL_LM_MAPPING,
|
|
TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING,
|
|
AutoModelForCausalLM,
|
|
AutoModelForSeq2SeqLM,
|
|
AutoTokenizer,
|
|
BlenderbotSmallForConditionalGeneration,
|
|
BlenderbotSmallTokenizer,
|
|
Conversation,
|
|
ConversationalPipeline,
|
|
TFAutoModelForCausalLM,
|
|
pipeline,
|
|
)
|
|
from transformers.testing_utils import (
|
|
is_pipeline_test,
|
|
is_torch_available,
|
|
require_tf,
|
|
require_torch,
|
|
slow,
|
|
torch_device,
|
|
)
|
|
|
|
from .test_pipelines_common import ANY
|
|
|
|
|
|
# Device index passed as `device=` to the pipelines built in this module:
# -1 when `torch_device` is "cpu", 0 for any other value.
DEFAULT_DEVICE_NUM = 0 if torch_device != "cpu" else -1
|
|
|
|
|
|
@is_pipeline_test
class ConversationalPipelineTests(unittest.TestCase):
    """Checks for `ConversationalPipeline`.

    `model_mapping`, `tf_model_mapping`, `get_test_pipeline` and `run_pipeline_test` are not called from inside this
    class; presumably a shared pipeline test harness defined elsewhere consumes them (TODO confirm). The `test_*`
    methods pin exact token ids and generated replies for specific checkpoints.
    """

    def tearDown(self):
        """Free as much memory as possible after each test."""
        super().tearDown()
        # clean-up as much as possible GPU memory occupied by PyTorch
        gc.collect()
        if not is_torch_available():
            return
        import torch

        torch.cuda.empty_cache()

    # NOTE(review): a conditional expression binds more loosely than `+`, so the original unparenthesised chain is
    # evaluated exactly as written here: the causal-LM items are only used when the seq2seq mapping is falsy. If
    # merging both mappings was the intent, confirm before changing it, since that alters the resulting dict.
    model_mapping = dict(
        list(MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING.items())
        if MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING
        else (([] + list(MODEL_FOR_CAUSAL_LM_MAPPING.items())) if MODEL_FOR_CAUSAL_LM_MAPPING else [])
    )
    # Same construction (and same evaluation order caveat) for the TensorFlow mappings.
    tf_model_mapping = dict(
        list(TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING.items())
        if TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING
        else (([] + list(TF_MODEL_FOR_CAUSAL_LM_MAPPING.items())) if TF_MODEL_FOR_CAUSAL_LM_MAPPING else [])
    )

    def get_test_pipeline(self, model, tokenizer, processor):
        """Return a `ConversationalPipeline` for `model`/`tokenizer` and a list with one sample conversation.

        `processor` is accepted but not used.
        """
        agent = ConversationalPipeline(model=model, tokenizer=tokenizer)
        sample_inputs = [Conversation("Hi there!")]
        return agent, sample_inputs

    def run_pipeline_test(self, conversation_agent, _):
        """Exercise `conversation_agent` with single, listed, batched, follow-up and invalid inputs.

        The second argument is ignored.
        """

        def answered(*user_inputs):
            # Expected conversation once each user input has received one reply of type `str`.
            return Conversation(
                past_user_inputs=list(user_inputs),
                generated_responses=[ANY(str) for _input in user_inputs],
            )

        greeting = "Hi there!"
        movie_prompt = "Going to the movies tonight - any suggestions?"
        book_prompt = "What's the last book you have read?"
        follow_up = "Why do you recommend it?"

        # A bare conversation object
        reply = conversation_agent(Conversation(greeting))
        self.assertEqual(reply, answered(greeting))

        # A one-element list yields a single conversation, not a list
        reply = conversation_agent([Conversation(greeting)])
        self.assertEqual(reply, answered(greeting))

        # A batch of two fresh conversations
        movie_chat = Conversation(movie_prompt)
        book_chat = Conversation(book_prompt)
        for fresh_chat in (movie_chat, book_chat):
            self.assertEqual(len(fresh_chat.past_user_inputs), 0)

        batch_reply = conversation_agent([movie_chat, book_chat])
        self.assertEqual(batch_reply, [movie_chat, book_chat])
        self.assertEqual(batch_reply, [answered(movie_prompt), answered(book_prompt)])

        # Continuing a conversation that already has history
        book_chat.add_user_input(follow_up)
        reply = conversation_agent(book_chat)
        self.assertEqual(reply, book_chat)
        self.assertEqual(reply, answered(book_prompt, follow_up))

        # A plain string is rejected
        with self.assertRaises(ValueError):
            conversation_agent(greeting)
        # So is a conversation without any user input
        with self.assertRaises(ValueError):
            conversation_agent(Conversation())
        # `book_chat` has been consumed above and is not valid anymore:
        # inactive conversations passed to the pipeline raise a ValueError
        with self.assertRaises(ValueError):
            conversation_agent(book_chat)

    @require_torch
    @slow
    def test_integration_torch_conversation(self):
        """Run the default `conversational` pipeline on a batch of two chats, then continue the second one."""
        generate_kwargs = {"do_sample": False, "max_length": 1000}
        movie_prompt = "Going to the movies tonight - any suggestions?"
        book_prompt = "What's the last book you have read?"

        # Given a pipeline and two untouched conversations
        chatbot = pipeline(task="conversational", device=DEFAULT_DEVICE_NUM)
        movie_chat = Conversation(movie_prompt)
        book_chat = Conversation(book_prompt)
        for fresh_chat in (movie_chat, book_chat):
            self.assertEqual(len(fresh_chat.past_user_inputs), 0)

        # When both are processed as one batch
        replies = chatbot([movie_chat, book_chat], **generate_kwargs)

        # Then the same objects come back, each with one exchange recorded
        self.assertEqual(replies, [movie_chat, book_chat])
        for index in (0, 1):
            self.assertEqual(len(replies[index].past_user_inputs), 1)
        for index in (0, 1):
            self.assertEqual(len(replies[index].generated_responses), 1)
        self.assertEqual(replies[0].past_user_inputs[0], movie_prompt)
        self.assertEqual(replies[0].generated_responses[0], "The Big Lebowski")
        self.assertEqual(replies[1].past_user_inputs[0], book_prompt)
        self.assertEqual(replies[1].generated_responses[0], "The Last Question")

        # When the second conversation gets a follow-up question
        follow_up = "Why do you recommend it?"
        book_chat.add_user_input(follow_up)
        reply = chatbot(book_chat, **generate_kwargs)

        # Then it holds two exchanges
        self.assertEqual(reply, book_chat)
        self.assertEqual(len(reply.past_user_inputs), 2)
        self.assertEqual(len(reply.generated_responses), 2)
        self.assertEqual(reply.past_user_inputs[1], follow_up)
        self.assertEqual(reply.generated_responses[1], "It's a good book.")

    @require_torch
    @slow
    def test_integration_torch_conversation_truncated_history(self):
        """Two-turn chat with `min_length_for_response=24` and `max_length=36`.

        Per the test name this is meant to cover history truncation.
        """
        generate_kwargs = {"do_sample": False, "max_length": 36}
        opening = "Going to the movies tonight - any suggestions?"

        # Given a pipeline and an untouched conversation
        chatbot = pipeline(task="conversational", min_length_for_response=24, device=DEFAULT_DEVICE_NUM)
        chat = Conversation(opening)
        self.assertEqual(len(chat.past_user_inputs), 0)

        # When the first turn is processed
        reply = chatbot(chat, **generate_kwargs)

        # Then one exchange is recorded
        self.assertEqual(reply, chat)
        self.assertEqual(len(reply.past_user_inputs), 1)
        self.assertEqual(len(reply.generated_responses), 1)
        self.assertEqual(reply.past_user_inputs[0], opening)
        self.assertEqual(reply.generated_responses[0], "The Big Lebowski")

        # When a second turn is added
        second_turn = "Is it an action movie?"
        chat.add_user_input(second_turn)
        reply = chatbot(chat, **generate_kwargs)

        # Then two exchanges are recorded
        self.assertEqual(reply, chat)
        self.assertEqual(len(reply.past_user_inputs), 2)
        self.assertEqual(len(reply.generated_responses), 2)
        self.assertEqual(reply.past_user_inputs[1], second_turn)
        self.assertEqual(reply.generated_responses[1], "It's a comedy.")

    @require_torch
    def test_small_model_pt(self):
        """A PyTorch DialoGPT-small pipeline is expected to answer "hello" with "Hi"."""
        checkpoint = "microsoft/DialoGPT-small"
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForCausalLM.from_pretrained(checkpoint)
        chatbot = ConversationalPipeline(model=model, tokenizer=tokenizer)

        reply = chatbot(Conversation("hello"))
        self.assertEqual(reply, Conversation(past_user_inputs=["hello"], generated_responses=["Hi"]))

    @require_tf
    def test_small_model_tf(self):
        """A TensorFlow DialoGPT-small pipeline is expected to answer "hello" with "Hi"."""
        checkpoint = "microsoft/DialoGPT-small"
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = TFAutoModelForCausalLM.from_pretrained(checkpoint)
        chatbot = ConversationalPipeline(model=model, tokenizer=tokenizer)

        reply = chatbot(Conversation("hello"))
        self.assertEqual(reply, Conversation(past_user_inputs=["hello"], generated_responses=["Hi"]))

    @require_torch
    @slow
    def test_integration_torch_conversation_dialogpt_input_ids(self):
        """Pin the `input_ids` that `preprocess` produces for DialoGPT-small, with and without history."""
        checkpoint = "microsoft/DialoGPT-small"
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForCausalLM.from_pretrained(checkpoint)
        chatbot = ConversationalPipeline(model=model, tokenizer=tokenizer)

        # New conversation without history
        fresh_chat = Conversation("hello")
        encoded = chatbot.preprocess(fresh_chat)
        self.assertEqual(encoded["input_ids"].tolist(), [[31373, 50256]])

        # Conversation with one earlier exchange
        chat_with_history = Conversation(
            "how are you ?", past_user_inputs=["hello"], generated_responses=["Hi there!"]
        )
        encoded = chatbot.preprocess(chat_with_history)
        expected_ids = [[31373, 50256, 17250, 612, 0, 50256, 4919, 389, 345, 5633, 50256]]
        self.assertEqual(encoded["input_ids"].tolist(), expected_ids)

    @require_torch
    @slow
    def test_integration_torch_conversation_blenderbot_400M_input_ids(self):
        """Pin the `input_ids` that `preprocess` produces for blenderbot-400M-distill."""
        checkpoint = "facebook/blenderbot-400M-distill"
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
        chatbot = ConversationalPipeline(model=model, tokenizer=tokenizer)

        # test1: new conversation without history
        fresh_chat = Conversation("hello")
        encoded = chatbot.preprocess(fresh_chat)
        self.assertEqual(encoded["input_ids"].tolist(), [[1710, 86, 2]])

        # test2: conversation with one earlier exchange
        chat_with_history = Conversation(
            "I like lasagne.",
            past_user_inputs=["hello"],
            generated_responses=[
                " Do you like lasagne? It is a traditional Italian dish consisting of a shepherd's pie."
            ],
        )
        encoded = chatbot.preprocess(chat_with_history)

        # This should be compared with the same conversation on ParlAI `safe_interactive` demo.
        hello_ids = [1710, 86]  # hello
        double_space_ids = [228, 228]  # Double space
        # Ids found between the two separators; presumably the earlier generated response (TODO confirm).
        previous_reply_ids = [
            946, 304, 398, 6881, 558, 964, 38, 452, 315, 265, 6252, 452, 322, 968,
            6884, 3146, 278, 306, 265, 617, 87, 388, 75, 341, 286, 521, 21,
        ]  # fmt: skip
        new_input_ids = [281, 398, 6881, 558, 964, 21]  # I like lasagne.
        eos_ids = [2]  # EOS
        expected_row = (
            hello_ids + double_space_ids + previous_reply_ids + double_space_ids + new_input_ids + eos_ids
        )
        self.assertEqual(encoded["input_ids"].tolist(), [expected_row])

    @require_torch
    @slow
    def test_integration_torch_conversation_blenderbot_400M(self):
        """Pin the first generated reply of blenderbot-400M-distill for three single-turn prompts."""
        checkpoint = "facebook/blenderbot-400M-distill"
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
        chatbot = ConversationalPipeline(model=model, tokenizer=tokenizer)

        # (user input, extra call kwargs, expected first reply), processed in this order
        cases = [
            (
                "hello",
                {},
                # ParlAI implementation output, we have a different one, but it's our
                # second best, you can check by using num_return_sequences=10
                # " Hello! How are you? I'm just getting ready to go to work, how about you?",
                " Hello! How are you doing today? I just got back from a walk with my dog.",
            ),
            (
                "Lasagne hello",
                {"encoder_no_repeat_ngram_size": 3},
                " Do you like lasagne? It is a traditional Italian dish consisting of a shepherd's pie.",
            ),
            (
                "Lasagne hello Lasagne is my favorite Italian dish. Do you like lasagne? I like lasagne.",
                {"encoder_no_repeat_ngram_size": 3},
                " Me too. I like how it can be topped with vegetables, meats, and condiments.",
            ),
        ]
        for user_input, call_kwargs, expected_reply in cases:
            reply = chatbot(Conversation(user_input), **call_kwargs)
            self.assertEqual(reply.generated_responses[0], expected_reply)

    @require_torch
    @slow
    def test_integration_torch_conversation_encoder_decoder(self):
        """Run blenderbot_small-90M on a batch of two chats for two consecutive turns."""
        generate_kwargs = {"do_sample": False, "max_length": 1000}
        checkpoint = "facebook/blenderbot_small-90M"
        intro_prompt = "My name is Sarah and I live in London"
        movie_prompt = "Going to the movies tonight, What movie would you recommend? "

        # Given a pipeline and two untouched conversations
        tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
        chatbot = ConversationalPipeline(model=model, tokenizer=tokenizer, device=DEFAULT_DEVICE_NUM)

        intro_chat = Conversation(intro_prompt)
        movie_chat = Conversation(movie_prompt)
        for fresh_chat in (intro_chat, movie_chat):
            self.assertEqual(len(fresh_chat.past_user_inputs), 0)

        # When both are processed as one batch
        replies = chatbot([intro_chat, movie_chat], **generate_kwargs)

        # Then the same objects come back, each with one exchange recorded
        self.assertEqual(replies, [intro_chat, movie_chat])
        for index in (0, 1):
            self.assertEqual(len(replies[index].past_user_inputs), 1)
        for index in (0, 1):
            self.assertEqual(len(replies[index].generated_responses), 1)
        self.assertEqual(replies[0].past_user_inputs[0], intro_prompt)
        self.assertEqual(
            replies[0].generated_responses[0],
            "hi sarah, i live in london as well. do you have any plans for the weekend?",
        )
        self.assertEqual(replies[1].past_user_inputs[0], movie_prompt)
        self.assertEqual(
            replies[1].generated_responses[0],
            "i don't know... i'm not really sure. what movie are you going to see?",
        )

        # When each conversation gets a second user input
        intro_follow_up = "Not yet, what about you?"
        movie_follow_up = "What's your name?"
        intro_chat.add_user_input(intro_follow_up)
        movie_chat.add_user_input(movie_follow_up)
        replies = chatbot([intro_chat, movie_chat], **generate_kwargs)

        # Then each holds two exchanges
        self.assertEqual(replies, [intro_chat, movie_chat])
        for index in (0, 1):
            self.assertEqual(len(replies[index].past_user_inputs), 2)
        for index in (0, 1):
            self.assertEqual(len(replies[index].generated_responses), 2)
        self.assertEqual(replies[0].past_user_inputs[1], intro_follow_up)
        self.assertEqual(
            replies[0].generated_responses[1], "i don't have any plans yet. i'm not sure what to do yet."
        )
        self.assertEqual(replies[1].past_user_inputs[1], movie_follow_up)
        self.assertEqual(
            replies[1].generated_responses[1], "i don't have a name, but i'm going to see a horror movie."
        )

    @require_torch
    @slow
    def test_from_pipeline_conversation(self):
        """A pipeline built from a model id and one built from loaded objects give the same first reply."""
        model_id = "facebook/blenderbot_small-90M"
        prompt = "My name is Sarah and I live in London"

        # Pipeline created from the model id
        agent_from_id = pipeline("conversational", model=model_id, tokenizer=model_id)

        # Pipeline created from already-loaded model and tokenizer objects
        loaded_model = BlenderbotSmallForConditionalGeneration.from_pretrained(model_id)
        loaded_tokenizer = BlenderbotSmallTokenizer.from_pretrained(model_id)
        agent_from_objects = pipeline("conversational", model=loaded_model, tokenizer=loaded_tokenizer)

        # Each pipeline gets its own conversation object with the same text
        chat_for_id = Conversation(prompt)
        chat_for_objects = Conversation(prompt)

        reply_from_id = agent_from_id([chat_for_id])
        reply_from_objects = agent_from_objects([chat_for_objects])

        # check for equality
        self.assertEqual(
            reply_from_id.generated_responses[0],
            "hi sarah, i live in london as well. do you have any plans for the weekend?",
        )
        self.assertEqual(
            reply_from_id.generated_responses[0],
            reply_from_objects.generated_responses[0],
        )