詳解如何使用Keras實(shí)現(xiàn)Wassertein GAN
在閱讀論文 Wassertein GAN 時(shí),作者發(fā)現(xiàn)理解它最好的辦法就是用代碼來實(shí)現(xiàn)其內(nèi)容。于是在本文中,作者將用自己的在 Keras 上的代碼來向大家簡(jiǎn)要介紹一下WGAN。
何為 GAN?
GAN,亦稱為生成對(duì)抗網(wǎng)絡(luò)(Generative Adversarial Network),它是生成模型中的一類——即一種能夠通過觀察來自特定分布的訓(xùn)練數(shù)據(jù),進(jìn)而嘗試對(duì)這個(gè)分布進(jìn)行預(yù)測(cè)的模型。這個(gè)模型新獲取的樣本「看起來」會(huì)和最初的訓(xùn)練樣本類似。有些生成模型只會(huì)去學(xué)習(xí)訓(xùn)練數(shù)據(jù)分布的參數(shù),有一些模型則只能從訓(xùn)練數(shù)據(jù)分布中提取樣本,而有一些則可以二者兼顧。
目前,已經(jīng)存在了很多種類的生成模型:全可見信念網(wǎng)絡(luò)(Fully Visible Belief Network)、變分自編碼器(Variational Autoencoder)、玻爾茲曼機(jī)(Boltzmann Machine),生成隨機(jī)網(wǎng)絡(luò)(Generative Stochastic Network),像素循環(huán)神經(jīng)網(wǎng)絡(luò)(PixelRNN)等等。以上的模型都因其所表征或接近的訓(xùn)練數(shù)據(jù)密度而有所區(qū)別。一些模型會(huì)去精細(xì)的表征訓(xùn)練數(shù)據(jù),另一些則會(huì)以某種方式去和訓(xùn)練數(shù)據(jù)進(jìn)行互動(dòng)——比如說生成模型。GAN 就是這里所說的后者。大部分生成模型的學(xué)習(xí)原則都可被概括為「最大化相似度預(yù)測(cè)」——即讓模型的參數(shù)能夠盡可能地與訓(xùn)練數(shù)據(jù)相似。
GAN 的工作方式可以看成一個(gè)由兩部分構(gòu)成的游戲:生成器(Generator/G)和判別器(Discriminator/D)(一般而言,這兩者都由神經(jīng)網(wǎng)絡(luò)構(gòu)成)。生成器隨機(jī)將一個(gè)噪聲作為自己的輸入,然后嘗試去生成一個(gè)樣本,目的是讓判別器無法判斷這個(gè)樣本是來自訓(xùn)練數(shù)據(jù)還是來自生成器的。在判別器這里,我們讓它以監(jiān)督學(xué)習(xí)方式來工作,具體而言就是讓它觀察真實(shí)樣本和生成器生成的樣本,并且同時(shí)用標(biāo)簽告訴它這些樣本分別來自哪里。在某種意義上,判別器可以代替固定的損失函數(shù),并且嘗試學(xué)習(xí)與訓(xùn)練數(shù)據(jù)分布相關(guān)的模式。
何為 Wasserstein GAN?
就其本質(zhì)而言,任何生成模型的目標(biāo)都是讓模型(習(xí)得地)的分布與真實(shí)數(shù)據(jù)之間的差異達(dá)到最小。然而,傳統(tǒng) GAN 中的判別器 D 并不會(huì)當(dāng)模型與真實(shí)的分布重疊度不夠時(shí)去提供足夠的信息來估計(jì)這個(gè)差異度——這導(dǎo)致生成器得不到一個(gè)強(qiáng)有力的反饋信息(特別是在訓(xùn)練之初),此外生成器的穩(wěn)定性也普遍不足。
Wasserstein GAN 在原來的基礎(chǔ)之上添加了一些新的方法,讓判別器 D 去擬合模型與真實(shí)分布之間的 Wasserstein 距離。Wassersterin 距離會(huì)大致估計(jì)出「調(diào)整一個(gè)分布去匹配另一個(gè)分布還需要多少工作」。此外,其定義的方式十分值得注意,它甚至可以適用于非重疊的分布。
為了讓判別器 D 可以有效地?cái)M合 Wasserstein 距離:
- 其權(quán)重必須在緊致空間(compact space)之內(nèi)。為了達(dá)到這個(gè)目的,其權(quán)重需要在每步訓(xùn)練之后,被調(diào)整到-0.01 到+0.01 的閉區(qū)間上。然而,論文作者承認(rèn),雖然這對(duì)于裁剪間距的選擇并不是理想且高敏感的(highly sensitive),但是它在實(shí)踐中卻是有效的。更多信息可參見論文 6 到 7 頁。
- 由于判別器被訓(xùn)練到了更好的狀態(tài)上,所以它可以為生成器提供一個(gè)有用的梯度。
- 判別器頂層需要有線性激活。
- 它需要一個(gè)本質(zhì)上不會(huì)修改判別器輸出的價(jià)值函數(shù)。
- K.mean(y_true * y_pred)
以 keras 這段損失函數(shù)為例:
- 這里采用 mean 來適應(yīng)不同的批大小以及乘積。
- 預(yù)測(cè)的值通過乘上 element(可使用的真值)來最大化輸出結(jié)果(優(yōu)化器通常會(huì)將損失函數(shù)的值最小化)。
論文作者表示,與 vanlillaGAN 相比,WGAN 有一下優(yōu)點(diǎn):
- 有意義的損失指標(biāo)。判別器 D 的損失可以與生成樣本(這些樣本使得可以更少地監(jiān)控訓(xùn)練過程)的質(zhì)量很好地關(guān)聯(lián)起來。
- 穩(wěn)定性得到改進(jìn)。當(dāng)判別器 D 的訓(xùn)練達(dá)到了最佳,它便可以為生成器 G 的訓(xùn)練提供一個(gè)有用的損失。這意味著,對(duì)判別器 D 和生成器 G 的訓(xùn)練不必在樣本數(shù)量上保持平衡(相反,在 Vanilla GAN 方法中而這是平衡的)。此外,作者也表示,在實(shí)驗(yàn)中,他們的 WGAN 模型沒有發(fā)生過一次崩潰的情況。
開始編程!
我們會(huì)在 Keras 上實(shí)現(xiàn) ACGAN 的 Wasserstein variety。在 ACGAN 這種生成對(duì)抗網(wǎng)絡(luò)中,其判別器 D 不僅可以預(yù)測(cè)樣本的真實(shí)與否,同時(shí)還可以將其進(jìn)行歸類。
下方代碼附有部分解釋。
[1] 導(dǎo)入庫文件:
- import os
- import matplotlib.pyplot as plt
- %matplotlib inline
- %config InlineBackend.figure_format = 'retina' # enable hi-res output
- import numpy as np
- import tensorflow as tf
- import keras.backend as K
- from keras.datasets import mnist
- from keras.layers import *
- from keras.models import *
- from keras.optimizers import *
- from keras.initializers import *
- from keras.callbacks import *
- from keras.utils.generic_utils import Progbar
[2].Runtime 配置
- # random seed
- RND = 777
- # output settings
- RUN = 'B'
- OUT_DIR = 'out/' + RUN
- TENSORBOARD_DIR = '/tensorboard/wgans/' + RUN
- SAVE_SAMPLE_IMAGES = False
- # GPU # to run on
- GPU = "0"
- BATCH_SIZE = 100
- ITERATIONS = 20000
- # size of the random vector used to initialize G
- Z_SIZE = 100
[3]生成器 G 每進(jìn)行一次迭代,判別器 D 都需要進(jìn)行 D_ITERS 次迭代。
- 由于在 WGAN 中讓判別器質(zhì)量能夠優(yōu)化這件事更加重要,所以判別器 D 與生成器 G 在訓(xùn)練次數(shù)上呈非對(duì)稱比例。
- 在論文的 v2 版本中,判別器 D 在生成器 G 每 1000 次迭代的前 25 次都會(huì)訓(xùn)練 100 次,此外,判別器也會(huì)當(dāng)生成器每進(jìn)行了 500 次迭代以后訓(xùn)練 100 次。
- D_ITERS = 5
[4]其它準(zhǔn)備:
- # create output dirif not os.path.isdir(OUT_DIR): os.makedirs(OUT_DIR)
- # make only specific GPU to be utilized
- os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"os.environ["CUDA_VISIBLE_DEVICES"] = GPU
- # seed random generator for repeatability
- np.random.seed(RND)
- # force Keras to use last dimension for image channels
- K.set_image_dim_ordering('tf')
[5]判別器的損失函數(shù):
- 由于判別器 D 一方面力圖在訓(xùn)練數(shù)據(jù)分布與生成器 G 生成的數(shù)據(jù)之間習(xí)得一個(gè) Wasserstein 距離的擬合,另一方面判別器 D 又是線性激活的,所以這里我們不需要去修改它的輸出結(jié)果。
- 由于已經(jīng)使用了損失函數(shù) Mean,所以我們可以在不同的批大小之間比較輸出結(jié)果。
- 預(yù)測(cè)結(jié)果等于真值(true value)與元素的點(diǎn)乘(element-wise multiplication),為了讓判別器 D 的輸出能夠最大化(通常,優(yōu)化器都力圖去讓損失函數(shù)的值達(dá)到最小),真值需要取-1。
- def d_loss(y_true, y_pred): return K.mean(y_true * y_pred)
[6].創(chuàng)建判別器 D
判別器將圖像作為輸入,然后給出兩個(gè)輸出:
- 用線性激活來評(píng)價(jià)生成圖像的「虛假度」(最大化以用于生成圖像)。
- 用 softmax 激活來對(duì)圖像種類進(jìn)行預(yù)測(cè)。
- 由于權(quán)重是從標(biāo)準(zhǔn)差為 0.02 的正態(tài)分布中初始化出來的,所以最初的剪裁不會(huì)去掉所有的權(quán)重。
- def create_D():
- # weights are initlaized from normal distribution with below params
- weight_init = RandomNormal(mean=0., stddev=0.02)
- input_image = Input(shape=(28, 28, 1), name='input_image')
- x = Conv2D( 32, (3, 3),
- padding='same',
- name='conv_1',
- kernel_initializer=weight_init)(input_image)
- x = LeakyReLU()(x)
- x = MaxPool2D(pool_size=2)(x)
- x = Dropout(0.3)(x)
- x = Conv2D( 64, (3, 3),
- padding='same',
- name='conv_2',
- kernel_initializer=weight_init)(x)
- x = MaxPool2D(pool_size=1)(x)
- x = LeakyReLU()(x)
- x = Dropout(0.3)(x)
- x = Conv2D( 128, (3, 3),
- padding='same',
- name='conv_3',
- kernel_initializer=weight_init)(x)
- x = MaxPool2D(pool_size=2)(x)
- x = LeakyReLU()(x)
- x = Dropout(0.3)(x)
- x = Conv2D( 256, (3, 3),
- padding='same',
- name='coonv_4',
- kernel_initializer=weight_init)(x)
- x = MaxPool2D(pool_size=1)(x)
- x = LeakyReLU()(x)
- x = Dropout(0.3)(x)
- features = Flatten()(x)
- output_is_fake = Dense( 1, activation='linear', name='output_is_fake')(features)
- output_class = Dense( 10, activation='softmax', name='output_class')(features) return Model(
- inputs=[input_image], outputs=[output_is_fake,
[7].創(chuàng)建生成器
生成器有兩個(gè)輸入:
- 一個(gè)尺寸為Z_SIZE的潛在隨機(jī)變量。
- 我們希望生成的數(shù)字類型(integer o 到 9)。
為了加入這些輸入(input),integer 類型會(huì)在內(nèi)部轉(zhuǎn)換成一個(gè)1 x DICT_LEN(在本例中DICT_LEN = 10)的稀疏向量,然后乘上嵌入的維度為 DICT_LEN x Z_SIZE的矩陣,結(jié)果得到一個(gè)維度為1 x Z_SIZE的密集向量。然后該向量乘上(點(diǎn) 乘)可能的輸入(input),經(jīng)過多個(gè)上菜樣和卷積層,最后其維度就可以和訓(xùn)練圖像的維度匹配了。
- def create_G(Z_SIZEZ_SIZE=Z_SIZE):
- DICT_LEN = 10
- EMBEDDING_LEN = Z_SIZE
- # weights are initialized from normal distribution with below params
- weight_init = RandomNormal(mean=0., stddev=0.02)
- # class# input_class = Input(shape=(1, ), dtype='int32', name='input_class')
- # encode class# to the same size as Z to use hadamard multiplication later on
- e = Embedding(
- DICT_LEN, EMBEDDING_LEN,
- embeddings_initializer='glorot_uniform')(input_class)
- embedded_class = Flatten(name='embedded_class')(e)
- # latent var
- input_z = Input(shape=(Z_SIZE, ), name='input_z')
- # hadamard product
- h = multiply([input_z, embedded_class], name='h')
- # cnn part
- x = Dense(1024)(h)
- x = LeakyReLU()(x)
- x = Dense(128 * 7 * 7)(x)
- x = LeakyReLU()(x)
- x = Reshape((7, 7, 128))(x)
- x = UpSampling2D(size=(2, 2))(x)
- x = Conv2D(256, (5, 5), padding='same', kernel_initializer=weight_init)(x)
- x = LeakyReLU()(x)
- x = UpSampling2D(size=(2, 2))(x)
- x = Conv2D(128, (5, 5), padding='same', kernel_initializer=weight_init)(x)
- x = LeakyReLU()(x)
- x = Conv2D( 1, (2, 2),
- padding='same',
- activation='tanh',
- name='output_generated_image',
- kernel_initializer=weight_init)(x) return Mode
[8].將判別器 D 和生成器 G 整合到一個(gè)模型中:
- D = create_D()
- D.compile(
- optimizer=RMSprop(lr=0.00005),
- loss=[d_loss, 'sparse_categorical_crossentropy'])
- input_z = Input(shape=(Z_SIZE, ), name='input_z_')
- input_class = Input(shape=(1, ),name='input_class_', dtype='int32')
- G = create_G()
- # create combined D(G) model
- output_is_fake, output_class = D(G(inputs=[input_z, input_class]))
- DG = Model(inputs=[input_z, input_class], outputs=[output_is_fake, output_class])
- DG.compile(
- optimizer=RMSprop(lr=0.00005),
- loss=[d_loss, 'sparse_categorical_crossentropy']
- )
[9].加載 MNIST 數(shù)據(jù)集:
- # load mnist data
- (X_train, y_train), (X_test, y_test) = mnist.load_data()
- # use all available 70k samples from both train and test sets
- X_train = np.concatenate((X_train, X_test))
- y_train = np.concatenate((y_train, y_test))
- # convert to -1..1 range, reshape to (sample_i, 28, 28, 1)
- X_train = (X_train.astype(np.float32) - 127.5) / 127.5X_train = np.expand_dims(X_train, axis=3)
[10].生成樣本以及將指標(biāo)和圖像發(fā)送到 TensorBorad 的實(shí)用工具:
- # save 10x10 sample of generated images
- def generate_samples(n=0, save=True):
- zz = np.random.normal(0., 1., (100, Z_SIZE))
- generated_classes = np.array(list(range(0, 10)) * 10)
- generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])
- rr = [] for c in range(10):
- rr.append(
- np.concatenate(generated_images[c * 10:(1 + c) * 10]).reshape( 280, 28))
- img = np.hstack(rr) if save:
- plt.imsave(OUT_DIR + '/samples_%07d.png' % n, img, cmap=plt.cm.gray) return img
- # write tensorboard summaries
- sw = tf.summary.FileWriter(TENSORBOARD_DIR)
- def update_tb_summary(step, write_sample_images=True):
- s = tf.Summary()
- # losses as is for names, vals in zip((('D_real_is_fake', 'D_real_class'),
- ('D_fake_is_fake', 'D_fake_class'), ('DG_is_fake', 'DG_class')),
- (D_true_losses, D_fake_losses, DG_losses)):
- v = s.value.add()
- v.simple_value = vals[-1][1]
- v.tag = names[0]
- v = s.value.add()
- v.simple_value = vals[-1][2]
- v.tag = names[1]
- # D loss: -1*D_true_is_fake - D_fake_is_fake
- v = s.value.add()
- v.simple_value = -D_true_losses[-1][1] - D_fake_losses[-1][1]
- v.tag = 'D loss (-1*D_real_is_fake - D_fake_is_fake)'
- # generated image if write_sample_images:
- img = generate_samples(step, save=True)
- s.MergeFromString(tf.Session().run(
- tf.summary.image('samples_%07d' % step,
- img.reshape([1, *img.shape, 1]))))
- sw.add_summary(s, step)
- sw.flush()
[11].訓(xùn)練
訓(xùn)練過程包含了以下步驟:
- 解除對(duì)判別器 D 權(quán)重的控制,讓它們變得可學(xué)習(xí)。
- 調(diào)整判別器的權(quán)重(調(diào)整到-0.01 到+0.01 閉區(qū)間上)。
- 向判別器 D 提供真實(shí)的樣本,通過在損失函數(shù)中將其乘上-1 來盡可能最大化它的輸出,最小化它的值。
- 向判別器 D 提供假的樣本試圖最小化其輸出。
- 按照上文講述的判別器迭代訓(xùn)練方法重復(fù)步驟 3 和 4。
- 固定判別器 D 的權(quán)重。
- 訓(xùn)練一對(duì)判別器和生成器,盡力去最小化其輸出。由于這種手段優(yōu)化了生成器 G 的權(quán)重,所以前面已經(jīng)訓(xùn)練好了的權(quán)重固定的判別器才會(huì)將生成的假樣本判斷為真圖像。
- progress_bar = Progbar(target=ITERATIONS)
- DG_losses = []
- D_true_losses = []
- D_fake_losses = []for it in range(ITERATIONS): if len(D_true_losses) > 0:
- progress_bar.update(
- it,
- values=[ # avg of 5 most recent
- ('D_real_is_fake', np.mean(D_true_losses[-5:], axis=0)[1]),
- ('D_real_class', np.mean(D_true_losses[-5:], axis=0)[2]),
- ('D_fake_is_fake', np.mean(D_fake_losses[-5:], axis=0)[1]),
- ('D_fake_class', np.mean(D_fake_losses[-5:], axis=0)[2]),
- ('D(G)_is_fake', np.mean(DG_losses[-5:],axis=0)[1]),
- ('D(G)_class', np.mean(DG_losses[-5:],axis=0)[2])
- ]
- )
- else:
- progress_bar.update(it)
- # 1: train D on real+generated images if (it % 1000) < 25 or it % 500 == 0: # 25 times in 1000, every 500th
- d_iters = 100
- else:
- d_iters = D_ITERS for d_it in range(d_iters):
- # unfreeze D
- D.trainable = True for l in D.layers: l.trainable = True
- # clip D weights for l in D.layers:
- weights = l.get_weights()
- weights = [np.clip(w, -0.01, 0.01) for w in weights]
- l.set_weights(weights)
- # 1.1: maximize D output on reals === minimize -1*(D(real))
- # draw random samples from real images
- index = np.random.choice(len(X_train), BATCH_SIZE, replace=False)
- real_images = X_train[index]
- real_images_classes = y_train[index]
- DD_loss = D.train_on_batch(real_images, [-np.ones(BATCH_SIZE),
- real_images_classes])
- D_true_losses.append(D_loss)
- # 1.2: minimize D output on fakes
- zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE))
- generated_classes = np.random.randint(0, 10, BATCH_SIZE)
- generated_images = G.predict([zz, generated_classes.reshape(-1, 1)])
- DD_loss = D.train_on_batch(generated_images, [np.ones(BATCH_SIZE),
- generated_classes])
- D_fake_losses.append(D_loss)
- # 2: train D(G) (D is frozen)
- # minimize D output while supplying it with fakes,
- # telling it that they are reals (-1)
- # freeze D
- D.trainable = False for l in D.layers: l.trainable = False
- zz = np.random.normal(0., 1., (BATCH_SIZE, Z_SIZE))
- generated_classes = np.random.randint(0, 10, BATCH_SIZE)
- DGDG_loss = DG.train_on_batch(
- [zz, generated_classes.reshape((-1, 1))],
- [-np.ones(BATCH_SIZE), generated_classes])
- DG_losses.append(DG_loss) if it % 10 == 0:
- update_tb_summary(it, write_sample_images=(it
結(jié)論
視頻的每一秒都是 250 次訓(xùn)練迭代。使用 Wasserstein GAN 的一個(gè)好處就是它有著損失與樣本質(zhì)量之間的關(guān)系。
附論文地址:https://arxiv.org/pdf/1701.07875.pdf
參考文獻(xiàn)
1. Wasserstein GAN paper (https://arxiv.org/pdf/1701.07875.pdf) – Martin Arjovsky, Soumith Chintala, Léon Bottou
2. NIPS 2016 Tutorial: Generative Adversarial Networks (https://arxiv.org/pdf/1701.00160.pdf) – Ian Goodfellow
3. Original PyTorch code for the Wasserstein GAN paper (https://github.com/martinarjovsky/WassersteinGAN)
4. Conditional Image Synthesis with Auxiliary Classifier GANs (https://arxiv.org/pdf/1610.09585v3.pdf) – Augustus Odena, Christopher Olah, Jonathon Shlens
5. Keras ACGAN implementation (https://github.com/lukedeo/keras-acgan) – Luke de Oliveira
6. Code for the article (https://gist.github.com/myurasov/6ecf449b32eb263e7d9a7f6e9aed5dc2)
原文:https://myurasov.github.io/2017/09/24/wasserstein-gan-keras.html?r
【本文是51CTO專欄機(jī)構(gòu)“機(jī)器之心”的原創(chuàng)譯文,微信公眾號(hào)“機(jī)器之心( id: almosthuman2014)”】