#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import unittest
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import numpy as np
import tensorflow as tf
import tensorlayer as tl
from tqdm import tqdm
from sklearn.utils import shuffle
from tensorlayer.models.seq2seq import Seq2seq
from tests.utils import CustomTestCase
from tensorlayer.cost import cross_entropy_seq
class Model_SEQ2SEQ_Test(CustomTestCase):
    """Smoke-test end-to-end training and greedy decoding of the Seq2seq model
    on a small synthetic integer-sequence dataset."""

    @classmethod
    def setUpClass(cls):
        # Hyper-parameters for the synthetic task.
        cls.batch_size = 16
        cls.vocab_size = 20
        cls.embedding_size = 32
        cls.dec_seq_length = 5

        # Random integer "sentences"; token ids lie in [0, vocab_size).
        # trainY carries one extra leading position for the start token.
        cls.trainX = np.random.randint(20, size=(50, 6))
        cls.trainY = np.random.randint(20, size=(50, cls.dec_seq_length + 1))
        cls.trainY[:, 0] = 0  # start_token == 0

        # Parameters
        cls.src_len = len(cls.trainX)
        cls.tgt_len = len(cls.trainY)
        assert cls.src_len == cls.tgt_len
        # NOTE(review): 100 epochs is heavy for a unit test; kept to preserve
        # the original behavior — confirm whether a smaller value suffices.
        cls.num_epochs = 100
        cls.n_step = cls.src_len // cls.batch_size

    @classmethod
    def tearDownClass(cls):
        # Nothing to clean up: all fixtures are in-memory numpy arrays.
        pass

    def test_basic_simpleSeq2Seq(self):
        """Train a 3-layer GRU encoder/decoder with teacher forcing, then run
        greedy (top-1) decoding on two training samples each epoch."""
        model_ = Seq2seq(
            decoder_seq_length=5,
            cell_enc=tf.keras.layers.GRUCell,
            cell_dec=tf.keras.layers.GRUCell,
            n_layer=3,
            n_units=128,
            embedding_layer=tl.layers.Embedding(vocabulary_size=self.vocab_size, embedding_size=self.embedding_size),
        )
        optimizer = tf.optimizers.Adam(learning_rate=0.001)

        for epoch in range(self.num_epochs):
            model_.train()
            trainX, trainY = shuffle(self.trainX, self.trainY)
            total_loss, n_iter = 0.0, 0
            for X, Y in tqdm(
                    tl.iterate.minibatches(inputs=trainX, targets=trainY, batch_size=self.batch_size,
                                           shuffle=False), total=self.n_step,
                    desc='Epoch[{}/{}]'.format(epoch + 1, self.num_epochs), leave=False):
                # Teacher forcing: the decoder consumes Y[:, :-1] and is
                # trained to predict the shifted target Y[:, 1:].
                dec_seq = Y[:, :-1]
                target_seq = Y[:, 1:]

                with tf.GradientTape() as tape:
                    ## compute outputs
                    output = model_(inputs=[X, dec_seq])
                    output = tf.reshape(output, [-1, self.vocab_size])
                    loss = cross_entropy_seq(logits=output, target_seqs=target_seq)

                grad = tape.gradient(loss, model_.all_weights)
                optimizer.apply_gradients(zip(grad, model_.all_weights))

                # Accumulate as a Python float so we do not retain TF tensors
                # (and their backing graph state) across iterations.
                total_loss += float(loss)
                n_iter += 1

            # Greedy decoding on the first two (shuffled) training samples.
            model_.eval()
            test_sample = trainX[0:2, :].tolist()
            top_n = 1
            for _ in range(top_n):
                # Seq2seq expects `inputs` as a list; element 0 is the encoder batch.
                prediction = model_([test_sample], seq_length=self.dec_seq_length, start_token=0, top_n=1)
                print("Prediction: >>>>> ", prediction, "\n Target: >>>>> ", trainY[0:2, 1:], "\n\n")

            # printing average loss after every epoch
            print('Epoch [{}/{}]: loss {:.4f}'.format(epoch + 1, self.num_epochs, total_loss / n_iter))
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()