#!/usr/bin/env python
# -*- coding: utf-8 -*-
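"""Unit test for the TensorLayer Seq2seq model.

Trains a small GRU-based encoder-decoder on random integer sequences and
runs greedy decoding after each epoch, checking that training and inference
both run end to end.
"""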

import os
import unittest

# Keep TensorFlow's C++ log output quiet; this must be set before TensorFlow is imported.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorflow as tf
import tensorlayer as tl
from sklearn.utils import shuffle
from tqdm import tqdm

from tensorlayer.cost import cross_entropy_seq
from tensorlayer.models.seq2seq import Seq2seq

from tests.utils import CustomTestCase


class Model_SEQ2SEQ_Test(CustomTestCase):

    @classmethod
    def setUpClass(cls):
        # Toy data: random token ids with a fixed start token (id 0) at the
        # beginning of every target sequence.
        cls.batch_size = 16

        cls.vocab_size = 20
        cls.embedding_size = 32
        cls.dec_seq_length = 5
        cls.trainX = np.random.randint(cls.vocab_size, size=(50, 6))
        cls.trainY = np.random.randint(cls.vocab_size, size=(50, cls.dec_seq_length + 1))
        cls.trainY[:, 0] = 0  # start_token == 0

        # Parameters
        cls.src_len = len(cls.trainX)
        cls.tgt_len = len(cls.trainY)

        assert cls.src_len == cls.tgt_len

        cls.num_epochs = 100
        # 50 samples with a batch size of 16 gives 3 full minibatches per epoch.
        cls.n_step = cls.src_len // cls.batch_size

    @classmethod
    def tearDownClass(cls):
        pass

    def test_basic_simpleSeq2Seq(self):
        # A 3-layer GRU encoder-decoder; one embedding layer is shared by the
        # encoder and decoder inputs.
        model_ = Seq2seq(
            decoder_seq_length=5,
            cell_enc=tf.keras.layers.GRUCell,
            cell_dec=tf.keras.layers.GRUCell,
            n_layer=3,
            n_units=128,
            embedding_layer=tl.layers.Embedding(vocabulary_size=self.vocab_size, embedding_size=self.embedding_size),
        )

        optimizer = tf.optimizers.Adam(learning_rate=0.001)

        for epoch in range(self.num_epochs):
            model_.train()
            trainX, trainY = shuffle(self.trainX, self.trainY)
            total_loss, n_iter = 0, 0
            for X, Y in tqdm(tl.iterate.minibatches(inputs=trainX, targets=trainY, batch_size=self.batch_size,
                                                    shuffle=False), total=self.n_step,
                             desc='Epoch[{}/{}]'.format(epoch + 1, self.num_epochs), leave=False):
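
                # Teacher forcing: the decoder input is the target sequence
                # shifted right (it starts with the start token), and the
                # training target is the same sequence shifted left by one step.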
                dec_seq = Y[:, :-1]
                target_seq = Y[:, 1:]
with tf.GradientTape() as tape:
|
|
## compute outputs
|
|
output = model_(inputs=[X, dec_seq])
|
|
|
|
output = tf.reshape(output, [-1, self.vocab_size])
|
|
|
|
loss = cross_entropy_seq(logits=output, target_seqs=target_seq)
|
|
|
|
grad = tape.gradient(loss, model_.all_weights)
|
|
optimizer.apply_gradients(zip(grad, model_.all_weights))
|
|
|
|
total_loss += loss
|
|
n_iter += 1
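
            # Inference check: decode greedily from the start token for
            # `dec_seq_length` steps (`top_n=1` restricts sampling to the single
            # most likely token at each step) and print predictions next to the
            # targets for a visual comparison.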
            model_.eval()
            test_sample = trainX[0:2, :].tolist()

            top_n = 1
            for i in range(top_n):
                prediction = model_([test_sample], seq_length=self.dec_seq_length, start_token=0, top_n=1)
                print("Prediction: >>>>> ", prediction, "\n Target: >>>>> ", trainY[0:2, 1:], "\n\n")

            # Print the average loss after every epoch.
            print('Epoch [{}/{}]: loss {:.4f}'.format(epoch + 1, self.num_epochs, total_loss / n_iter))


if __name__ == '__main__':
    unittest.main()