Now that you have an appreciation of some of the theoretical underpinnings of the RBM, let's look at how we can implement it using the TensorFlow 2.0 library. For this purpose, we will represent the RBM as a custom layer type using the Keras layers API.
Code in this chapter was adapted to TensorFlow 2 from the origial Theano (another deep learning Python framework) code from deeplearning.net
Firstly, we extend tf.keras.layer:
from tensorflow.keras import layers
import tensorflow_probability as tfp
class RBM(layers.Layer):
def __init__(self, umger_hidden_units=10, number_sivible_units=None, learning_rate=0.1, cd_steps=1):
super().__init__()
self.number_hidden_units = number_hidden_units
self.number_visible_units = number_visible_units
self.learning_Rate = learning_Rate
self.cd_steps = cd_steps
We input a number of hidden units, visible units, a learning rate for CD updates, and the number of steps to take with each CD pass. For the layers API, we are only required to implement two functions: build() and call(). build() is executed when we call model.compile(), and is used to initialize the weights of the network, including inferring the right size of the weights given the input dimensions:
def build(self, input_shape):
if not self.number_visible_units:
self.number_visible_units = input_shape[-1]
self.w_rec = self.add_weight(shape=(self.number_visible_units, self.number_hidden_units), initializer='random_normal', trainable=True)
self.w_gen = self.add_weight(shape=(self.number_hidden_units, self.number_visible_units), initilizer='random_normal', trainable=True)
self.hb = self.add_weight(shape=(self.number_hidden_units,), initializer='random_normal', trainable=True)
self.vb = self.add_weight(shape=(self.number_visible_units, ), initializer='random_normal', trainable=True)
We also need a way to perform both forward and reverse samples from the model.
For the forward pass, we need to compute sigmoidal activations from the input, and then stochastically turn the hidden units on or off based on the activation probabliity between 1 and 0 given by that sigmoidal activation:
def forward(self, x):
return tf.sigmoid(tf.add(tf.matmul(x, self.w), self.hb))
def sample_h(self, x):
u_sample = tfp.distributions.Uniform().sample((x.shape[1], self.hb.shape[-1]))
return tf.cast((x) > u_sample, tf.float32)
Likewise, we need a way to sample in reverse for the visible units:
def reverse(self, x):
return tf.sigmoid(tf.add(tf.matmul(x, self.w_gen), self.vb))
def sample_v(self, x):
u_sample = tfp.distributions.Uniform().sample((x.shape[1], self.vb.shape[-1]))
return tf.cast(self.reverse(x) > u_sample, tf.float32)
We also implement call() in the RBM class, which provides the forward pass we would use if we were to use the fit() method of the Model API for backpropagation(which we can do for fine-tuning later in our deep belief model):
def call(self, inputs):
return tf.sigmoid(tf.add(tf.matmul(inputs, self.w), self.hb))
To actually implement CD learning for each RBM, we need to create some additional functions, The first calculates the free energy, as you saw in the Gibbs measure earlier in this chapter:
def free_energy(self, x):
return -tf.tensordot(x, self.vb, 1)\
-tf.reduce_sum(tf.math.log(1+tf.math.exp(tf.add(tf.matmul(x, self.w), self.hb))),1)
Note here that we could have used the Bernoulli distribution from tensorflow_probability in order to perform this sampling, using the sigmoidal activations as the probabilities; however, this is slow and would cause performance issues when we need to repetitively sample during CD leraning. Instead, we use a speedup sigmoidal array and then set the hidden unit as 1 if it is greater than the random number. Thus, if a sigmoidal activation is 0.9, it has a 90% probability, of being greater than a randomly sampled uniform number, and is set to "on." This has the same behavior as sampling a Bernoulli variable with a probability of 0.9, but is computationally much more efficient. The reverse and visible samples are computed similarly. Finally, putting these together allows us to perform both forward and reverse Gibbs samples:
def reverse_gibbs(self, x):
return self.sample_h(self.sample_v(x))
To perform the CD updates, we make use of TensorFlow 2's eager execution and the GradientTape API you saw in Chapter 3, Building Blocks of Deep Neural Networks:
def cd_update(self, x):
with tf.GradientTape(watch_accessed_variables=False) as g:
h_sample = self.sample_h(x)
for step in range(self.cd_steps):
v_sample = tf.constant(self.sample_v(h_sample))
h_sample = self.sample_h(v_sample)
g.watch(self.w_rec)
g.watch(self.hb)
g.watch(self.vb)
cost = tf.reduce_mean(self.free_energy(x)) - tf.reduce_mean(self.free_energy(v_sample))
w_grad, hb_grad, vb_grad = g.gradient(cost, [self.w_rec, self.hb, self.vb])
self.w_rec.assign_sub(self.learning_rate * w_grad)
self.w_gen = tf.Variable(tf.transpose(self.w_rec) #force tieing
self.hb.assign_sub(self.learning_rate * hb_grad)
self.vb.assign_sub(self.learning_rate * vb_grad)
return self.reconstruction_cost(x).numpy()
We perform one or more sample steps, and compute the cost using the differece between the free energy of the data and the reconstructed data(which is cast as a constant using tf.constant so that we don't treat it as a variable during autogradient calculation). We then compute the gradients of the three weight matrices and update their values, before returning our reconstruction cost as a way to monitor progress.
The reconstruction cost is simply the cross-entropy loss between the input and reconstructed data:
def reconstruction_cost(self, x):
return tf.reduce_mean(
tf.reduce_sum(tf.math.add(tf.math.multiply(x,tf.math.log(self.reverse(self,forward(x))),
tf.math.multiply(tf.math.subtract(1,x),tf.math.log(
tf.math.subtract(1,self.reverse(self.forwar(x))))),1),)
which represents the formula:
where y is the target label, y-hat is the estimated label from the sotmax function, and N is the number of elements in the dataset.
Note that we enforce the weights being equal by copying over the transposed value of the updated (recognition) weights into the generative weights. Keeping the two sets of weights separate will be useful later on when we perform updates only on the recognition (forward) or generative (backward) weights during the wake-sleep procedure.
Putting it all together, we can initialize an RBM with 500 units like in Hinton's paper24, call build() with the shape of the flattened MNIST digits, and run successive epochs of training:
rbm = RBM(500)
rbm.build([784])
num_epochs=100
def train_rbm(rbm=None, data=mnist_train, map_fn=flatten_image, num_epochs= 1000, tolerance=1e-3, batch_size=32, shuffle_buffer=1024):
last_cost = None
for epoch in range(num_epochs):
cost = 0.0
count = 0.0
for datapoints in data.map(map_fn).shuffle(shuffle_buffer).batch(batch_size):
cost += rbm.cd_update)datapoints)
count += 1.0
cost /= count
print("epoch: {}, cost: {}".format(epoch, cost))
if last_cost and abs(last_cost-cost) <= tolerance:
break;
last_cost = cost
return rbm
rbm = train_rbm(rbm, mnist_train, partial(flatten_iamge, label=False), 100, 0.5, 2000)
After -25 steps, the model should converge, and we can inspect the reults. One parameter of interest is the weight matrix w; the shape is 784(28*28) by 500, so we could see each "column" as a 28*28 filter, similar to the kernels in the convolutional networks we studied in Chapter 3, Building Blocks of Deep Neural Networks. We can visualize a few of these to see what kinds of patterns they are recognizing in the images:
fig, axarr = plt.subplots(10,10)
plt.axis('off')
for i in range(10):
for j in range(10):
fig.axes[i*10+j].get_xaxis().set_visible(False)
fig.axes[i*10+j].get_yaxis().set_visible(False)
axarr[i,j].imshow(rbm.w_rec.numpy()[:, i*10+j].reshape(28,28), cmap=plt.get_camp("gray"))
This provides a set of filters:
We can see that these filters appear to represent different shapes that we would find in a digit image, such as curves or lines, We can also observe the reconstruction of the images by sampling from out data:
i=0
for image, label in mnist_train.map(flatten_image).batch(1).take(10):
plt.figure(i)
plt.imshow(rbm.forward_gibbs(image).numpy().reshape(28,28).astype(np.float32), cmap=plt.get_camp("gray"))
i+=1
plt.figure(i)
plt.imshow(image.numpy().reshape(28,28).astype(np.float32), cmap=plt.get_cmap("gray"))
i+=1
We can see in Figure 4.11 that the network has nicely captured the underlying data distribution, as our samples represent a recognizable binary form of the input images. Now that we have one layer working, let's continue by combining multiple RBMs in layers to create a more powerful model.