Now that we've downloaded the CIFAR-10 dataset, split it into test and training data, and reshaped and rescaled it, we are ready to start building our VAE model. We'll use the same Model API from the Keras module in TensorFlow 2. The TensorFlow documentation contains an example of how to implement a VAE using convolutional networks (https://www.tensorflow.org/tutorials/generative/cvae), and we'll build on this code example; however, for our purposes, we will implement a simpler VAE network using MLP layers based on the original VAE paper, Auto-Encoding Variational Bayes, and show how we adapt the TensorFlow example to also allow for IAF modules in decoding.
In the original article, the authors propose two kinds of models for use in the VAE, both MLP feedforward networks: Gaussian and Bernoulli, with these names reflecting the probability distribution functions used in the MLP network outputs in their final layers. The Bernoulli MLP can be used as the decoder of the network, generating the simulated image x from the latent vector z. The formula for the Bernoulli MLP is:
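In the notation of Appendix C.1 of Auto-Encoding Variational Bayes, it can be written as follows. The symbols are taken from the paper rather than from the code in this chapter: $f_\sigma$ is the element-wise sigmoid, $D$ is the number of pixels, and $W_1, W_2, b_1, b_2$ are the weights and biases of the two layers.

```latex
\begin{aligned}
\log p(x \mid z) &= \sum_{i=1}^{D} x_i \log y_i + (1 - x_i) \log (1 - y_i) \\
\text{where } y &= f_\sigma\left(W_2 \tanh(W_1 z + b_1) + b_2\right)
\end{aligned}
```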
Where the first line is the cross-entropy function we use to determine if the network generates an approximation of the original image in reconstruction, while y is a feedforward network with two layers: a tanh transformation followed by a sigmoidal function to scale the output between 0 and 1. Recall that this scaling is why we had to normalize the CIFAR-10 pixels from their original values.
We can easily create this Bernoulli MLP network using the Keras API:
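class BernoulliMLP(tf.keras.Model):
    def __init__(self, input_shape, name='BernoulliMLP', hidden_dim=10, latent_dim=10, **kwargs):
        super().__init__(name=name, **kwargs)
        # Hidden layer: tanh transformation of the input
        self._h = tf.keras.layers.Dense(hidden_dim, activation='tanh')
        # Output layer: sigmoid scales each output to the range 0 to 1
        self._y = tf.keras.layers.Dense(latent_dim, activation='sigmoid')

    def call(self, x):
        # Second and third outputs are None so the signature matches GaussianMLP
        return self._y(self._h(x)), None, None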
We just need to specify the dimensions of the single hidden layer and the latent output (z). We then specify the forward pass as a composition of these two layers. Note that in the output, we've returned three values, with the second two set as None. This is because in our end model, we could use either the BernoulliMLP or the GaussianMLP as the decoder. If we used the GaussianMLP, we would return three values, as we will see below; the example in this chapter utilizes a binary output and cross-entropy loss, so we can use just the single output, but we want the return signatures for the two decoders to match.
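To make the two-layer structure concrete without TensorFlow, here is a minimal pure-Python sketch of the same forward pass. The helper names `dense` and `bernoulli_mlp_forward`, the random weights, and the toy layer sizes are all illustrative assumptions rather than part of the chapter's code; the sketch only shows that a tanh layer followed by a sigmoid layer yields values strictly between 0 and 1, and that the result is padded to three return values.

```python
import math
import random

def dense(x, weights, bias, activation):
    """Apply one fully connected layer: activation(W x + b)."""
    return [activation(sum(w * xi for w, xi in zip(row, x)) + b)
            for row, b in zip(weights, bias)]

def sigmoid(a):
    return 1.0 / (1.0 + math.exp(-a))

def bernoulli_mlp_forward(z, w1, b1, w2, b2):
    """Hypothetical stand-in for BernoulliMLP.call: tanh hidden layer, sigmoid output."""
    h = dense(z, w1, b1, math.tanh)
    y = dense(h, w2, b2, sigmoid)
    return y, None, None  # padded so the signature matches the Gaussian MLP

rng = random.Random(0)
latent_dim, hidden_dim, output_dim = 3, 4, 5  # toy sizes chosen for illustration
w1 = [[rng.gauss(0, 1) for _ in range(latent_dim)] for _ in range(hidden_dim)]
b1 = [0.0] * hidden_dim
w2 = [[rng.gauss(0, 1) for _ in range(hidden_dim)] for _ in range(output_dim)]
b2 = [0.0] * output_dim

y, second, third = bernoulli_mlp_forward([0.5, -1.0, 2.0], w1, b1, w2, b2)
```

Every element of `y` can be read as a per-pixel Bernoulli probability, which is why the training pixels had to be rescaled to the same range.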
The second network type proposed by the authors in the original VAE paper was a Gaussian MLP, whose formulas are:
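In the notation of Appendix C.2 of Auto-Encoding Variational Bayes, they can be written as follows. The symbols come from the paper rather than from the code in this chapter: $W_3, W_4, W_5$ and $b_3, b_4, b_5$ are the weights and biases of the hidden layer and the two output layers.

```latex
\begin{aligned}
\log p(x \mid z) &= \log \mathcal{N}(x; \mu, \sigma^2 I) \\
\text{where } \mu &= W_4 h + b_4 \\
\log \sigma^2 &= W_5 h + b_5 \\
h &= \tanh(W_3 z + b_3)
\end{aligned}
```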
This network can be used as either the encoder (generating the latent vector z) or the decoder (generating the simulated image x) in the network. The equations above assume that it is used as the decoder, and for the encoder we just switch the x and z variables. As you can see, this network has two types of layers: a hidden layer given by a tanh transformation of the input, and two output layers, each given by a linear transformation of the hidden layer, which are used as the inputs of a Gaussian (normal) log-likelihood function. Like the Bernoulli MLP, we can easily implement this simple network using the TensorFlow Keras API:
class GaussianMLP(tf.keras.Model):
    def __init__(self, input_shape, name='GaussianMLP', hidden_dim=10, latent_dim=10, iaf=False, **kwargs):
        super().__init__(name=name, **kwargs)
        self._h = tf.keras.layers.Dense(hidden_dim, activation='tanh')
        # Two linear output layers: the mean and log variance of the Gaussian
        self._mean = tf.keras.layers.Dense(latent_dim)
        self._logvar = tf.keras.layers.Dense(latent_dim)
        # Optional third output h, only needed when the network feeds an IAF
        self._iaf_output = None
        if iaf:
            self._iaf_output = tf.keras.layers.Dense(latent_dim)

    def call(self, x):
        h = self._h(x)
        if self._iaf_output is not None:
            return self._mean(h), self._logvar(h), self._iaf_output(h)
        return self._mean(h), self._logvar(h), None
As you can see, to implement the call function, we must return the two outputs of the model (the mean and log variance of the normal distribution we'll use to compute the likelihood of x or z). However, recall that for the IAF model, the encoder has to have an additional output h, which is fed into each step of the normalizing flow:
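Following Algorithm 1 of the IAF paper, and using that paper's symbol names rather than the variable names in this chapter's code, the encoder output and the input to flow step t can be written as:

```latex
\begin{aligned}
\left[\mu, \sigma, h\right] &\leftarrow \mathrm{EncoderNN}(x; \theta) \\
\left[m, s\right] &\leftarrow \mathrm{AutoregressiveNN}[t](z, h; \theta)
\end{aligned}
```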
To allow for this additional output, we include a third variable in the output, which is set to a linear transformation of the hidden layer if we set the iaf option to True, and is None if it is False, so we can use the GaussianMLP as an encoder in networks both with and without IAF.
Now that we have both of our subnetworks defined, let's see how we can use them to construct a complete VAE network. Like the subnetworks, we can define the VAE using the Keras API:
class VAE(tf.keras.Model):
    def __init__(self, input_shape, name='variational_autoencoder', latent_dim=10, hidden_dim=10, encoder='GaussianMLP', decoder='BernoulliMLP', iaf_model=None, number_iaf_networks=0, iaf_params=None, num_samples=100, **kwargs):
        super().__init__(name=name, **kwargs)
        self._latent_dim = latent_dim
        self._num_samples = num_samples
        self._iaf = []
        if encoder == 'GaussianMLP':
            self._encoder = GaussianMLP(input_shape=input_shape, latent_dim=latent_dim, iaf=(iaf_model is not None), hidden_dim=hidden_dim)
        else:
            raise ValueError("Unknown encoder type: {}".format(encoder))
        # The decoder maps z back to pixel space, so its output size is input_shape[1]
        if decoder == 'BernoulliMLP':
            self._decoder = BernoulliMLP(input_shape=(1, latent_dim), latent_dim=input_shape[1], hidden_dim=hidden_dim)
        elif decoder == 'GaussianMLP':
            self._decoder = GaussianMLP(input_shape=(1, latent_dim), latent_dim=input_shape[1], iaf=(iaf_model is not None), hidden_dim=hidden_dim)
        else:
            raise ValueError("Unknown decoder type: {}".format(decoder))
        # Stack of IAF networks; each takes the concatenation of z and h as input
        if iaf_model:
            for t in range(number_iaf_networks):
                self._iaf.append(iaf_model(input_shape=(1, latent_dim*2), **(iaf_params or {})))
As you can see, this model is defined to contain both an encoder and a decoder network. Additionally, we allow the user to specify whether we are implementing IAF as part of the model, in which case we need a stack of autoregressive transforms specified by the iaf_params variable. Because this IAF network needs to take both z and h as inputs, the input shape is twice the size of the latent_dim (z). We allow the decoder to be either the GaussianMLP or the BernoulliMLP network, while the encoder is the GaussianMLP.
There are a few other functions of this model class that we need to cover; the first are simply the encoding and decoding functions of the VAE model class:
    def encode(self, x):
        return self._encoder.call(x)

    def decode(self, z, apply_sigmoid=False):
        logits, _, _ = self._decoder.call(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits
For the encoder, we simply call (run the forward pass for) the encoder network. To decode, you will notice that we specify three outputs. The article that introduced VAE models, Auto-Encoding Variational Bayes, provided examples of a decoder specified as either a Gaussian Multilayer Perceptron (MLP) or a Bernoulli output. If we used a Gaussian MLP, the decoder would yield the value, mean, and standard deviation vectors for the output, and we would need to transform that output to a probability (0 to 1) using the sigmoidal transform. In the Bernoulli case, the output is already in the range 0 to 1, and we don't need this transformation (apply_sigmoid=False).
Once we've trained the VAE network, we'll want to use sampling in order to generate random latent vectors (z) and run the decoder to generate new images. While we could just run this as a normal function of the class in the Python runtime, we'll decorate this function with the @tf.function annotation, which will allow it to be executed in the TensorFlow graph runtime (just like any of the tf functions, such as reduce_sum and multiply), making use of GPU and TPU devices if they are available. We sample a value from a random normal distribution, for a specified number of samples, and then apply the decoder to generate new images:
    @tf.function
    def sample(self, eps=None):
        # Draw num_samples latent vectors from a standard normal if none are given
        if eps is None:
            eps = tf.random.normal(shape=(self._num_samples, self._latent_dim))
        return self.decode(eps, apply_sigmoid=False)
Finally, recall that the "reparameterization trick" is used to allow us to backpropagate through the value of z and reduce the variance of the likelihood of z. We need to implement this transformation, which is given by:
    def reparameterize(self, mean, logvar):
        eps = tf.random.normal(shape=mean.shape)
        # exp(0.5 * logvar) is the standard deviation
        return eps * tf.exp(logvar * .5) + mean
In the original paper, Auto-Encoding Variational Bayes, this is given by:
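Using the paper's notation for a Gaussian posterior, with $\odot$ denoting element-wise multiplication, the transformation can be written as:

```latex
z^{(i,l)} = \mu^{(i)} + \sigma^{(i)} \odot \epsilon^{(l)}, \qquad \epsilon^{(l)} \sim \mathcal{N}(0, I)
```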
where i is a data point in x and l is a sample from the random distribution, here, a normal. In our code, we multiply by 0.5 because we are computing the log variance (the log of the standard deviation squared), and log(s^2) = 2 log(s), so the 0.5 cancels the 2, leaving us with exp(log(s)) = s, just as we require in the formula.
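The cancellation is easy to check numerically. The following is a small stand-alone sketch using only the Python standard library; the values of `sigma`, `mean`, and `eps` are arbitrary and chosen purely for illustration.

```python
import math

sigma = 1.7                      # an arbitrary standard deviation, chosen for illustration
logvar = math.log(sigma ** 2)    # what the encoder's log-variance output represents

# log(sigma^2) = 2 * log(sigma), so halving the log variance gives log(sigma)
recovered_sigma = math.exp(0.5 * logvar)

# One reparameterized draw: z = mean + sigma * eps, with eps fixed here for clarity
mean, eps = 0.3, -1.2
z = eps * math.exp(0.5 * logvar) + mean
```

Here `recovered_sigma` matches `sigma` up to floating-point error, and `z` equals `mean + sigma * eps`.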
We'll also include a class property (with the @property decorator) so we can access the array of normalizing transforms if we implement IAF:
    @property
    def iaf(self):
        return self._iaf
Now, we'll need a few additional functions to actually run our VAE algorithm. The first computes the log of the normal probability density function (pdf), used in the computation of the variational lower bound, or ELBO:
def log_normal_pdf(sample, mean, logvar, raxis=1):
    log2pi = tf.math.log(2. * np.pi)
    return tf.reduce_sum(
        -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
        axis=raxis)
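As a sanity check on the log-variance form of this density, the sketch below compares it against the textbook Gaussian pdf for a single scalar, using only the standard library. The function names `log_normal_pdf_scalar` and `normal_pdf` and the numeric values are assumptions made for illustration; they are not part of the chapter's code.

```python
import math

def log_normal_pdf_scalar(sample, mean, logvar):
    """Log density of N(mean, exp(logvar)) at `sample`, in the log-variance form."""
    log2pi = math.log(2.0 * math.pi)
    return -0.5 * ((sample - mean) ** 2 * math.exp(-logvar) + logvar + log2pi)

def normal_pdf(sample, mean, sigma):
    """Textbook Gaussian density, used only as a cross-check."""
    return math.exp(-0.5 * ((sample - mean) / sigma) ** 2) / (sigma * math.sqrt(2.0 * math.pi))

sample, mean, sigma = 0.8, 0.1, 1.5   # illustrative values
log_density = log_normal_pdf_scalar(sample, mean, math.log(sigma ** 2))
direct = math.log(normal_pdf(sample, mean, sigma))

# For a vector, the per-dimension terms are summed, as tf.reduce_sum does
vector_log_density = sum(log_normal_pdf_scalar(s, 0.0, 0.0) for s in [0.0, 0.0])
```

The two routes agree, and passing a mean and log variance of 0 evaluates the density under a standard normal, which is how the prior term is computed later.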
We now need to utilize this function as part of computing the loss with each minibatch gradient descent pass in the process of training the VAE. As with the sample method, we'll decorate this function with the @tf.function annotation so it will be executed on the graph runtime:
@tf.function
def compute_loss(model, x):
    mean, logvar, h = model.encode(x)
    z = model.reparameterize(mean, logvar)
    logqz_x = log_normal_pdf(z, mean, logvar)
    # Apply each IAF step in turn, passing h alongside the current z
    for iaf_model in model.iaf:
        mean, logvar, _ = iaf_model.call(tf.concat([z, h], 2))
        s = tf.sigmoid(logvar)
        z = tf.add(tf.math.multiply(z, s), tf.math.multiply(mean, (1 - s)))
        logqz_x -= tf.reduce_sum(tf.math.log(s))
    x_logit = model.decode(z)
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_ent, axis=[2])
    logpz = log_normal_pdf(z, 0., 0.)
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)
Let's unpack a bit of what is going on here. First, we can see that we call the encoder network on the input (a minibatch of flattened images, in our case) to generate the needed mean, log variance, and, if we are using IAF in our network, the accessory input h that we'll pass along with each step of the normalizing flow transform.
We apply the "reparameterization trick" on the inputs in order to generate the latent vector z, and apply the log normal pdf to get log q(z|x).
If we are using IAF, we need to iteratively transform z using each network, and pass in h (the accessory input) from the encoder at each step. Then we apply the loss from this transform to the initial loss we computed, as per the algorithm given in the IAF paper.
Once we have the transformed or untransformed z, we decode it using the decoder network to get the reconstructed data, x, from which we calculate a cross-entropy loss. We sum these over the pixels of each image in the minibatch and take the log normal pdf of z evaluated at a standard normal distribution (the prior), before computing the expected lower bound.
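The IAF update inside the loop can be traced by hand on a two-dimensional example. This is a minimal pure-Python sketch under stated assumptions: the function name `iaf_step` is hypothetical, and the values of `m` and `s_raw` are made up here in place of the outputs of a real network.

```python
import math

def sigmoid(a):
    return 1.0 / (1.0 + math.exp(-a))

def iaf_step(z, m, s_raw, log_q):
    """One flow step: gate each z_i between its old value and m_i, then update log q."""
    gate = [sigmoid(v) for v in s_raw]
    z_new = [g * zi + (1.0 - g) * mi for g, zi, mi in zip(gate, z, m)]
    log_q_new = log_q - sum(math.log(g) for g in gate)
    return z_new, log_q_new

z = [0.5, -1.0]          # latent sample from the reparameterization step
m = [0.2, 0.4]           # "mean" output of the flow network (made up here)
s_raw = [0.0, 2.0]       # pre-sigmoid gate output of that network (made up here)
z_new, log_q_new = iaf_step(z, m, s_raw, log_q=-3.0)
```

Because each gate lies between 0 and 1, every new coordinate is a weighted average of the old z and m, and subtracting the sum of the log gates (each of which is negative) raises log q(z|x) at every step.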
Recall that the expression for the variational lower bound, or ELBO, is:
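One standard way of writing it, using $q_\phi(z \mid x)$ for the encoder distribution, $p_\theta(x \mid z)$ for the decoder distribution, and $p(z)$ for the prior (symbol names assumed here, following the original VAE paper), is:

```latex
\mathcal{L}(\theta, \phi; x) = \mathbb{E}_{q_\phi(z \mid x)}\left[\log p_\theta(x \mid z) + \log p(z) - \log q_\phi(z \mid x)\right] \le \log p_\theta(x)
```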
So, our minibatch estimator is a sample of this value:
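With a single latent sample z drawn from the encoder for each image, the three terms correspond to logpx_z, logpz, and logqz_x in compute_loss (the symbols $\theta$ and $\phi$ for decoder and encoder parameters are assumed here, following the original VAE paper):

```latex
\tilde{\mathcal{L}}(\theta, \phi; x) = \log p_\theta(x \mid z) + \log p(z) - \log q_\phi(z \mid x), \qquad z \sim q_\phi(z \mid x)
```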
Now that we have these ingredients, we can run the stochastic gradient descent using the GradientTape API, just as we did for the DBN in Chapter 4, Teaching Networks to Generate Digits, passing in an optimizer, model, and minibatch of data (x):
@tf.function
def compute_apply_gradients(model, x, optimizer):
    # Record the forward pass so the loss can be differentiated
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
To run the training, first we need to specify a model using the class we've built. If we don't want to use IAF, we could do this as follows:
model = VAE(input_shape=(1, 3072), hidden_dim=500, latent_dim=500)
If we want to use IAF transformations, we need to include some additional arguments:
model = VAE(input_shape=(1, 3072), hidden_dim=500, latent_dim=500, iaf_model=GaussianMLP, number_iaf_networks=3, iaf_params={'latent_dim': 500, 'hidden_dim': 500, 'iaf': False})
With the model created, we need to specify a number of epochs and an optimizer (in this instance, Adam, as we described in Chapter 3, Building Blocks of Deep Neural Networks). We split our data into minibatches of 32 elements, and apply gradient updates after each minibatch for the number of epochs we've specified. At regular intervals, we output the estimate of the ELBO to verify that our model is getting better:
import time

epochs = 100
optimizer = tf.keras.optimizers.Adam(1e-4)
for epoch in range(1, epochs + 1):
    start_time = time.time()
    for train_x in cifar10_train.map(lambda x: flatten_image(x, label=False)).batch(32):
        compute_apply_gradients(model, train_x, optimizer)
    end_time = time.time()
    # Report the test-set ELBO after every epoch
    if epoch % 1 == 0:
        loss = tf.keras.metrics.Mean()
        for test_x in cifar10_test.map(lambda x: flatten_image(x, label=False)).batch(32):
            loss(compute_loss(model, test_x))
        elbo = -loss.result()
        print('Epoch: {}, Test set ELBO: {}, '
              'time elapse for current epoch {}'.format(epoch, elbo, end_time - start_time))
We can verify that the model is improving by looking at updates, which should show an increasing ELBO:
To examine the output of the model, we can first look at the reconstruction error: does the encoding of the input image by the network approximately capture the dominant patterns in the input image, allowing it to be reconstructed from its vector z? We can compare the raw image to its reconstruction formed by passing the image through the encoder, applying IAF, and then decoding it:
for sample in cifar10_train.map(lambda x: flatten_image(x, label=False)).batch(1).take(10):
    mean, logvar, h = model.encode(sample)
    z = model.reparameterize(mean, logvar)
    for iaf_model in model.iaf:
        mean, logvar, _ = iaf_model.call(tf.concat([z, h], 2))
        s = tf.sigmoid(logvar)
        z = tf.add(tf.math.multiply(z, s), tf.math.multiply(mean, (1 - s)))
    # Raw input image
    plt.figure(0)
    plt.imshow((sample.numpy().reshape(32, 32, 3)).astype(np.float32), cmap=plt.get_cmap("gray"))
    # Reconstruction decoded from z
    plt.figure(1)
    plt.imshow((model.decode(z).numpy().reshape(32, 32, 3)).astype(np.float32), cmap=plt.get_cmap("gray"))
For the first few CIFAR-10 images, we get the following output, showing that we have captured the overall pattern of the image (although it is fuzzy, a general downside to VAEs that we'll address in our discussion of Generative Adversarial Networks (GANs) in future chapters):
What if we wanted to create entirely new images? Here we can use the "sample" function we defined previously in Creating the network from TensorFlow 2 to create batches of new images from randomly generated z vectors, rather than the encoded product of CIFAR images:
# sample() draws num_samples latent vectors; display the first generated image
plt.imshow((model.sample().numpy()[0].reshape(32, 32, 3)).astype(np.float32), cmap=plt.get_cmap("gray"))
This code will produce output like the following, which shows a set of images generated from vectors of random numbers:
These are, admittedly, a bit blurry, but you can appreciate that they show structure and look comparable to some of the "reconstructed" CIFAR-10 images you saw previously. Part of the challenge here, as we'll discuss more in subsequent chapters, is the loss function itself: the cross-entropy function, in essence, penalizes each pixel for how much it differs from the input pixel. While this might be mathematically correct, it doesn't capture what we might think of as conceptual "similarity" between an input and a reconstructed image. For example, an input image could have a single pixel set to infinity, which would create a large difference between it and a reconstruction that set that pixel to 0; however, a human, looking at the image, would perceive both as being identical. The objective functions used for GANs, described in Chapter 6, Image Generation with GANs, capture this nuance more accurately.
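The sensitivity of a pixel-wise loss to a single pixel can be seen in a toy example. The sketch below uses only the standard library; the four-pixel "images", the function name `pixel_cross_entropy`, and the clipping constant are illustrative assumptions, not the chapter's training code.

```python
import math

def pixel_cross_entropy(target, prediction, eps=1e-7):
    """Per-pixel binary cross-entropy, with predictions clipped away from 0 and 1."""
    losses = []
    for t, p in zip(target, prediction):
        p = min(max(p, eps), 1.0 - eps)
        losses.append(-(t * math.log(p) + (1.0 - t) * math.log(1.0 - p)))
    return losses

original = [0.0, 0.0, 1.0, 0.0]            # a toy 4-pixel "image" with one bright pixel
close_copy = [0.01, 0.01, 0.99, 0.01]      # near-perfect reconstruction
one_pixel_off = [0.01, 0.01, 0.01, 0.01]   # identical except the bright pixel is missed

loss_close = sum(pixel_cross_entropy(original, close_copy))
loss_off = sum(pixel_cross_entropy(original, one_pixel_off))

# Fraction of the total loss contributed by the single mismatched pixel
share = pixel_cross_entropy(original, one_pixel_off)[2] / loss_off
```

The two reconstructions differ in only one pixel, yet that pixel accounts for almost all of the second reconstruction's loss, which is many times larger than the first's.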