transformers/docs/source/zh/main_classes/pipelines.md

15 KiB
Raw Permalink Blame History

Pipelines

pipelines是使用模型进行推理的一种简单方法。这些pipelines是抽象了库中大部分复杂代码的对象提供了一个专用于多个任务的简单API包括专名识别、掩码语言建模、情感分析、特征提取和问答等。请参阅任务摘要以获取使用示例。

有两种pipelines抽象类需要注意

pipeline抽象类

pipeline抽象类是对所有其他可用pipeline的封装。它可以像任何其他pipeline一样实例化但进一步提供额外的便利性。

简单调用一个项目:

>>> pipe = pipeline("text-classification")
>>> pipe("This restaurant is awesome")
[{'label': 'POSITIVE', 'score': 0.9998743534088135}]

如果您想使用 hub 上的特定模型可以忽略任务如果hub上的模型已经定义了该任务

>>> pipe = pipeline(model="FacebookAI/roberta-large-mnli")
>>> pipe("This restaurant is awesome")
[{'label': 'NEUTRAL', 'score': 0.7313136458396912}]

要在多个项目上调用pipeline可以使用列表调用它。

>>> pipe = pipeline("text-classification")
>>> pipe(["This restaurant is awesome", "This restaurant is awful"])
[{'label': 'POSITIVE', 'score': 0.9998743534088135},
 {'label': 'NEGATIVE', 'score': 0.9996669292449951}]

为了遍历整个数据集,建议直接使用 dataset。这意味着您不需要一次性分配整个数据集也不需要自己进行批处理。这应该与GPU上的自定义循环一样快。如果不是请随时提出issue。

import datasets
from transformers import pipeline
from transformers.pipelines.pt_utils import KeyDataset
from tqdm.auto import tqdm

pipe = pipeline("automatic-speech-recognition", model="facebook/wav2vec2-base-960h", device=0)
dataset = datasets.load_dataset("superb", name="asr", split="test")

# KeyDataset (only *pt*) will simply return the item in the dict returned by the dataset item
# as we're not interested in the *target* part of the dataset. For sentence pair use KeyPairDataset
for out in tqdm(pipe(KeyDataset(dataset, "file"))):
    print(out)
    # {"text": "NUMBER TEN FRESH NELLY IS WAITING ON YOU GOOD NIGHT HUSBAND"}
    # {"text": ....}
    # ....

为了方便使用,也可以使用生成器:

from transformers import pipeline

pipe = pipeline("text-classification")


def data():
    while True:
        # This could come from a dataset, a database, a queue or HTTP request
        # in a server
        # Caveat: because this is iterative, you cannot use `num_workers > 1` variable
        # to use multiple threads to preprocess data. You can still have 1 thread that
        # does the preprocessing while the main runs the big inference
        yield "This is a test"


for out in pipe(data()):
    print(out)
    # {"text": "NUMBER TEN FRESH NELLY IS WAITING ON YOU GOOD NIGHT HUSBAND"}
    # {"text": ....}
    # ....

autodoc pipeline

Pipeline batching

所有pipeline都可以使用批处理。这将在pipeline使用其流处理功能时起作用即传递列表或 Datasetgenerator 时)。

from transformers import pipeline
from transformers.pipelines.pt_utils import KeyDataset
import datasets

dataset = datasets.load_dataset("imdb", name="plain_text", split="unsupervised")
pipe = pipeline("text-classification", device=0)
for out in pipe(KeyDataset(dataset, "text"), batch_size=8, truncation="only_first"):
    print(out)
    # [{'label': 'POSITIVE', 'score': 0.9998743534088135}]
    # Exactly the same output as before, but the content are passed
    # as batches to the model

然而这并不自动意味着性能提升。它可能是一个10倍的加速或5倍的减速具体取决于硬件、数据和实际使用的模型。

主要是加速的示例:

from transformers import pipeline
from torch.utils.data import Dataset
from tqdm.auto import tqdm

pipe = pipeline("text-classification", device=0)


class MyDataset(Dataset):
    def __len__(self):
        return 5000

    def __getitem__(self, i):
        return "This is a test"


dataset = MyDataset()

for batch_size in [1, 8, 64, 256]:
    print("-" * 30)
    print(f"Streaming batch_size={batch_size}")
    for out in tqdm(pipe(dataset, batch_size=batch_size), total=len(dataset)):
        pass
# On GTX 970
------------------------------
Streaming no batching
100%|██████████████████████████████████████████████████████████████████████| 5000/5000 [00:26<00:00, 187.52it/s]
------------------------------
Streaming batch_size=8
100%|█████████████████████████████████████████████████████████████████████| 5000/5000 [00:04<00:00, 1205.95it/s]
------------------------------
Streaming batch_size=64
100%|█████████████████████████████████████████████████████████████████████| 5000/5000 [00:02<00:00, 2478.24it/s]
------------------------------
Streaming batch_size=256
100%|█████████████████████████████████████████████████████████████████████| 5000/5000 [00:01<00:00, 2554.43it/s]
(diminishing returns, saturated the GPU)

主要是减速的示例:

class MyDataset(Dataset):
    def __len__(self):
        return 5000

    def __getitem__(self, i):
        if i % 64 == 0:
            n = 100
        else:
            n = 1
        return "This is a test" * n

与其他句子相比,这是一个非常长的句子。在这种情况下,整个批次将需要400个tokens的长度因此整个批次将是 [64, 400] 而不是 [64, 4],从而导致较大的减速。更糟糕的是,在更大的批次上,程序会崩溃。

------------------------------
Streaming no batching
100%|█████████████████████████████████████████████████████████████████████| 1000/1000 [00:05<00:00, 183.69it/s]
------------------------------
Streaming batch_size=8
100%|█████████████████████████████████████████████████████████████████████| 1000/1000 [00:03<00:00, 265.74it/s]
------------------------------
Streaming batch_size=64
100%|██████████████████████████████████████████████████████████████████████| 1000/1000 [00:26<00:00, 37.80it/s]
------------------------------
Streaming batch_size=256
  0%|                                                                                 | 0/1000 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "/home/nicolas/src/transformers/test.py", line 42, in <module>
    for out in tqdm(pipe(dataset, batch_size=256), total=len(dataset)):
....
    q = q / math.sqrt(dim_per_head)  # (bs, n_heads, q_length, dim_per_head)
RuntimeError: CUDA out of memory. Tried to allocate 376.00 MiB (GPU 0; 3.95 GiB total capacity; 1.72 GiB already allocated; 354.88 MiB free; 2.46 GiB reserved in total by PyTorch)

对于这个问题,没有好的(通用)解决方案,效果可能因您的用例而异。经验法则如下:

对于用户,一个经验法则是:

  • 使用硬件测量负载性能。测量、测量、再测量。真实的数字是唯一的方法。
  • 如果受到延迟的限制(进行推理的实时产品),不要进行批处理。
  • 如果使用CPU不要进行批处理。
  • 如果您在GPU上处理的是吞吐量您希望在大量静态数据上运行模型
    • 如果对序列长度的大小没有概念("自然"数据默认情况下不要进行批处理进行测试并尝试逐渐添加添加OOM检查以在失败时恢复如果您不能控制序列长度它将在某些时候失败
    • 如果您的序列长度非常规律那么批处理更有可能非常有趣进行测试并推动它直到出现OOM。
    • GPU越大批处理越有可能变得更有趣
  • 一旦启用批处理确保能够很好地处理OOM。

Pipeline chunk batching

zero-shot-classificationquestion-answering 在某种意义上稍微特殊,因为单个输入可能会导致模型的多次前向传递。在正常情况下,这将导致 batch_size 参数的问题。

为了规避这个问题这两个pipeline都有点特殊它们是 ChunkPipeline 而不是常规的 Pipeline。简而言之:

preprocessed = pipe.preprocess(inputs)
model_outputs = pipe.forward(preprocessed)
outputs = pipe.postprocess(model_outputs)

现在变成:

all_model_outputs = []
for preprocessed in pipe.preprocess(inputs):
    model_outputs = pipe.forward(preprocessed)
    all_model_outputs.append(model_outputs)
outputs = pipe.postprocess(all_model_outputs)

这对您的代码应该是非常直观的因为pipeline的使用方式是相同的。

这是一个简化的视图因为Pipeline可以自动处理批次这意味着您不必担心您的输入实际上会触发多少次前向传递您可以独立于输入优化 batch_size。前面部分的注意事项仍然适用。

Pipeline自定义

如果您想要重载特定的pipeline。

请随时为您手头的任务创建一个issuePipeline的目标是易于使用并支持大多数情况因此 transformers 可能支持您的用例。

如果您想简单地尝试一下,可以:

  • 继承您选择的pipeline
class MyPipeline(TextClassificationPipeline):
    def postprocess():
        # Your code goes here
        scores = scores * 100
        # And here


my_pipeline = MyPipeline(model=model, tokenizer=tokenizer, ...)
# or if you use *pipeline* function, then:
my_pipeline = pipeline(model="xxxx", pipeline_class=MyPipeline)

这样就可以让您编写所有想要的自定义代码。

实现一个pipeline

实现一个新的pipeline

音频

可用于音频任务的pipeline包括以下几种。

AudioClassificationPipeline

autodoc AudioClassificationPipeline - call - all

AutomaticSpeechRecognitionPipeline

autodoc AutomaticSpeechRecognitionPipeline - call - all

TextToAudioPipeline

autodoc TextToAudioPipeline - call - all

ZeroShotAudioClassificationPipeline

autodoc ZeroShotAudioClassificationPipeline - call - all

计算机视觉

可用于计算机视觉任务的pipeline包括以下几种。

DepthEstimationPipeline

autodoc DepthEstimationPipeline - call - all

ImageClassificationPipeline

autodoc ImageClassificationPipeline - call - all

ImageSegmentationPipeline

autodoc ImageSegmentationPipeline - call - all

ImageToImagePipeline

autodoc ImageToImagePipeline - call - all

ObjectDetectionPipeline

autodoc ObjectDetectionPipeline - call - all

VideoClassificationPipeline

autodoc VideoClassificationPipeline - call - all

ZeroShotImageClassificationPipeline

autodoc ZeroShotImageClassificationPipeline - call - all

ZeroShotObjectDetectionPipeline

autodoc ZeroShotObjectDetectionPipeline - call - all

自然语言处理

可用于自然语言处理任务的pipeline包括以下几种。

ConversationalPipeline

autodoc Conversation

autodoc ConversationalPipeline - call - all

FillMaskPipeline

autodoc FillMaskPipeline - call - all

NerPipeline

autodoc NerPipeline

See [TokenClassificationPipeline] for all details.

QuestionAnsweringPipeline

autodoc QuestionAnsweringPipeline - call - all

SummarizationPipeline

autodoc SummarizationPipeline - call - all

TableQuestionAnsweringPipeline

autodoc TableQuestionAnsweringPipeline - call

TextClassificationPipeline

autodoc TextClassificationPipeline - call - all

TextGenerationPipeline

autodoc TextGenerationPipeline - call - all

Text2TextGenerationPipeline

autodoc Text2TextGenerationPipeline - call - all

TokenClassificationPipeline

autodoc TokenClassificationPipeline - call - all

TranslationPipeline

autodoc TranslationPipeline - call - all

ZeroShotClassificationPipeline

autodoc ZeroShotClassificationPipeline - call - all

多模态

可用于多模态任务的pipeline包括以下几种。

DocumentQuestionAnsweringPipeline

autodoc DocumentQuestionAnsweringPipeline - call - all

FeatureExtractionPipeline

autodoc FeatureExtractionPipeline - call - all

ImageFeatureExtractionPipeline

autodoc ImageFeatureExtractionPipeline - call - all

ImageToTextPipeline

autodoc ImageToTextPipeline - call - all

MaskGenerationPipeline

autodoc MaskGenerationPipeline - call - all

VisualQuestionAnsweringPipeline

autodoc VisualQuestionAnsweringPipeline - call - all

Parent class: Pipeline

autodoc Pipeline