tensorlayer3/tests/models/test_model_save_graph.py

548 lines
19 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import unittest
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import numpy as np
import tensorflow as tf
import tensorlayer as tl
from tensorlayer.layers import *
from tensorlayer.models import *
from tests.utils import CustomTestCase
def RemoveDateInConfig(config):
config["version_info"]["save_date"] = None
return config
def basic_static_model():
ni = Input((None, 24, 24, 3))
nn = Conv2d(16, (5, 5), (1, 1), padding='SAME', act=tf.nn.relu, name="conv1")(ni)
nn = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool1')(nn)
nn = Conv2d(16, (5, 5), (1, 1), padding='SAME', act=tf.nn.relu, name="conv2")(nn)
nn = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool2')(nn)
nn = Flatten(name='flatten')(nn)
nn = Dense(100, act=None, name="dense1")(nn)
M = Model(inputs=ni, outputs=nn)
return M
class Model_Save_and_Load_without_weights(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing save_graph, load_graph, without weights #####")
def test_save(self):
M1 = basic_static_model()
print("Model config = \n", M1.config)
print("Model = \n", M1)
M1.save(filepath="basic_model_without_weights.hdf5", save_weights=False)
M2 = Model.load(filepath="basic_model_without_weights.hdf5", load_weights=False)
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual(M1_config, M2_config)
def get_model(inputs_shape):
ni = Input(inputs_shape)
nn = Dropout(keep=0.8)(ni)
nn = Dense(n_units=800, act=tf.nn.relu,
in_channels=784)(nn) # in_channels is optional in this case as it can be inferred by the previous layer
nn = Dropout(keep=0.8)(nn)
nn = Dense(n_units=800, act=tf.nn.relu,
in_channels=800)(nn) # in_channels is optional in this case as it can be inferred by the previous layer
nn = Dropout(keep=0.8)(nn)
nn = Dense(n_units=10, act=tf.nn.relu,
in_channels=800)(nn) # in_channels is optional in this case as it can be inferred by the previous layer
M = Model(inputs=ni, outputs=nn)
return M
class Model_Save_with_weights(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing save_graph, after training, with weights #####")
def test_save(self):
tl.logging.set_verbosity(tl.logging.DEBUG)
X_train, y_train, X_val, y_val, X_test, y_test = tl.files.load_mnist_dataset(shape=(-1, 784))
MLP = get_model([None, 784])
print(MLP)
n_epoch = 3
batch_size = 500
train_weights = MLP.trainable_weights
optimizer = tf.optimizers.Adam(lr=0.0001)
for epoch in range(n_epoch): ## iterate the dataset n_epoch times
print("epoch = ", epoch)
for X_batch, y_batch in tl.iterate.minibatches(X_train, y_train, batch_size, shuffle=True):
MLP.train() # enable dropout
with tf.GradientTape() as tape:
## compute outputs
_logits = MLP(X_batch) # alternatively, you can use MLP(x, is_train=True) and remove MLP.train()
## compute loss and update model
_loss = tl.cost.cross_entropy(_logits, y_batch, name='train_loss')
grad = tape.gradient(_loss, train_weights)
optimizer.apply_gradients(zip(grad, train_weights))
MLP.eval()
val_loss, val_acc, n_iter = 0, 0, 0
for X_batch, y_batch in tl.iterate.minibatches(X_val, y_val, batch_size, shuffle=False):
_logits = MLP(X_batch) # is_train=False, disable dropout
val_loss += tl.cost.cross_entropy(_logits, y_batch, name='eval_loss')
val_acc += np.mean(np.equal(np.argmax(_logits, 1), y_batch))
n_iter += 1
print(" val loss: {}".format(val_loss / n_iter))
print(" val acc: {}".format(val_acc / n_iter))
MLP.save("MLP.hdf5")
class Model_Load_with_weights_and_train(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing load_graph, after training, with weights, and train again #####")
def test_save(self):
MLP = Model.load("MLP.hdf5", )
MLP.eval()
n_epoch = 3
batch_size = 500
train_weights = MLP.trainable_weights
optimizer = tf.optimizers.Adam(lr=0.0001)
X_train, y_train, X_val, y_val, X_test, y_test = tl.files.load_mnist_dataset(shape=(-1, 784))
val_loss, val_acc, n_iter = 0, 0, 0
for X_batch, y_batch in tl.iterate.minibatches(X_val, y_val, batch_size, shuffle=False):
_logits = MLP(X_batch) # is_train=False, disable dropout
val_loss += tl.cost.cross_entropy(_logits, y_batch, name='eval_loss')
val_acc += np.mean(np.equal(np.argmax(_logits, 1), y_batch))
n_iter += 1
print(" val loss: {}".format(val_loss / n_iter))
print(" val acc: {}".format(val_acc / n_iter))
assert val_acc > 0.7
for epoch in range(n_epoch): ## iterate the dataset n_epoch times
print("epoch = ", epoch)
for X_batch, y_batch in tl.iterate.minibatches(X_train, y_train, batch_size, shuffle=True):
MLP.train() # enable dropout
with tf.GradientTape() as tape:
## compute outputs
_logits = MLP(X_batch) # alternatively, you can use MLP(x, is_train=True) and remove MLP.train()
## compute loss and update model
_loss = tl.cost.cross_entropy(_logits, y_batch, name='train_loss')
grad = tape.gradient(_loss, train_weights)
optimizer.apply_gradients(zip(grad, train_weights))
MLP.save("MLP.hdf5")
def create_base_network(input_shape):
'''Base network to be shared (eq. to feature extraction).
'''
input = Input(shape=input_shape)
x = Flatten()(input)
x = Dense(128, act=tf.nn.relu)(x)
x = Dropout(0.9)(x)
x = Dense(128, act=tf.nn.relu)(x)
x = Dropout(0.9)(x)
x = Dense(128, act=tf.nn.relu)(x)
return Model(input, x)
def get_siamese_network(input_shape):
"""Create siamese network with shared base network as layer
"""
base_layer = create_base_network(input_shape).as_layer()
ni_1 = Input(input_shape)
ni_2 = Input(input_shape)
nn_1 = base_layer(ni_1)
nn_2 = base_layer(ni_2)
return Model(inputs=[ni_1, ni_2], outputs=[nn_1, nn_2])
class Reuse_ModelLayer_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing save_graph, load_graph, including ModelLayer and reuse #####")
def test_save(self):
input_shape = (None, 784)
M1 = get_siamese_network(input_shape)
print("Model config = \n", M1.config)
print("Model = \n", M1)
M1.save(filepath="siamese.hdf5", save_weights=False)
M2 = Model.load(filepath="siamese.hdf5", load_weights=False)
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual(M1_config, M2_config)
class Vgg_LayerList_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing save_graph, load_graph, including LayerList #####")
def test_save(self):
M1 = tl.models.vgg16(mode='static')
print("Model config = \n", M1.config)
print("Model = \n", M1)
M1.save(filepath="vgg.hdf5", save_weights=False)
M2 = Model.load(filepath="vgg.hdf5", load_weights=False)
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual(M1_config, M2_config)
class List_inputs_outputs_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing model with list inputs and outputs #####")
def test_list_inputs_outputs(self):
ni_1 = Input(shape=[4, 16])
ni_2 = Input(shape=[4, 32])
a_1 = Dense(80)(ni_1)
b_1 = Dense(160)(ni_2)
concat = Concat()([a_1, b_1])
a_2 = Dense(10)(concat)
b_2 = Dense(20)(concat)
M1 = Model(inputs=[ni_1, ni_2], outputs=[a_2, b_2])
print("Model config = \n", M1.config)
print("Model = \n", M1)
M1.save(filepath="list.hdf5", save_weights=False)
M2 = Model.load(filepath="list.hdf5", load_weights=False)
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual(M1_config, M2_config)
class Lambda_layer_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing lambda layer #####")
def test_lambda_layer_no_para_no_args(self):
x = tl.layers.Input([8, 3], name='input')
y = tl.layers.Lambda(lambda x: 2 * x, name='lambda')(x)
M1 = tl.models.Model(x, y)
M1.save("lambda_no_para_no_args.hdf5")
M2 = tl.models.Model.load("lambda_no_para_no_args.hdf5")
print(M1)
print(M2)
M1.eval()
M2.eval()
npInput = np.zeros((8, 3)) + 3
output1 = M1(npInput).numpy()
output2 = M1(npInput).numpy()
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual((output1 == output2).all(), True)
self.assertEqual(M1_config, M2_config)
def test_lambda_layer_no_para_with_args(self):
def customize_func(x, foo=42): # x is the inputs, foo is an argument
return foo * x
x = tl.layers.Input([8, 3], name='input')
y = tl.layers.Lambda(customize_func, fn_args={'foo': 3}, name='lambda')(x)
M1 = tl.models.Model(x, y)
M1.save("lambda_no_para_with_args.hdf5")
M2 = tl.models.Model.load("lambda_no_para_with_args.hdf5")
print(M1)
print(M2)
M1.eval()
M2.eval()
npInput = np.zeros((8, 3)) + 3
output1 = M1(npInput).numpy()
output2 = M2(npInput).numpy()
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual((output1 == output2).all(), True)
self.assertEqual((output1 == (np.zeros((8, 3)) + 9)).all(), True)
self.assertEqual(M1_config, M2_config)
def test_lambda_layer_keras_model(self):
input_shape = [100, 5]
in_2 = tl.layers.Input(input_shape, name='input')
layers = [
tf.keras.layers.Dense(10, activation=tf.nn.relu),
tf.keras.layers.Dense(5, activation=tf.nn.sigmoid),
tf.keras.layers.Dense(1, activation=tf.nn.relu)
]
perceptron = tf.keras.Sequential(layers)
# in order to compile keras model and get trainable_variables of the keras model
_ = perceptron(np.random.random(input_shape).astype(np.float32))
plambdalayer = tl.layers.Lambda(perceptron, perceptron.trainable_variables)(in_2)
M2 = tl.models.Model(inputs=in_2, outputs=plambdalayer)
M2.save('M2_keras.hdf5')
M4 = Model.load('M2_keras.hdf5')
M2.eval()
M4.eval()
npInput = np.zeros(input_shape) + 3
output2 = M2(npInput).numpy()
output4 = M4(npInput).numpy()
M2_config = RemoveDateInConfig(M2.config)
M4_config = RemoveDateInConfig(M4.config)
self.assertEqual((output2 == output4).all(), True)
self.assertEqual(M2_config, M4_config)
ori_weights = M4.all_weights
ori_val = ori_weights[1].numpy()
modify_val = np.zeros_like(ori_val) + 10
M4.all_weights[1].assign(modify_val)
M4 = Model.load('M2_keras.hdf5')
self.assertLess(np.max(np.abs(ori_val - M4.all_weights[1].numpy())), 1e-7)
def test_lambda_layer_keras_layer(self):
input_shape = [100, 5]
in_1 = tl.layers.Input(input_shape, name='input')
denselayer = tf.keras.layers.Dense(10, activation=tf.nn.relu)
# in order to compile keras model and get trainable_variables of the keras model
_ = denselayer(np.random.random(input_shape).astype(np.float32))
dlambdalayer = tl.layers.Lambda(denselayer, denselayer.trainable_variables)(in_1)
M1 = tl.models.Model(inputs=in_1, outputs=dlambdalayer)
M1.save('M1_keras.hdf5')
M3 = Model.load('M1_keras.hdf5')
M1.eval()
M3.eval()
npInput = np.zeros(input_shape) + 3
output1 = M1(npInput).numpy()
output3 = M3(npInput).numpy()
M1_config = RemoveDateInConfig(M1.config)
M3_config = RemoveDateInConfig(M3.config)
self.assertEqual((output1 == output3).all(), True)
self.assertEqual(M1_config, M3_config)
ori_weights = M3.all_weights
ori_val = ori_weights[1].numpy()
modify_val = np.zeros_like(ori_val) + 10
M3.all_weights[1].assign(modify_val)
M3 = Model.load('M1_keras.hdf5')
self.assertLess(np.max(np.abs(ori_val - M3.all_weights[1].numpy())), 1e-7)
class ElementWise_lambda_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing elementwise lambda layer #####")
def test_elementwise_no_para_with_args(self):
# z = mean + noise * tf.exp(std * 0.5) + foo
def func(noise, mean, std, foo=42):
return mean + noise * tf.exp(std * 0.5) + foo
noise = tl.layers.Input([100, 1])
mean = tl.layers.Input([100, 1])
std = tl.layers.Input([100, 1])
out = tl.layers.ElementwiseLambda(fn=func, fn_args={'foo': 84}, name='elementwiselambda')([noise, mean, std])
M1 = Model(inputs=[noise, mean, std], outputs=out)
M1.save("elementwise_npwa.hdf5")
M2 = Model.load("elementwise_npwa.hdf5")
M1.eval()
M2.eval()
ipt = [np.zeros((100, 1)) + 11, np.zeros((100, 1)) + 21, np.zeros((100, 1)) + 31]
output1 = M1(ipt).numpy()
output2 = M2(ipt).numpy()
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual((output1 == output2).all(), True)
self.assertEqual(M1_config, M2_config)
def test_elementwise_no_para_no_args(self):
# z = mean + noise * tf.exp(std * 0.5) + foo
def func(noise, mean, std, foo=42):
return mean + noise * tf.exp(std * 0.5) + foo
noise = tl.layers.Input([100, 1])
mean = tl.layers.Input([100, 1])
std = tl.layers.Input([100, 1])
out = tl.layers.ElementwiseLambda(fn=func, name='elementwiselambda')([noise, mean, std])
M1 = Model(inputs=[noise, mean, std], outputs=out)
M1.save("elementwise_npna.hdf5")
M2 = Model.load("elementwise_npna.hdf5")
M1.eval()
M2.eval()
ipt = [np.zeros((100, 1)) + 11, np.zeros((100, 1)) + 21, np.zeros((100, 1)) + 31]
output1 = M1(ipt).numpy()
output2 = M2(ipt).numpy()
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual((output1 == output2).all(), True)
self.assertEqual(M1_config, M2_config)
def test_elementwise_lambda_func(self):
# z = mean + noise * tf.exp(std * 0.5)
noise = tl.layers.Input([100, 1])
mean = tl.layers.Input([100, 1])
std = tl.layers.Input([100, 1])
out = tl.layers.ElementwiseLambda(fn=lambda x, y, z: x + y * tf.exp(z * 0.5),
name='elementwiselambda')([noise, mean, std])
M1 = Model(inputs=[noise, mean, std], outputs=out)
M1.save("elementwise_lambda.hdf5")
M2 = Model.load("elementwise_lambda.hdf5")
M1.eval()
M2.eval()
ipt = [
(np.zeros((100, 1)) + 11).astype(np.float32), (np.zeros((100, 1)) + 21).astype(np.float32),
(np.zeros((100, 1)) + 31).astype(np.float32)
]
output1 = M1(ipt).numpy()
output2 = M2(ipt).numpy()
M1_config = RemoveDateInConfig(M1.config)
M2_config = RemoveDateInConfig(M2.config)
self.assertEqual((output1 == output2).all(), True)
self.assertEqual(M1_config, M2_config)
# # ElementwiseLambda does not support keras layer/model func yet
# def test_elementwise_keras_model(self):
# kerasinput1 = tf.keras.layers.Input(shape=(100, ))
# kerasinput2 = tf.keras.layers.Input(shape=(100, ))
# kerasconcate = tf.keras.layers.concatenate(inputs=[kerasinput1, kerasinput2])
# kerasmodel = tf.keras.models.Model(inputs=[kerasinput1, kerasinput2], outputs=kerasconcate)
# _ = kerasmodel([np.random.random([100,]).astype(np.float32), np.random.random([100,]).astype(np.float32)])
#
# input1 = tl.layers.Input([100, 1])
# input2 = tl.layers.Input([100, 1])
# out = tl.layers.ElementwiseLambda(fn=kerasmodel, name='elementwiselambda')([input1, input2])
# M1 = Model(inputs=[input1, input2], outputs=out)
# M1.save("elementwise_keras_model.hdf5")
# M2 = Model.load("elementwise_keras_model.hdf5")
#
# M1.eval()
# M2.eval()
# ipt = [np.zeros((100, 1)) + 11, np.zeros((100, 1)) + 21, np.zeros((100, 1)) + 31]
# output1 = M1(ipt).numpy()
# output2 = M2(ipt).numpy()
#
# M1_config = RemoveDateInConfig(M1.config)
# M2_config = RemoveDateInConfig(M2.config)
#
# self.assertEqual((output1 == output2).all(), True)
# self.assertEqual(M1_config, M2_config)
class basic_dynamic_model(Model):
def __init__(self):
super(basic_dynamic_model, self).__init__()
self.conv1 = Conv2d(16, (5, 5), (1, 1), padding='SAME', act=tf.nn.relu, in_channels=3, name="conv1")
self.pool1 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool1')
self.conv2 = Conv2d(16, (5, 5), (1, 1), padding='SAME', act=tf.nn.relu, in_channels=16, name="conv2")
self.pool2 = MaxPool2d((3, 3), (2, 2), padding='SAME', name='pool2')
self.flatten = Flatten(name='flatten')
self.dense1 = Dense(100, act=None, in_channels=576, name="dense1")
self.dense2 = Dense(10, act=None, in_channels=100, name="dense2")
def forward(self, x):
x = self.conv1(x)
x = self.pool1(x)
x = self.conv2(x)
x = self.pool2(x)
x = self.flatten(x)
x = self.dense1(x)
x = self.dense2(x)
return x
class Dynamic_config_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing exception in dynamic mode #####")
def test_dynamic_config(self):
M1 = basic_dynamic_model()
print(M1.config)
for layer in M1.all_layers:
print(layer.config)
class Exception_test(CustomTestCase):
@classmethod
def setUpClass(cls):
print("##### begin testing exception in dynamic mode #####")
def test_exception(self):
M1 = basic_dynamic_model()
try:
M1.save("dynamic.hdf5", save_weights=False)
except Exception as e:
self.assertIsInstance(e, RuntimeError)
print(e)
M2 = basic_static_model()
M2.save("basic_static_mode.hdf5", save_weights=False)
try:
M3 = Model.load("basic_static_mode.hdf5")
except Exception as e:
self.assertIsInstance(e, RuntimeError)
print(e)
if __name__ == '__main__':
tl.logging.set_verbosity(tl.logging.DEBUG)
unittest.main()