# Copyright 2019 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Tests for lstm_object_detection.lstm.rnn_decoder.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import numpy as np import tensorflow.compat.v1 as tf from tensorflow.contrib import layers as contrib_layers from tensorflow.contrib import rnn as contrib_rnn from lstm_object_detection.lstm import rnn_decoder class MockRnnCell(contrib_rnn.RNNCell): def __init__(self, input_size, num_units): self._input_size = input_size self._num_units = num_units self._filter_size = [3, 3] def __call__(self, inputs, state_tuple): outputs = tf.concat([inputs, state_tuple[0]], axis=3) new_state_tuple = (tf.multiply(state_tuple[0], 2), state_tuple[1]) return outputs, new_state_tuple def state_size(self): return self._num_units def output_size(self): return self._input_size + self._num_units def pre_bottleneck(self, inputs, state, input_index): with tf.variable_scope('bottleneck_%d' % input_index, reuse=tf.AUTO_REUSE): inputs = contrib_layers.separable_conv2d( tf.concat([inputs, state], 3), self._input_size, self._filter_size, depth_multiplier=1, activation_fn=tf.nn.relu6, normalizer_fn=None) return inputs class RnnDecoderTest(tf.test.TestCase): def test_rnn_decoder_single_unroll(self): batch_size = 2 num_unroll = 1 num_units = 64 width = 8 height = 10 input_channels = 128 initial_state = tf.random_normal((batch_size, width, height, num_units)) inputs = tf.random_normal([batch_size, width, height, input_channels]) rnn_cell = MockRnnCell(input_channels, num_units) outputs, states = rnn_decoder.rnn_decoder( decoder_inputs=[inputs] * num_unroll, initial_state=(initial_state, initial_state), cell=rnn_cell) self.assertEqual(len(outputs), num_unroll) self.assertEqual(len(states), num_unroll) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) results = sess.run((outputs, states, inputs, initial_state)) outputs_results = results[0] states_results = results[1] inputs_results = results[2] initial_states_results = results[3] self.assertEqual(outputs_results[0].shape, (batch_size, width, height, input_channels + num_units)) self.assertAllEqual( outputs_results[0], np.concatenate((inputs_results, initial_states_results), axis=3)) self.assertEqual(states_results[0][0].shape, (batch_size, width, height, num_units)) self.assertEqual(states_results[0][1].shape, (batch_size, width, height, num_units)) self.assertAllEqual(states_results[0][0], np.multiply(initial_states_results, 2.0)) self.assertAllEqual(states_results[0][1], initial_states_results) def test_rnn_decoder_multiple_unroll(self): batch_size = 2 num_unroll = 3 num_units = 64 width = 8 height = 10 input_channels = 128 initial_state = tf.random_normal((batch_size, width, height, num_units)) inputs = tf.random_normal([batch_size, width, height, input_channels]) rnn_cell = MockRnnCell(input_channels, num_units) outputs, states = rnn_decoder.rnn_decoder( decoder_inputs=[inputs] * num_unroll, initial_state=(initial_state, initial_state), cell=rnn_cell) self.assertEqual(len(outputs), num_unroll) self.assertEqual(len(states), num_unroll) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) results = sess.run((outputs, states, inputs, initial_state)) outputs_results = results[0] states_results = results[1] inputs_results = results[2] initial_states_results = results[3] for i in range(num_unroll): previous_state = ([initial_states_results, initial_states_results] if i == 0 else states_results[i - 1]) self.assertEqual( outputs_results[i].shape, (batch_size, width, height, input_channels + num_units)) self.assertAllEqual( outputs_results[i], np.concatenate((inputs_results, previous_state[0]), axis=3)) self.assertEqual(states_results[i][0].shape, (batch_size, width, height, num_units)) self.assertEqual(states_results[i][1].shape, (batch_size, width, height, num_units)) self.assertAllEqual(states_results[i][0], np.multiply(previous_state[0], 2.0)) self.assertAllEqual(states_results[i][1], previous_state[1]) class MultiInputRnnDecoderTest(tf.test.TestCase): def test_rnn_decoder_single_unroll(self): batch_size = 2 num_unroll = 1 num_units = 12 width = 8 height = 10 input_channels_large = 24 input_channels_small = 12 bottleneck_channels = 20 initial_state_c = tf.random_normal((batch_size, width, height, num_units)) initial_state_h = tf.random_normal((batch_size, width, height, num_units)) initial_state = (initial_state_c, initial_state_h) inputs_large = tf.random_normal( [batch_size, width, height, input_channels_large]) inputs_small = tf.random_normal( [batch_size, width, height, input_channels_small]) rnn_cell = MockRnnCell(bottleneck_channels, num_units) outputs, states = rnn_decoder.multi_input_rnn_decoder( decoder_inputs=[[inputs_large] * num_unroll, [inputs_small] * num_unroll], initial_state=initial_state, cell=rnn_cell, sequence_step=tf.zeros([batch_size]), pre_bottleneck=True) self.assertEqual(len(outputs), num_unroll) self.assertEqual(len(states), num_unroll) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) results = sess.run( (outputs, states, inputs_large, inputs_small, initial_state)) outputs_results = results[0] states_results = results[1] initial_states_results = results[4] self.assertEqual( outputs_results[0].shape, (batch_size, width, height, bottleneck_channels + num_units)) self.assertEqual(states_results[0][0].shape, (batch_size, width, height, num_units)) self.assertEqual(states_results[0][1].shape, (batch_size, width, height, num_units)) # The first step should always update state. self.assertAllEqual(states_results[0][0], np.multiply(initial_states_results[0], 2)) self.assertAllEqual(states_results[0][1], initial_states_results[1]) def test_rnn_decoder_multiple_unroll(self): batch_size = 2 num_unroll = 3 num_units = 12 width = 8 height = 10 input_channels_large = 24 input_channels_small = 12 bottleneck_channels = 20 initial_state_c = tf.random_normal((batch_size, width, height, num_units)) initial_state_h = tf.random_normal((batch_size, width, height, num_units)) initial_state = (initial_state_c, initial_state_h) inputs_large = tf.random_normal( [batch_size, width, height, input_channels_large]) inputs_small = tf.random_normal( [batch_size, width, height, input_channels_small]) rnn_cell = MockRnnCell(bottleneck_channels, num_units) outputs, states = rnn_decoder.multi_input_rnn_decoder( decoder_inputs=[[inputs_large] * num_unroll, [inputs_small] * num_unroll], initial_state=initial_state, cell=rnn_cell, sequence_step=tf.zeros([batch_size]), pre_bottleneck=True) self.assertEqual(len(outputs), num_unroll) self.assertEqual(len(states), num_unroll) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) results = sess.run( (outputs, states, inputs_large, inputs_small, initial_state)) outputs_results = results[0] states_results = results[1] initial_states_results = results[4] # The first step should always update state. self.assertAllEqual(states_results[0][0], np.multiply(initial_states_results[0], 2)) self.assertAllEqual(states_results[0][1], initial_states_results[1]) for i in range(num_unroll): self.assertEqual( outputs_results[i].shape, (batch_size, width, height, bottleneck_channels + num_units)) self.assertEqual(states_results[i][0].shape, (batch_size, width, height, num_units)) self.assertEqual(states_results[i][1].shape, (batch_size, width, height, num_units)) def test_rnn_decoder_multiple_unroll_with_skip(self): batch_size = 2 num_unroll = 5 num_units = 12 width = 8 height = 10 input_channels_large = 24 input_channels_small = 12 bottleneck_channels = 20 skip = 2 initial_state_c = tf.random_normal((batch_size, width, height, num_units)) initial_state_h = tf.random_normal((batch_size, width, height, num_units)) initial_state = (initial_state_c, initial_state_h) inputs_large = tf.random_normal( [batch_size, width, height, input_channels_large]) inputs_small = tf.random_normal( [batch_size, width, height, input_channels_small]) rnn_cell = MockRnnCell(bottleneck_channels, num_units) outputs, states = rnn_decoder.multi_input_rnn_decoder( decoder_inputs=[[inputs_large] * num_unroll, [inputs_small] * num_unroll], initial_state=initial_state, cell=rnn_cell, sequence_step=tf.zeros([batch_size]), pre_bottleneck=True, selection_strategy='SKIP%d' % skip) self.assertEqual(len(outputs), num_unroll) self.assertEqual(len(states), num_unroll) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) results = sess.run( (outputs, states, inputs_large, inputs_small, initial_state)) outputs_results = results[0] states_results = results[1] initial_states_results = results[4] for i in range(num_unroll): self.assertEqual( outputs_results[i].shape, (batch_size, width, height, bottleneck_channels + num_units)) self.assertEqual(states_results[i][0].shape, (batch_size, width, height, num_units)) self.assertEqual(states_results[i][1].shape, (batch_size, width, height, num_units)) previous_state = ( initial_states_results if i == 0 else states_results[i - 1]) # State only updates during key frames if i % (skip + 1) == 0: self.assertAllEqual(states_results[i][0], np.multiply(previous_state[0], 2)) self.assertAllEqual(states_results[i][1], previous_state[1]) else: self.assertAllEqual(states_results[i][0], previous_state[0]) self.assertAllEqual(states_results[i][1], previous_state[1]) if __name__ == '__main__': tf.test.main()