#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import unittest

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorflow as tf
import tensorlayer as tl

from tests.utils import CustomTestCase


class Layer_RNN_Test(CustomTestCase):
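    """Tests for TensorLayer recurrent layers (RNN, BiRNN and the SimpleRNN/LSTMRNN/GRURNN wrappers) and the sequence-length/mask helper ops."""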

    @classmethod
    def setUpClass(cls):
        cls.batch_size = 2

        cls.vocab_size = 20
        cls.embedding_size = 4

        cls.hidden_size = 8
        cls.num_steps = 6

        cls.data_n_steps = np.random.randint(low=cls.num_steps // 2, high=cls.num_steps + 1, size=cls.batch_size)
        cls.data_x = np.random.random([cls.batch_size, cls.num_steps, cls.embedding_size]).astype(np.float32)

        # Zero-pad each sequence beyond its randomly drawn valid length.
        for i in range(cls.batch_size):
            for j in range(cls.data_n_steps[i], cls.num_steps):
                cls.data_x[i][j][:] = 0

        cls.data_y = np.zeros([cls.batch_size, 1]).astype(np.float32)
        cls.data_y2 = np.zeros([cls.batch_size, cls.num_steps]).astype(np.float32)

        # Targets are a fixed random linear map of the inputs, so the models have a learnable mapping to fit.
        map1 = np.random.random([1, cls.num_steps])
        map2 = np.random.random([cls.embedding_size, 1])
        for i in range(cls.batch_size):
            cls.data_y[i] = np.matmul(map1, np.matmul(cls.data_x[i], map2))
            cls.data_y2[i] = np.matmul(cls.data_x[i], map2)[:, 0]

    @classmethod
    def tearDownClass(cls):
        pass

    def test_basic_simplernn(self):
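        """Train tl.layers.RNN with a SimpleRNNCell, returning the last output and last state."""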
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
            return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_state = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()
        assert rnnlayer.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, final_state = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_simplernn_class(self):
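        """Same as test_basic_simplernn but via the tl.layers.SimpleRNN convenience class."""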
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.SimpleRNN(
            units=self.hidden_size, dropout=0.1, return_last_output=True, return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_state = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()
        assert rnnlayer.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, final_state = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_simplernn2(self):
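        """Check output shapes when the RNN returns the whole sequence flattened to 2D."""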
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=False,
            return_seq_2d=True, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
        print(rnn_model)

        rnn_model.eval()
        assert not rnnlayer.is_train

        pred_y, rnn_y = rnn_model(self.data_x)
        self.assertEqual(pred_y.get_shape().as_list(), [self.batch_size * self.num_steps, 1])
        self.assertEqual(rnn_y.get_shape().as_list(), [self.batch_size * self.num_steps, self.hidden_size])

    def test_basic_simplernn3(self):
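        """Check output shapes when the RNN returns the whole sequence as a 3D tensor."""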
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=False,
            return_seq_2d=False, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        rnn_model = tl.models.Model(inputs=inputs, outputs=rnn)
        print(rnn_model)

        rnn_model.eval()
        assert not rnnlayer.is_train

        rnn_y = rnn_model(self.data_x)
        self.assertEqual(rnn_y.get_shape().as_list(), [self.batch_size, self.num_steps, self.hidden_size])

    def test_basic_simplernn_dynamic(self):
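        """Train an RNN inside a subclassed model (dynamic/eager mode)."""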
        class CustomisedModel(tl.models.Model):

            def __init__(self):
                super(CustomisedModel, self).__init__()
                self.rnnlayer = tl.layers.RNN(
                    cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=False,
                    return_seq_2d=False, return_last_state=False
                )
                self.dense = tl.layers.Dense(in_channels=8, n_units=1)

            def forward(self, x):
                z = self.rnnlayer(x)
                z = self.dense(z[:, -1, :])
                return z

        rnn_model = CustomisedModel()
        print(rnn_model)
        optimizer = tf.optimizers.Adam(learning_rate=0.01)
        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_simplernn_dynamic_class(self):
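        """Dynamic-mode training via the tl.layers.SimpleRNN convenience class."""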
        class CustomisedModel(tl.models.Model):

            def __init__(self):
                super(CustomisedModel, self).__init__()
                self.rnnlayer = tl.layers.SimpleRNN(
                    units=8, dropout=0.1, in_channels=4, return_last_output=False, return_seq_2d=False,
                    return_last_state=False
                )
                self.dense = tl.layers.Dense(in_channels=8, n_units=1)

            def forward(self, x):
                z = self.rnnlayer(x)
                z = self.dense(z[:, -1, :])
                return z

        rnn_model = CustomisedModel()
        print(rnn_model)
        optimizer = tf.optimizers.Adam(learning_rate=0.01)
        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_simplernn_dynamic_2(self):
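        """Override return_seq_2d at call time and train on a slice of the flattened outputs."""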
        class CustomisedModel(tl.models.Model):

            def __init__(self):
                super(CustomisedModel, self).__init__()
                self.rnnlayer = tl.layers.RNN(
                    cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=False,
                    return_seq_2d=False, return_last_state=False
                )
                self.dense = tl.layers.Dense(in_channels=8, n_units=1)

            def forward(self, x):
                z = self.rnnlayer(x, return_seq_2d=True)
                z = self.dense(z[-2:, :])
                return z

        rnn_model = CustomisedModel()
        print(rnn_model)
        optimizer = tf.optimizers.Adam(learning_rate=0.01)
        rnn_model.train()
        assert rnn_model.rnnlayer.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_simplernn_dynamic_3(self):
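        """Feed the final state of one RNN layer into a second RNN layer as its initial state."""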
        class CustomisedModel(tl.models.Model):

            def __init__(self):
                super(CustomisedModel, self).__init__()
                self.rnnlayer1 = tl.layers.RNN(
                    cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=True,
                    return_last_state=True
                )
                self.rnnlayer2 = tl.layers.RNN(
                    cell=tf.keras.layers.SimpleRNNCell(units=8, dropout=0.1), in_channels=4, return_last_output=True,
                    return_last_state=False
                )
                self.dense = tl.layers.Dense(in_channels=8, n_units=1)

            def forward(self, x):
                _, state = self.rnnlayer1(x[:, :2, :])
                z = self.rnnlayer2(x[:, 2:, :], initial_state=state)
                z = self.dense(z)
                return z

        rnn_model = CustomisedModel()
        print(rnn_model)
        optimizer = tf.optimizers.Adam(learning_rate=0.01)
        rnn_model.train()
        assert rnn_model.rnnlayer1.is_train
        assert rnn_model.rnnlayer2.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_lstmrnn(self):
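        """Train tl.layers.RNN with an LSTMCell, exposing both the hidden and the cell state."""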
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.LSTMCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
            return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_state = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0], rnn_state[1]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, final_h, final_c = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_lstmrnn_class(self):
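        """Same as test_basic_lstmrnn but via the tl.layers.LSTMRNN convenience class."""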
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.LSTMRNN(
            units=self.hidden_size, dropout=0.1, return_last_output=True, return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_state = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0], rnn_state[1]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, final_h, final_c = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_grurnn(self):
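        """Train tl.layers.RNN with a GRUCell."""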
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.GRUCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
            return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_state = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, final_h = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_grurnn_class(self):
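        """Same as test_basic_grurnn but via the tl.layers.GRURNN convenience class."""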
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.GRURNN(
            units=self.hidden_size, dropout=0.1, return_last_output=True, return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_state = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn_state[0]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, final_h = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_birnn_simplernncell(self):
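        """Train a BiRNN with SimpleRNNCells of different sizes and check the concatenated 2D output shape."""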
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.BiRNN(
            fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1),
            bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size + 1, dropout=0.1),
            return_seq_2d=True, return_last_state=True
        )
        rnn, rnn_fw_state, rnn_bw_state = rnnlayer(inputs)
        dense = tl.layers.Dense(n_units=1)(rnn)
        outputs = tl.layers.Reshape([-1, self.num_steps])(dense)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn, rnn_fw_state[0], rnn_bw_state[0]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()
        assert rnnlayer.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, r, rfw, rbw = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y2)

                self.assertEqual(
                    r.get_shape().as_list(), [self.batch_size * self.num_steps, self.hidden_size + self.hidden_size + 1]
                )
            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_birnn_lstmcell(self):
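        """Train a BiRNN with LSTMCells and check the concatenated 3D output shape."""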
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.BiRNN(
            fw_cell=tf.keras.layers.LSTMCell(units=self.hidden_size, dropout=0.1),
            bw_cell=tf.keras.layers.LSTMCell(units=self.hidden_size + 1, dropout=0.1),
            return_seq_2d=False, return_last_state=True
        )
        rnn, rnn_fw_state, rnn_bw_state = rnnlayer(inputs)
        din = tl.layers.Reshape([-1, self.hidden_size + self.hidden_size + 1])(rnn)
        dense = tl.layers.Dense(n_units=1)(din)
        outputs = tl.layers.Reshape([-1, self.num_steps])(dense)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn, rnn_fw_state[0], rnn_bw_state[0]])
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()
        assert rnnlayer.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y, r, rfw, rbw = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y2)

                self.assertEqual(
                    r.get_shape().as_list(), [self.batch_size, self.num_steps, self.hidden_size + self.hidden_size + 1]
                )
            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_birnn_grucell(self):
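        """Train a BiRNN with GRUCells inside a subclassed model."""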
        class CustomisedModel(tl.models.Model):

            def __init__(self):
                super(CustomisedModel, self).__init__()
                self.rnnlayer = tl.layers.BiRNN(
                    fw_cell=tf.keras.layers.GRUCell(units=8, dropout=0.1),
                    bw_cell=tf.keras.layers.GRUCell(units=8, dropout=0.1),
                    in_channels=4, return_seq_2d=False, return_last_state=False
                )
                self.dense = tl.layers.Dense(in_channels=16, n_units=1)
                self.reshape = tl.layers.Reshape([-1, 6])

            def forward(self, x):
                z = self.rnnlayer(x, return_seq_2d=True)
                z = self.dense(z)
                z = self.reshape(z)
                return z

        rnn_model = CustomisedModel()
        print(rnn_model)
        optimizer = tf.optimizers.Adam(learning_rate=0.01)
        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_stack_simplernn(self):
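        """Stack two RNN layers; the second returns only the last output."""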
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer1 = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=False,
            return_seq_2d=False, return_last_state=False
        )
        rnn1 = rnnlayer1(inputs)
        rnnlayer2 = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1), return_last_output=True,
            return_seq_2d=False, return_last_state=False
        )
        rnn2 = rnnlayer2(rnn1)
        outputs = tl.layers.Dense(n_units=1)(rnn2)
        rnn_model = tl.models.Model(inputs=inputs, outputs=outputs)
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()
        assert rnnlayer1.is_train
        assert rnnlayer2.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_stack_birnn_simplernncell(self):
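        """Stack two BiRNN layers and train on per-step targets."""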
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.BiRNN(
            fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1),
            bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size + 1, dropout=0.1),
            return_seq_2d=False, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        rnnlayer2 = tl.layers.BiRNN(
            fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.1),
            bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size + 1, dropout=0.1),
            return_seq_2d=True, return_last_state=False
        )
        rnn2 = rnnlayer2(rnn)
        dense = tl.layers.Dense(n_units=1)(rnn2)
        outputs = tl.layers.Reshape([-1, self.num_steps])(dense)
        rnn_model = tl.models.Model(inputs=inputs, outputs=outputs)
        print(rnn_model)

        optimizer = tf.optimizers.Adam(learning_rate=0.01)

        rnn_model.train()
        assert rnnlayer.is_train
        assert rnnlayer2.is_train

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y2)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

    def test_basic_simplernn_dropout_1(self):
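        """Input dropout should randomise outputs in train mode and be disabled in eval mode."""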
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.5), return_last_output=True,
            return_seq_2d=False, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
        print(rnn_model)

        rnn_model.train()
        assert rnnlayer.is_train

        pred_y, rnn_1 = rnn_model(self.data_x)
        pred_y, rnn_2 = rnn_model(self.data_x)
        self.assertFalse(np.allclose(rnn_1, rnn_2))

        rnn_model.eval()
        assert not rnnlayer.is_train

        pred_y_1, rnn_1 = rnn_model(self.data_x)
        pred_y_2, rnn_2 = rnn_model(self.data_x)
        self.assertTrue(np.allclose(rnn_1, rnn_2))

    def test_basic_simplernn_dropout_2(self):
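        """Recurrent dropout should randomise outputs in train mode and be disabled in eval mode."""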
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.RNN(
            cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, recurrent_dropout=0.5), return_last_output=True,
            return_seq_2d=False, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
        print(rnn_model)

        rnn_model.train()
        assert rnnlayer.is_train

        pred_y, rnn_1 = rnn_model(self.data_x)
        pred_y, rnn_2 = rnn_model(self.data_x)
        self.assertFalse(np.allclose(rnn_1, rnn_2))

        rnn_model.eval()
        assert not rnnlayer.is_train

        pred_y_1, rnn_1 = rnn_model(self.data_x)
        pred_y_2, rnn_2 = rnn_model(self.data_x)
        self.assertTrue(np.allclose(rnn_1, rnn_2))

    def test_basic_birnn_simplernn_dropout_1(self):
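        """BiRNN input dropout: random in train mode, deterministic in eval mode."""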
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.BiRNN(
            fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.5),
            bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, dropout=0.5),
            return_seq_2d=True, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
        print(rnn_model)

        rnn_model.train()
        assert rnnlayer.is_train

        pred_y, rnn_1 = rnn_model(self.data_x)
        pred_y, rnn_2 = rnn_model(self.data_x)
        self.assertFalse(np.allclose(rnn_1, rnn_2))

        rnn_model.eval()
        assert not rnnlayer.is_train

        pred_y_1, rnn_1 = rnn_model(self.data_x)
        pred_y_2, rnn_2 = rnn_model(self.data_x)
        self.assertTrue(np.allclose(rnn_1, rnn_2))

    def test_basic_birnn_simplernn_dropout_2(self):
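        """BiRNN recurrent dropout: random in train mode, deterministic in eval mode."""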
        inputs = tl.layers.Input([self.batch_size, self.num_steps, self.embedding_size])
        rnnlayer = tl.layers.BiRNN(
            fw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, recurrent_dropout=0.5),
            bw_cell=tf.keras.layers.SimpleRNNCell(units=self.hidden_size, recurrent_dropout=0.5),
            return_seq_2d=True, return_last_state=False
        )
        rnn = rnnlayer(inputs)
        outputs = tl.layers.Dense(n_units=1)(rnn)
        rnn_model = tl.models.Model(inputs=inputs, outputs=[outputs, rnn])
        print(rnn_model)

        rnn_model.train()
        assert rnnlayer.is_train

        pred_y, rnn_1 = rnn_model(self.data_x)
        pred_y, rnn_2 = rnn_model(self.data_x)
        self.assertFalse(np.allclose(rnn_1, rnn_2))

        rnn_model.eval()
        assert not rnnlayer.is_train

        pred_y_1, rnn_1 = rnn_model(self.data_x)
        pred_y_2, rnn_2 = rnn_model(self.data_x)
        self.assertTrue(np.allclose(rnn_1, rnn_2))

    def test_sequence_length(self):
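        """retrieve_seq_length_op on 3D zero-padded batches."""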
        data = [[[1], [2], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [6], [1], [0]]]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        length = tl.layers.retrieve_seq_length_op(data)
        print(length)
        data = [
            [[1, 2], [2, 2], [1, 2], [1, 2], [0, 0]], [[2, 3], [2, 4], [3, 2], [0, 0], [0, 0]],
            [[3, 3], [2, 2], [5, 3], [1, 2], [0, 0]]
        ]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        length = tl.layers.retrieve_seq_length_op(data)
        print(length)

    def test_sequence_length2(self):
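        """retrieve_seq_length_op2 on 2D zero-padded batches."""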
        data = [[1, 2, 0, 0, 0], [1, 2, 3, 0, 0], [1, 2, 6, 1, 0]]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        length = tl.layers.retrieve_seq_length_op2(data)
        print(length)

    def test_sequence_length3(self):
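        """retrieve_seq_length_op3 on 3D, 2D and string inputs, plus invalid shapes."""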
        data = [[[1], [2], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [6], [1], [0]]]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        length = tl.layers.retrieve_seq_length_op3(data)
        print(length)
        data = [
            [[1, 2], [2, 2], [1, 2], [1, 2], [0, 0]], [[2, 3], [2, 4], [3, 2], [0, 0], [0, 0]],
            [[3, 3], [2, 2], [5, 3], [1, 2], [0, 0]]
        ]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        length = tl.layers.retrieve_seq_length_op3(data)
        print(length)
        data = [[1, 2, 0, 0, 0], [1, 2, 3, 0, 0], [1, 2, 6, 1, 0]]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        length = tl.layers.retrieve_seq_length_op3(data)
        print(length)
        data = [
            ['hello', 'world', '', '', ''], ['hello', 'world', 'tensorlayer', '', ''],
            ['hello', 'world', 'tensorlayer', '2.0', '']
        ]
        data = tf.convert_to_tensor(data, dtype=tf.string)
        length = tl.layers.retrieve_seq_length_op3(data, pad_val='')
        print(length)

        try:
            data = [1, 2, 0, 0, 0]
            data = tf.convert_to_tensor(data, dtype=tf.float32)
            length = tl.layers.retrieve_seq_length_op3(data)
            print(length)
        except Exception as e:
            print(e)

        try:
            data = np.random.random([4, 2, 6, 2])
            data = tf.convert_to_tensor(data, dtype=tf.float32)
            length = tl.layers.retrieve_seq_length_op3(data)
            print(length)
        except Exception as e:
            print(e)

    def test_target_mask_op(self):
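        """target_mask_op on string and numeric tensors, plus invalid inputs."""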
        fail_flag = False
        data = [
            ['hello', 'world', '', '', ''], ['hello', 'world', 'tensorlayer', '', ''],
            ['hello', 'world', 'tensorlayer', '2.0', '']
        ]
        try:
            tl.layers.target_mask_op(data, pad_val='')
            fail_flag = True
        except AttributeError as e:
            print(e)
        if fail_flag:
            self.fail("AttributeError not raised for non-tensor input")

        data = tf.convert_to_tensor(data, dtype=tf.string)
        mask = tl.layers.target_mask_op(data, pad_val='')
        print(mask)

        data = [[[1], [0], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [0], [1], [0]]]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        mask = tl.layers.target_mask_op(data)
        print(mask)

        data = [
            [[0, 0], [2, 2], [1, 2], [1, 2], [0, 0]], [[2, 3], [2, 4], [3, 2], [1, 0], [0, 0]],
            [[3, 3], [0, 1], [5, 3], [1, 2], [0, 0]]
        ]
        data = tf.convert_to_tensor(data, dtype=tf.float32)
        mask = tl.layers.target_mask_op(data)
        print(mask)

        fail_flag = False
        try:
            data = [1, 2, 0, 0, 0]
            data = tf.convert_to_tensor(data, dtype=tf.float32)
            tl.layers.target_mask_op(data)
            fail_flag = True
        except ValueError as e:
            print(e)
        if fail_flag:
            self.fail("Wrong data shape not detected.")

        fail_flag = False
        try:
            data = np.random.random([4, 2, 6, 2])
            data = tf.convert_to_tensor(data, dtype=tf.float32)
            tl.layers.target_mask_op(data)
            fail_flag = True
        except ValueError as e:
            print(e)
        if fail_flag:
            self.fail("Wrong data shape not detected.")

    def test_dynamic_rnn(self):
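        """Per-call sequence_length handling, input validation, and equivalence with full-length sequences."""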
        batch_size = 3
        num_steps = 5
        embedding_size = 6

        hidden_size = 4
        inputs = tl.layers.Input([batch_size, num_steps, embedding_size])

        rnn_layer = tl.layers.RNN(
            cell=tf.keras.layers.LSTMCell(units=hidden_size, dropout=0.1), in_channels=embedding_size,
            return_last_output=True, return_last_state=True
        )

        rnn_layer.is_train = False

        print(tl.layers.retrieve_seq_length_op3(inputs))
        _ = rnn_layer(inputs, sequence_length=tl.layers.retrieve_seq_length_op3(inputs))
        _ = rnn_layer(inputs, sequence_length=np.array([5, 5, 5]))

        # test exceptions
        except_flag = False
        try:
            _ = rnn_layer(inputs, sequence_length=1)
            except_flag = True
        except TypeError as e:
            print(e)

        try:
            _ = rnn_layer(inputs, sequence_length=["str", 1, 2])
            except_flag = True
        except TypeError as e:
            print(e)

        try:
            _ = rnn_layer(inputs, sequence_length=[10, 2, 2])
            except_flag = True
        except ValueError as e:
            print(e)

        try:
            _ = rnn_layer(inputs, sequence_length=[1])
            except_flag = True
        except ValueError as e:
            print(e)

        if except_flag:
            self.fail("Exception not detected.")

        # test warning
        for _ in range(5):
            _ = rnn_layer(inputs, sequence_length=[5, 5, 5], return_last_output=False, return_last_state=True)
            _ = rnn_layer(inputs, sequence_length=[5, 5, 5], return_last_output=True, return_last_state=False)

        x = rnn_layer(inputs, sequence_length=None, return_last_output=True, return_last_state=True)
        y = rnn_layer(inputs, sequence_length=[5, 5, 5], return_last_output=True, return_last_state=True)

        assert len(x) == 2
        assert len(y) == 2

        for i, j in zip(x, y):
            self.assertTrue(np.allclose(i, j))

    def test_dynamic_rnn_with_seq_len_op2(self):
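        """Dynamic RNN driven by retrieve_seq_length_op3, with and without an initial state."""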
        data = [[[1], [2], [0], [0], [0]], [[1], [2], [3], [0], [0]], [[1], [2], [6], [1], [1]]]
        data = tf.convert_to_tensor(data, dtype=tf.float32)

        class DynamicRNNExample(tl.models.Model):

            def __init__(self):
                super(DynamicRNNExample, self).__init__()

                self.rnnlayer = tl.layers.RNN(
                    cell=tf.keras.layers.SimpleRNNCell(units=6, dropout=0.1), in_channels=1, return_last_output=True,
                    return_last_state=True
                )

            def forward(self, x):
                z0, s0 = self.rnnlayer(x, sequence_length=None)
                z1, s1 = self.rnnlayer(x, sequence_length=tl.layers.retrieve_seq_length_op3(x))
                z2, s2 = self.rnnlayer(x, sequence_length=tl.layers.retrieve_seq_length_op3(x), initial_state=s1)
                print(z0)
                print(z1)
                print(z2)
                print("===")
                print(s0)
                print(s1)
                print(s2)
                return z2, s2

        model = DynamicRNNExample()
        model.eval()

        output, state = model(data)
        print(output.shape)
        print(state)

    def test_dynamic_rnn_with_fake_data(self):
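        """Train a dynamic LSTMRNN on the fixture data, then save and restore its weights."""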
        class CustomisedModel(tl.models.Model):

            def __init__(self):
                super(CustomisedModel, self).__init__()
                self.rnnlayer = tl.layers.LSTMRNN(
                    units=8, dropout=0.1, in_channels=4, return_last_output=True, return_last_state=False
                )
                self.dense = tl.layers.Dense(in_channels=8, n_units=1)

            def forward(self, x):
                z = self.rnnlayer(x, sequence_length=tl.layers.retrieve_seq_length_op3(x))
                z = self.dense(z[:, :])
                return z

        rnn_model = CustomisedModel()
        print(rnn_model)
        optimizer = tf.optimizers.Adam(learning_rate=0.01)
        rnn_model.train()

        for epoch in range(50):
            with tf.GradientTape() as tape:
                pred_y = rnn_model(self.data_x)
                loss = tl.cost.mean_squared_error(pred_y, self.data_y)

            gradients = tape.gradient(loss, rnn_model.trainable_weights)
            optimizer.apply_gradients(zip(gradients, rnn_model.trainable_weights))

            if (epoch + 1) % 10 == 0:
                print("epoch %d, loss %f" % (epoch, loss))

        filename = "dynamic_rnn.h5"
        rnn_model.save_weights(filename)

        # Testing saving and restoring of RNN weights
        rnn_model2 = CustomisedModel()
        rnn_model2.eval()
        pred_y = rnn_model2(self.data_x)
        loss = tl.cost.mean_squared_error(pred_y, self.data_y)
        print("MODEL INIT loss %f" % (loss))

        rnn_model2.load_weights(filename)
        pred_y = rnn_model2(self.data_x)
        loss = tl.cost.mean_squared_error(pred_y, self.data_y)
        print("MODEL RESTORE W loss %f" % (loss))

        os.remove(filename)


if __name__ == '__main__':
    unittest.main()