#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import unittest

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # silence TF C++ logs; must be set before tensorflow is imported

import numpy as np
import tensorflow as tf

import tensorlayer as tl
from tests.utils import CustomTestCase
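
# These tests exercise TensorLayer's recurrent layers (RNN, SimpleRNN,
# LSTMRNN, GRURNN, BiRNN) through both the static Model API and the dynamic
# (subclassed) API, plus the sequence-length and target-mask helper ops.
#
# A minimal usage sketch of the wrapper under test (shapes follow the
# fixtures below; the SimpleRNNCell is just an illustrative choice):
#
#   inputs = tl.layers.Input([batch_size, num_steps, embedding_size])
#   rnn, state = tl.layers.RNN(
#       cell=tf.keras.layers.SimpleRNNCell(units=hidden_size),
#       return_last_output=True, return_last_state=True,
#   )(inputs)
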
class Layer_RNN_Test(CustomTestCase):
@classmethod
def setUpClass(cls):
cls.batch_size = 2
cls.vocab_size = 20
cls.embedding_size = 4
cls.hidden_size = 8
cls.num_steps = 6
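        # Synthetic regression data: random embeddings, zero-padded beyond
        # each sample's randomly drawn true length. Targets are fixed linear
        # projections of the inputs (map1 / map2), so they are learnable:
        # data_y holds one scalar per sample, data_y2 one per timestep.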
cls.data_n_steps = np.random.randint(low=cls.num_steps // 2, high=cls.num_steps + 1, size=cls.batch_size)
cls.data_x = np.random.random([cls.batch_size, cls.num_steps, cls.embedding_size]).astype(np.float32)
for i in range(cls.batch_size):
for j in range(cls.data_n_steps[i], cls.num_steps):
cls.data_x[i][j][:] = 0
cls.data_y = np.zeros([cls.batch_size, 1]).astype(np.float32)
cls.data_y2 = np.zeros([cls.batch_size, cls.num_steps]).astype(np.float32)
map1 = np.random.random([1, cls.num_steps])
map2 = np.random.random([cls.embedding_size, 1])
for i in range(cls.batch_size):
cls.data_y[i] = np.matmul(map1, np.matmul(cls.data_x[i], map2))
cls.data_y2[i] = np.matmul(cls.data_x[i], map2)[:, 0]
@classmethod
def tearDownClass(cls):
pass
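
    # Each training test below runs 50 optimisation steps on the synthetic
    # task, printing the loss every 10 steps; dropout=0.1 on the cell also
    # checks that the model's train/eval flag reaches the wrapped Keras cell
    # (asserted via rnnlayer.is_train).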
def test_basic_simplernn(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
return_seq_2d=False, return_last_state=True
)
rnn, rnn_state = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnnlayer.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, final_state = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_simplernn_class(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.SimpleRNN(
units=self.hidden_size, dropout=0.1, return_last_output=True, return_seq_2d=False, return_last_state=True
)
rnn, rnn_state = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnnlayer.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, final_state = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_simplernn2(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=False,
return_seq_2d=True, return_last_state=False
)
rnn = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
print(rnn_model)
rnn_model.eval()
assert not rnnlayer.is_train
pred_y, rnn_y = rnn_model(self.data_x)
self.assertEqual(pred_y.get_shape().as_list(), [self.batch_size * self.num_steps, 1])
self.assertEqual(rnn_y.get_shape().as_list(), [self.batch_size * self.num_steps, self.hidden_size])
def test_basic_simplernn3(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=False,
return_seq_2d=False, return_last_state=False
)
rnn = rnnlayer(inputs)
rnn_model = tl.models.Model(inputs=inputs, outputs=rnn)
print(rnn_model)
rnn_model.eval()
assert not rnnlayer.is_train
rnn_y = rnn_model(self.data_x)
self.assertEqual(rnn_y.get_shape().as_list(), [self.batch_size, self.num_steps, self.hidden_size])
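
    # Dynamic (subclassed-Model) usage: the input shape is unknown when the
    # layer is constructed, so in_channels is passed to the RNN explicitly.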
def test_basic_simplernn_dynamic(self):
class CustomisedModel(tl.models.Model):
def __init__(self):
super(CustomisedModel, self).__init__()
self.rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=False,
return_seq_2d=False, return_last_state=False
)
self.dense = tl.layers.Dense(in_channels=8, n_units=1)
def forward(self, x):
z = self.rnnlayer(x)
z = self.dense(z[:, -1, :])
return z
rnn_model = CustomisedModel()
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_simplernn_dynamic_class(self):
class CustomisedModel(tl.models.Model):
def __init__(self):
super(CustomisedModel, self).__init__()
self.rnnlayer = tl.layers.SimpleRNN(
units=8, dropout=0.1, in_channels=4, return_last_output=False, return_seq_2d=False,
return_last_state=False
)
self.dense = tl.layers.Dense(in_channels=8, n_units=1)
def forward(self, x):
z = self.rnnlayer(x)
z = self.dense(z[:, -1, :])
return z
rnn_model = CustomisedModel()
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_simplernn_dynamic_2(self):
class CustomisedModel(tl.models.Model):
def __init__(self):
super(CustomisedModel, self).__init__()
self.rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=False,
return_seq_2d=False, return_last_state=False
)
self.dense = tl.layers.Dense(in_channels=8, n_units=1)
def forward(self, x):
z = self.rnnlayer(x, return_seq_2d=True)
z = self.dense(z[-2:, :])
return z
rnn_model = CustomisedModel()
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnn_model.rnnlayer.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_simplernn_dynamic_3(self):
class CustomisedModel(tl.models.Model):
def __init__(self):
super(CustomisedModel, self).__init__()
self.rnnlayer1 = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=True,
return_last_state=True
)
self.rnnlayer2 = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=True,
return_last_state=False
)
self.dense = tl.layers.Dense(in_channels=8, n_units=1)
def forward(self, x):
_, state = self.rnnlayer1(x[:, :2, :])
z = self.rnnlayer2(x[:, 2:, :], initial_state=state)
z = self.dense(z)
return z
rnn_model = CustomisedModel()
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnn_model.rnnlayer1.is_train
assert rnn_model.rnnlayer2.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_lstmrnn(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.LSTMCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
return_seq_2d=False, return_last_state=True
)
rnn, rnn_state = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0], rnn_state[1]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, final_h, final_c = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_lstmrnn_class(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.LSTMRNN(
units=self.hidden_size, dropout=0.1, return_last_output=True, return_seq_2d=False, return_last_state=True
)
rnn, rnn_state = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0], rnn_state[1]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, final_h, final_c = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_grurnn(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.GRUCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
return_seq_2d=False, return_last_state=True
)
rnn, rnn_state = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, final_h = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_grurnn_class(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.GRURNN(
units=self.hidden_size, dropout=0.1, return_last_output=True, return_seq_2d=False, return_last_state=True
)
rnn, rnn_state = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, final_h = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_birnn_simplernncell(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.BiRNN(
fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1),
bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size + 1,
dropout=0.1), return_seq_2d=True, return_last_state=True
)
rnn, rnn_fw_state, rnn_bw_state = rnnlayer(inputs)
dense = tl.layers.Dense(n_units=1)(rnn)
outputs = tl.layers.Reshape([-1, self.num_steps])(dense)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn, rnn_fw_state[0], rnn_bw_state[0]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnnlayer.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, r, rfw, rbw = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y2)
self.assertEqual(
r.get_shape().as_list(), [self.batch_size * self.num_steps, self.hidden_size + self.hidden_size + 1]
)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_birnn_lstmcell(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.BiRNN(
fw_cell=tf.keras.layers.LSTMCell(units=self.hidden_size, dropout=0.1),
bw_cell=tf.keras.layers.LSTMCell(units=self.hidden_size + 1,
dropout=0.1), return_seq_2d=False, return_last_state=True
)
rnn, rnn_fw_state, rnn_bw_state = rnnlayer(inputs)
din = tl.layers.Reshape([-1, self.hidden_size + self.hidden_size + 1])(rnn)
dense = tl.layers.Dense(n_units=1)(din)
outputs = tl.layers.Reshape([-1, self.num_steps])(dense)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn, rnn_fw_state[0], rnn_bw_state[0]])
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnnlayer.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y, r, rfw, rbw = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y2)
self.assertEqual(
r.get_shape().as_list(), [self.batch_size, self.num_steps, self.hidden_size + self.hidden_size + 1]
)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_birnn_grucell(self):
class CustomisedModel(tl.models.Model):
def __init__(self):
super(CustomisedModel, self).__init__()
self.rnnlayer = tl.layers.BiRNN(
fw_cell=tf.keras.layers.GRUCell(units=8,
dropout=0.1), bw_cell=tf.keras.layers.GRUCell(units=8, dropout=0.1),
in_channels=4, return_seq_2d=False, return_last_state=False
)
self.dense = tl.layers.Dense(in_channels=16, n_units=1)
self.reshape = tl.layers.Reshape([-1, 6])
def forward(self, x):
z = self.rnnlayer(x, return_seq_2d=True)
z = self.dense(z)
z = self.reshape(z)
return z
rnn_model = CustomisedModel()
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y2)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_stack_simplernn(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer1 = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=False,
return_seq_2d=False, return_last_state=False
)
rnn1 = rnnlayer1(inputs)
rnnlayer2 = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
return_seq_2d=False, return_last_state=False
)
rnn2 = rnnlayer2(rnn1)
outputs = tl.layers.Dense(n_units=1)(rnn2)
rnn_model = tl.models.Model(inputs=inputs, outputs=outputs)
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnnlayer1.is_train
assert rnnlayer2.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_stack_birnn_simplernncell(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.BiRNN(
fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1),
bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size + 1,
dropout=0.1), return_seq_2d=False, return_last_state=False
)
rnn = rnnlayer(inputs)
rnnlayer2 = tl.layers.BiRNN(
fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1),
bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size + 1,
dropout=0.1), return_seq_2d=True, return_last_state=False
)
rnn2 = rnnlayer2(rnn)
dense = tl.layers.Dense(n_units=1)(rnn2)
outputs = tl.layers.Reshape([-1, self.num_steps])(dense)
rnn_model = tl.models.Model(inputs=inputs, outputs=outputs)
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
assert rnnlayer.is_train
assert rnnlayer2.is_train
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y2)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
def test_basic_simplernn_dropout_1(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.5), return_last_output=True,
return_seq_2d=False, return_last_state=False
)
rnn = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
print(rnn_model)
rnn_model.train()
assert rnnlayer.is_train
pred_y, rnn_1 = rnn_model(self.data_x)
pred_y, rnn_2 = rnn_model(self.data_x)
self.assertFalse(np.allclose(rnn_1, rnn_2))
rnn_model.eval()
assert not rnnlayer.is_train
pred_y_1, rnn_1 = rnn_model(self.data_x)
pred_y_2, rnn_2 = rnn_model(self.data_x)
self.assertTrue(np.allclose(rnn_1, rnn_2))
def test_basic_simplernn_dropout_2(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, recurrent_dropout=0.5), return_last_output=True,
return_seq_2d=False, return_last_state=False
)
rnn = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
print(rnn_model)
rnn_model.train()
assert rnnlayer.is_train
pred_y, rnn_1 = rnn_model(self.data_x)
pred_y, rnn_2 = rnn_model(self.data_x)
self.assertFalse(np.allclose(rnn_1, rnn_2))
rnn_model.eval()
assert not rnnlayer.is_train
pred_y_1, rnn_1 = rnn_model(self.data_x)
pred_y_2, rnn_2 = rnn_model(self.data_x)
self.assertTrue(np.allclose(rnn_1, rnn_2))
def test_basic_birnn_simplernn_dropout_1(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.BiRNN(
fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.5),
bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size,
dropout=0.5), return_seq_2d=True, return_last_state=False
)
rnn = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
print(rnn_model)
rnn_model.train()
assert rnnlayer.is_train
pred_y, rnn_1 = rnn_model(self.data_x)
pred_y, rnn_2 = rnn_model(self.data_x)
self.assertFalse(np.allclose(rnn_1, rnn_2))
rnn_model.eval()
assert not rnnlayer.is_train
pred_y_1, rnn_1 = rnn_model(self.data_x)
pred_y_2, rnn_2 = rnn_model(self.data_x)
self.assertTrue(np.allclose(rnn_1, rnn_2))
def test_basic_birnn_simplernn_dropout_2(self):
inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
rnnlayer = tl.layers.BiRNN(
fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, recurrent_dropout=0.5),
bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size,
recurrent_dropout=0.5), return_seq_2d=True, return_last_state=False
)
rnn = rnnlayer(inputs)
outputs = tl.layers.Dense(n_units=1)(rnn)
rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
print(rnn_model)
rnn_model.train()
assert rnnlayer.is_train
pred_y, rnn_1 = rnn_model(self.data_x)
pred_y, rnn_2 = rnn_model(self.data_x)
self.assertFalse(np.allclose(rnn_1, rnn_2))
rnn_model.eval()
assert not rnnlayer.is_train
pred_y_1, rnn_1 = rnn_model(self.data_x)
pred_y_2, rnn_2 = rnn_model(self.data_x)
self.assertTrue(np.allclose(rnn_1, rnn_2))
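
    # The retrieve_seq_length_op* helpers compute per-sample lengths from
    # padded batches by counting non-padding timesteps; e.g. for the first
    # batch below the expected lengths are [2, 3, 4].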
def test_sequence_length(self):
data = [[[1], [2], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [6], [1], [0]]]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op(data)
print(length)
data = [
[[1, 2], [2, 2], [1, 2], [1, 2], [0, 0]], [[2, 3], [2, 4], [3, 2], [0, 0], [0, 0]],
[[3, 3], [2, 2], [5, 3], [1, 2], [0, 0]]
]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op(data)
print(length)
def test_sequence_length2(self):
data = [[1, 2, 0, 0, 0], [1, 2, 3, 0, 0], [1, 2, 6, 1, 0]]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op2(data)
print(length)
def test_sequence_length3(self):
data = [[[1], [2], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [6], [1], [0]]]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op3(data)
print(length)
data = [
[[1, 2], [2, 2], [1, 2], [1, 2], [0, 0]], [[2, 3], [2, 4], [3, 2], [0, 0], [0, 0]],
[[3, 3], [2, 2], [5, 3], [1, 2], [0, 0]]
]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op3(data)
print(length)
data = [[1, 2, 0, 0, 0], [1, 2, 3, 0, 0], [1, 2, 6, 1, 0]]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op3(data)
print(length)
data = [
['hello', 'world', '', '', ''], ['hello', 'world', 'tensorlayer', '', ''],
['hello', 'world', 'tensorlayer', '2.0', '']
]
data = tf.convert_to_tensor(data, dtype=tf.string)
length = tl.layers.retrieve_seq_length_op3(data, pad_val='')
print(length)
try:
data = [1, 2, 0, 0, 0]
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op3(data)
print(length)
except Exception as e:
print(e)
try:
data = np.random.random([4, 2, 6, 2])
data = tf.convert_to_tensor(data, dtype=tf.float32)
length = tl.layers.retrieve_seq_length_op3(data)
print(length)
except Exception as e:
print(e)
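
    # target_mask_op returns a full 0/1 mask over timesteps rather than one
    # length per sample, so a padding value in the middle of a sequence
    # (third sample of the second batch below) stays 0 while later real
    # entries stay 1; malformed inputs should raise.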
def test_target_mask_op(self):
fail_flag = False
data = [
['hello', 'world', '', '', ''], ['hello', 'world', 'tensorlayer', '', ''],
['hello', 'world', 'tensorlayer', '2.0', '']
]
try:
tl.layers.target_mask_op(data, pad_val='')
fail_flag = True
except AttributeError as e:
print(e)
        if fail_flag:
            self.fail("AttributeError not raised")
data = tf.convert_to_tensor(data, dtype=tf.string)
mask = tl.layers.target_mask_op(data, pad_val='')
print(mask)
data = [[[1], [0], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [0], [1], [0]]]
data = tf.convert_to_tensor(data, dtype=tf.float32)
mask = tl.layers.target_mask_op(data)
print(mask)
data = [
[[0, 0], [2, 2], [1, 2], [1, 2], [0, 0]], [[2, 3], [2, 4], [3, 2], [1, 0], [0, 0]],
[[3, 3], [0, 1], [5, 3], [1, 2], [0, 0]]
]
data = tf.convert_to_tensor(data, dtype=tf.float32)
mask = tl.layers.target_mask_op(data)
print(mask)
fail_flag = False
try:
data = [1, 2, 0, 0, 0]
data = tf.convert_to_tensor(data, dtype=tf.float32)
tl.layers.target_mask_op(data)
fail_flag = True
except ValueError as e:
print(e)
if fail_flag:
self.fail("Wrong data shape not detected.")
fail_flag = False
try:
data = np.random.random([4, 2, 6, 2])
data = tf.convert_to_tensor(data, dtype=tf.float32)
tl.layers.target_mask_op(data)
fail_flag = True
except ValueError as e:
print(e)
if fail_flag:
self.fail("Wrong data shape not detected.")
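
    # Dynamic-length unrolling: sequence_length must be a 1-D list/array of
    # ints, one entry per sample and none larger than the number of steps;
    # anything else should raise TypeError or ValueError. The layer is put
    # in eval mode so dropout is inactive, and passing the full length for
    # every sample must match running with sequence_length=None.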
def test_dynamic_rnn(self):
batch_size = 3
num_steps = 5
embedding_size = 6
hidden_size = 4
inputs = tl.layers.Input([batch_size, num_steps, embedding_size])
rnn_layer = tl.layers.RNN(
cell=tf.keras.layers.LSTMCell(units=hidden_size, dropout=0.1), in_channels=embedding_size,
return_last_output=True, return_last_state=True
)
rnn_layer.is_train = False
print(tl.layers.retrieve_seq_length_op3(inputs))
_ = rnn_layer(inputs, sequence_length=tl.layers.retrieve_seq_length_op3(inputs))
_ = rnn_layer(inputs, sequence_length=np.array([5, 5, 5]))
# test exceptions
except_flag = False
try:
_ = rnn_layer(inputs, sequence_length=1)
except_flag = True
except TypeError as e:
print(e)
try:
_ = rnn_layer(inputs, sequence_length=["str", 1, 2])
except_flag = True
except TypeError as e:
print(e)
try:
_ = rnn_layer(inputs, sequence_length=[10, 2, 2])
except_flag = True
except ValueError as e:
print(e)
try:
_ = rnn_layer(inputs, sequence_length=[1])
except_flag = True
except ValueError as e:
print(e)
if except_flag:
self.fail("Exception not detected.")
# test warning
for _ in range(5):
_ = rnn_layer(inputs, sequence_length=[5, 5, 5], return_last_output=False, return_last_state=True)
_ = rnn_layer(inputs, sequence_length=[5, 5, 5], return_last_output=True, return_last_state=False)
x = rnn_layer(inputs, sequence_length=None, return_last_output=True, return_last_state=True)
y = rnn_layer(inputs, sequence_length=[5, 5, 5], return_last_output=True, return_last_state=True)
assert len(x) == 2
assert len(y) == 2
for i, j in zip(x, y):
self.assertTrue(np.allclose(i, j))
def test_dynamic_rnn_with_seq_len_op2(self):
data = [[[1], [2], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [6], [1], [1]]]
data = tf.convert_to_tensor(data, dtype=tf.float32)
class DynamicRNNExample(tl.models.Model):
def __init__(self):
super(DynamicRNNExample, self).__init__()
self.rnnlayer = tl.layers.RNN(
cell=tf.keras.layers.SimpleRNNCell(units=6, dropout=0.1), in_channels=1, return_last_output=True,
return_last_state=True
)
def forward(self, x):
z0, s0 = self.rnnlayer(x, sequence_length=None)
z1, s1 = self.rnnlayer(x, sequence_length=tl.layers.retrieve_seq_length_op3(x))
z2, s2 = self.rnnlayer(x, sequence_length=tl.layers.retrieve_seq_length_op3(x), initial_state=s1)
print(z0)
print(z1)
print(z2)
print("===")
print(s0)
print(s1)
print(s2)
return z2, s2
model = DynamicRNNExample()
model.eval()
output, state = model(data)
print(output.shape)
print(state)
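
    # Trains with dynamic sequence lengths, then checks that weights saved
    # with save_weights can be restored into a freshly built model (the loss
    # printed after load_weights should return to the trained value).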
def test_dynamic_rnn_with_fake_data(self):
class CustomisedModel(tl.models.Model):
def __init__(self):
super(CustomisedModel, self).__init__()
self.rnnlayer = tl.layers.LSTMRNN(
units=8, dropout=0.1, in_channels=4, return_last_output=True, return_last_state=False
)
self.dense = tl.layers.Dense(in_channels=8, n_units=1)
def forward(self, x):
z = self.rnnlayer(x, sequence_length=tl.layers.retrieve_seq_length_op3(x))
                z = self.dense(z)
return z
rnn_model = CustomisedModel()
print(rnn_model)
optimizer = tf.optimizers.Adam(learning_rate=0.01)
rnn_model.train()
for epoch in range(50):
with tf.GradientTape() as tape:
pred_y = rnn_model(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
gradients = tape.gradient(loss, rnn_model.trainable_weights)
optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))
if (epoch + 1) % 10 == 0:
print("epoch %d, loss %f" % (epoch, loss))
filename = "dynamic_rnn.h5"
rnn_model.save_weights(filename)
# Testing saving and restoring of RNN weights
rnn_model2 = CustomisedModel()
rnn_model2.eval()
pred_y = rnn_model2(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
print("MODEL INIT loss %f" % (loss))
rnn_model2.load_weights(filename)
pred_y = rnn_model2(self.data_x)
loss = tl.cost.mean_squared_error(pred_y, self.data_y)
print("MODEL RESTORE W loss %f" % (loss))
        os.remove(filename)
if __name__ == '__main__':
unittest.main()