You have now seen how to create a single-layer RBM to generate images; this is the building block required to create a full-fledged DBN. Usually, for a model in TensorFlow 2, we only need to extend tf.keras.Model and define an initialization (where the layers are defined) and a call function (for the forward pass). For our DBN model, we also need a few more custom functions to define its behavior.
First, in the initialization, we need to pass a list of dictionaries that contain the parameters for our RBM layers (number_hidden_units, number_visible_units, learning_rate, cd_steps):
class DBN(tf.keras.Model):
    def __init__(self, rbm_params=None, name='deep_belief_network',
                 num_epochs=100, tolerance=1e-3, batch_size=32,
                 shuffle_buffer=1024, **kwargs):
        super().__init__(name=name, **kwargs)
        self._rbm_params = rbm_params
        self._rbm_layers = list()
        self._dense_layers = list()
        for num, rbm_param in enumerate(rbm_params):
            self._rbm_layers.append(RBM(**rbm_param))
            self._rbm_layers[-1].build([rbm_param["number_visible_units"]])
            if num < len(rbm_params)-1:
                # intermediate layers use a sigmoid activation
                self._dense_layers.append(
                    tf.keras.layers.Dense(rbm_param["number_hidden_units"], activation=tf.nn.sigmoid))
            else:
                # the final layer uses a softmax for classification
                self._dense_layers.append(
                    tf.keras.layers.Dense(rbm_param["number_hidden_units"], activation=tf.nn.softmax))
            self._dense_layers[-1].build([rbm_param["number_visible_units"]])
        self._num_epochs = num_epochs
        self._tolerance = tolerance
        self._batch_size = batch_size
        self._shuffle_buffer = shuffle_buffer
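As a rough illustration of what this list of dictionaries might look like, the following sketch builds one dictionary per RBM from a list of layer sizes. Only the dictionary keys come from the text above; the helper name make_rbm_params, the layer sizes, and the default learning_rate and cd_steps values are assumptions chosen for illustration.

```python
def make_rbm_params(layer_sizes, learning_rate=0.01, cd_steps=1):
    """Build one parameter dictionary per RBM from consecutive layer sizes.

    Hypothetical helper: the default values here are illustrative only.
    """
    params = []
    for visible, hidden in zip(layer_sizes[:-1], layer_sizes[1:]):
        params.append({
            "number_visible_units": visible,
            "number_hidden_units": hidden,
            "learning_rate": learning_rate,
            "cd_steps": cd_steps,
        })
    return params


# 784 inputs (a flattened 28x28 image) and 10 outputs (one per class);
# the intermediate sizes are assumed for the sake of the example.
rbm_params = make_rbm_params([784, 500, 500, 2000, 10])

for layer in rbm_params:
    print(layer["number_visible_units"], "->", layer["number_hidden_units"])
```

A list built this way could then be passed as DBN(rbm_params=rbm_params). Chaining the sizes ensures that the visible units of each layer match the hidden units of the layer below it.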
Note that the initializer also creates a set of sigmoidal dense layers, ending in a softmax layer, which we can use for fine-tuning through backpropagation once we've trained the model using the generative procedures outlined earlier. To train the DBN, we begin a new code block to start the generative learning process for the stack of RBMs:
# pretraining:
inputs_layers = []
for num in range(len(self._rbm_layers)):
    if num == 0:
        inputs_layers.append(inputs)
    else:  # pass all data through the previous layer
        inputs_layers.append(
            inputs_layers[num-1].map(self._rbm_layers[num-1].forward))
    # train_rbm has no default arguments, so pass the settings explicitly
    self._rbm_layers[num] = self.train_rbm(
        self._rbm_layers[num], inputs_layers[num],
        num_epochs=self._num_epochs, tolerance=self._tolerance,
        batch_size=self._batch_size, shuffle_buffer=self._shuffle_buffer)
Notice that for computational efficiency, we generate the input for each layer past the first by passing every datapoint through the prior layer in a forward pass using the map() function of the Dataset API, instead of having to generate these forward samples repeatedly. While this takes more memory, it greatly reduces the computation required. Each layer in the pre-training loop calls back to the CD loop you saw before, which is now a member function of the DBN class:
def train_rbm(self, rbm, inputs, num_epochs, tolerance, batch_size, shuffle_buffer):
    last_cost = None
    for epoch in range(num_epochs):
        cost = 0.0
        count = 0.0
        # take(1) draws a single shuffled batch per epoch
        for datapoints in inputs.shuffle(shuffle_buffer).batch(batch_size).take(1):
            cost += rbm.cd_update(datapoints)
            count += 1.0
        cost /= count
        print("epoch: {}, cost: {}".format(epoch, cost))
        # stop once the cost changes by no more than the tolerance
        if last_cost is not None and abs(last_cost-cost) <= tolerance:
            break
        last_cost = cost
    return rbm
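The stopping rule in train_rbm can be seen in isolation in the sketch below, which replaces the CD update with a stand-in cost that shrinks each epoch. The function train_until_converged and the toy cost 1/(epoch+1) are assumptions made for illustration and are not part of the DBN class.

```python
def train_until_converged(cost_fn, num_epochs, tolerance):
    """Run up to num_epochs, stopping when the cost changes by <= tolerance.

    cost_fn is a hypothetical stand-in for one epoch of CD updates; it takes
    the epoch index and returns that epoch's average cost.
    """
    last_cost = None
    history = []
    for epoch in range(num_epochs):
        cost = cost_fn(epoch)
        history.append(cost)
        # compare against None explicitly so that a cost of 0.0 still counts
        if last_cost is not None and abs(last_cost - cost) <= tolerance:
            break
        last_cost = cost
    return history


# A toy cost that decays as 1/(epoch+1); successive differences shrink
# until they fall below the tolerance.
costs = train_until_converged(lambda epoch: 1.0 / (epoch + 1),
                              num_epochs=100, tolerance=1e-3)
print("stopped after {} epochs".format(len(costs)))
```

With these toy values the loop stops well before the 100-epoch limit, which is the behavior the tolerance argument is meant to provide during pre-training.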
Once we have pre-trained in a greedy manner, we can proceed to the wake-sleep step. We start with the upward pass:
# wake-sleep:
for epoch in range(self._num_epochs):
    # wake pass
    inputs_layers = []
    for num, rbm in enumerate(self._rbm_layers):
        if num == 0:
            inputs_layers.append(inputs)
        else:
            inputs_layers.append(
                inputs_layers[num-1].map(self._rbm_layers[num-1].forward))
    # update the generative weights of every layer except the last
    for num, rbm in enumerate(self._rbm_layers[:-1]):
        cost = 0.0
        count = 0.0
        for datapoints in inputs_layers[num].shuffle(
                self._shuffle_buffer).batch(self._batch_size):
            cost += self._rbm_layers[num].wake_update(datapoints)
            count += 1.0
        cost /= count
        print("epoch: {}, wake_cost: {}".format(epoch, cost))
Again, note that we gather a list of the transformed forward passes at each stage so that we have the necessary inputs for the update formula. We've now added a function, wake_update, to the RBM class, which computes updates only for the generative (downward) weights, in every layer except the last (the associative, undirected connections):
def wake_update(self, x):
    with tf.GradientTape(watch_accessed_variables=False) as g:
        h_sample = self.sample_h(x)
        for step in range(self.cd_steps):
            v_sample = self.sample_v(h_sample)
            h_sample = self.sample_h(v_sample)
        # only the generative weights and visible biases are tracked
        g.watch(self.w_gen)
        g.watch(self.vb)
        cost = tf.reduce_mean(self.free_energy(x)) - tf.reduce_mean(self.free_energy_reverse(h_sample))
    w_grad, vb_grad = g.gradient(cost, [self.w_gen, self.vb])
    self.w_gen.assign_sub(self.learning_rate * w_grad)
    self.vb.assign_sub(self.learning_rate * vb_grad)
    return self.reconstruction_cost(x).numpy()
This is almost identical to the CD update, except that we are only updating the generative weights and the visible unit bias terms. Once we compute the forward pass, we then perform a contrastive update on the associative memory in the top layer:
# top-level associative (still inside the epoch loop):
self._rbm_layers[-1] = self.train_rbm(
    self._rbm_layers[-1],
    inputs_layers[-2].map(self._rbm_layers[-2].forward),
    num_epochs=self._num_epochs,
    tolerance=self._tolerance,
    batch_size=self._batch_size,
    shuffle_buffer=self._shuffle_buffer)
We then need to compute the data for the reverse pass of the wake-sleep algorithm; we do this by again applying a mapping to the last layer input:
reverse_inputs = inputs_layers[-1].map(self._rbm_layers[-1].forward)
For the sleep pass, we need to traverse the stack of RBMs in reverse, updating only the non-associative (directed) connections. We first need to map the required input for each layer in reverse:
# build the inputs for the sleep pass, from the top layer downward
reverse_inputs_layers = []
for num, rbm in enumerate(self._rbm_layers[::-1]):
    if num == 0:
        reverse_inputs_layers.append(reverse_inputs)
    else:
        reverse_inputs_layers.append(
            reverse_inputs_layers[num-1].map(
                self._rbm_layers[len(self._rbm_layers)-num].reverse))
Then we perform a backward traversal of the layers, only updating the non-associative connections:
# sleep pass: skip num == 0, which is the top (associative) layer
for num, rbm in enumerate(self._rbm_layers[::-1]):
    if num > 0:
        cost = 0.0
        count = 0.0
        for datapoints in reverse_inputs_layers[num].shuffle(
                self._shuffle_buffer).batch(self._batch_size):
            cost += self._rbm_layers[len(self._rbm_layers)-1-num].sleep_update(datapoints)
            count += 1.0
        cost /= count
        print("epoch: {}, sleep_cost: {}".format(epoch, cost))
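The index arithmetic in the reverse traversal is easy to get wrong, so the following sketch traces it with stand-in layers that only record which transformation was applied. ToyLayer and the trace lists are hypothetical; the real code maps tf.data.Dataset objects through RBM layers instead.

```python
class ToyLayer:
    """Stand-in for an RBM that records which layer transformed the data."""

    def __init__(self, index):
        self.index = index

    def forward(self, trace):
        return trace + ["f{}".format(self.index)]

    def reverse(self, trace):
        return trace + ["r{}".format(self.index)]


layers = [ToyLayer(i) for i in range(3)]

# Upward pass: the input of layer num is the output of layer num-1.
inputs_layers = []
for num, layer in enumerate(layers):
    if num == 0:
        inputs_layers.append([])
    else:
        inputs_layers.append(layers[num - 1].forward(inputs_layers[num - 1]))

# Data for the reverse pass: the top layer applied to its own input.
reverse_inputs = layers[-1].forward(inputs_layers[-1])

# Downward pass: step num applies the reverse of layer len(layers)-num.
reverse_inputs_layers = []
for num, layer in enumerate(layers[::-1]):
    if num == 0:
        reverse_inputs_layers.append(reverse_inputs)
    else:
        reverse_inputs_layers.append(
            layers[len(layers) - num].reverse(reverse_inputs_layers[num - 1]))

# Sleep updates: step num (for num > 0) updates layer len(layers)-1-num.
sleep_targets = [(num, len(layers) - 1 - num) for num in range(1, len(layers))]

for num, trace in enumerate(reverse_inputs_layers):
    print(num, trace)
print(sleep_targets)
```

In this trace, step 1 feeds the output of the top layer's reverse pass to layer 1, and step 2 feeds the output of layer 1's reverse pass to layer 0, so each sleep update receives data at the hidden level of the layer it updates.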
Once we are satisfied with the training progress, we can tune the model further using normal backpropagation. The last step in the wake-sleep procedure is to initialize all the dense layers with the trained weights from the RBM layers:
for dense_layer, rbm_layer in zip(dbn._dense_layers, dbn._rbm_layers):
    dense_layer.set_weights([rbm_layer.w_rec.numpy(), rbm_layer.hb.numpy()])
We have included a forward pass for a neural network in the DBN class using the call() function:
def call(self, x, training):
    for dense_layer in self._dense_layers:
        x = dense_layer(x)
    return x
This can be used in the fit() call of the TensorFlow API:
dbn.compile(loss=tf.keras.losses.CategoricalCrossentropy())
dbn.fit(x=mnist_train.map(lambda x: flatten_image(x, label=True)).batch(32))
This begins to train the now pre-trained weights using backpropagation, to fine-tune the discriminative power of the model. One way to conceptually understand this fine-tuning is that the pre-training procedure guides the weights to a reasonable configuration that captures the "shape" of the data, which backpropagation can then tune for a particular classification task. Otherwise, starting from a completely random weight configuration, the parameters are too far from capturing the variation in the data to be efficiently navigated to an optimal configuration through backpropagation alone.
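The categorical cross-entropy loss used in the compile() call compares a predicted probability vector against a one-hot target, which is presumably the form in which the labels are supplied to fit(). A minimal pure-Python sketch of that computation might look like the following; the helpers one_hot and categorical_crossentropy are hypothetical and omit the numerical safeguards a library implementation would add.

```python
import math


def one_hot(label, num_classes=10):
    """Return a one-hot list with 1.0 at position `label`."""
    return [1.0 if i == label else 0.0 for i in range(num_classes)]


def categorical_crossentropy(target, probs):
    """Cross-entropy between a one-hot target and predicted probabilities."""
    return -sum(t * math.log(p) for t, p in zip(target, probs) if t > 0.0)


target = one_hot(3)

# An uninformative prediction spreads probability evenly over all classes.
uniform = [0.1] * 10

# A confident, correct prediction puts most of the mass on class 3.
confident = [0.01] * 10
confident[3] = 0.91

loss_uniform = categorical_crossentropy(target, uniform)
loss_confident = categorical_crossentropy(target, confident)
print(loss_uniform, loss_confident)
```

The loss falls as the softmax output concentrates on the correct class, which is the quantity backpropagation reduces during fine-tuning.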
You have seen how to combine multiple RBMs in layers to create a DBN, and how to run a generative learning process on the end-to-end model using the TensorFlow 2 API. In particular, we used the gradient tape to record and replay the gradients with a non-standard optimization algorithm (that is, one that is not among the default optimizers in the TensorFlow API), which allowed us to plug a custom gradient update into the TensorFlow framework.