Machine Learning | A Concise PyTorch Tutorial, Part 2
Following the previous article, "A Concise PyTorch Tutorial, Part 1", this part continues with the multilayer perceptron, the convolutional neural network, and LSTMNet.
1. Multilayer Perceptron
The multilayer perceptron overcomes the limitations of linear models by adding one or more hidden layers to the network. It is a simple neural network and an important building block of deep learning. The complete code is as follows:
import numpy as np
import torch
from torch import optim
from data_util import load_mnist


def build_model(input_dim, output_dim):
    # Three linear layers, with ReLU activations and dropout in between
    return torch.nn.Sequential(
        torch.nn.Linear(input_dim, 512, bias=False),
        torch.nn.ReLU(),
        torch.nn.Dropout(0.2),
        torch.nn.Linear(512, 512, bias=False),
        torch.nn.ReLU(),
        torch.nn.Dropout(0.2),
        torch.nn.Linear(512, output_dim, bias=False),
    )


def train(model, loss, optimizer, x_val, y_val):
    model.train()
    optimizer.zero_grad()
    fx = model.forward(x_val)
    output = loss.forward(fx, y_val)
    output.backward()
    optimizer.step()
    return output.item()


def predict(model, x_val):
    model.eval()
    output = model.forward(x_val)
    return output.data.numpy().argmax(axis=1)


def main():
    torch.manual_seed(42)
    trX, teX, trY, teY = load_mnist(onehot=False)
    trX = torch.from_numpy(trX).float()
    teX = torch.from_numpy(teX).float()
    trY = torch.tensor(trY)
    n_examples, n_features = trX.size()
    n_classes = 10
    model = build_model(n_features, n_classes)
    loss = torch.nn.CrossEntropyLoss(reduction='mean')
    optimizer = optim.Adam(model.parameters())
    batch_size = 100
    for i in range(100):
        cost = 0.
        num_batches = n_examples // batch_size
        for k in range(num_batches):
            start, end = k * batch_size, (k + 1) * batch_size
            cost += train(model, loss, optimizer,
                          trX[start:end], trY[start:end])
        predY = predict(model, teX)
        print("Epoch %d, cost = %f, acc = %.2f%%"
              % (i + 1, cost / num_batches, 100. * np.mean(predY == teY)))


if __name__ == "__main__":
    main()
(1) The code above is very similar to the single-layer network from Part 1; the difference is that build_model constructs a model with three linear layers and two ReLU activations (a quick shape check follows the list):
- The first linear layer maps input_dim input features to 512 output features;
- It is followed by a ReLU activation and a Dropout layer, which add non-linearity and help prevent overfitting;
- The second linear layer maps 512 features to 512 features;
- It is again followed by a ReLU activation and a Dropout layer;
- The third linear layer maps the 512 features to output_dim outputs, i.e. the number of classes;
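As a quick sanity check (a minimal sketch, assuming the build_model above has been defined and using MNIST's 784 input features and 10 classes), the model maps a batch of flattened images to one raw score (logit) per class; CrossEntropyLoss applies the softmax internally, which is why no activation follows the last linear layer:
import torch
model = build_model(784, 10)
dummy = torch.randn(4, 784)   # a batch of 4 flattened 28x28 images
print(model(dummy).shape)     # torch.Size([4, 10]): one logit per class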
(2) What is the ReLU activation function? ReLU (Rectified Linear Unit) is one of the most commonly used activation functions in deep learning and neural networks. Its mathematical form is f(x) = max(0, x), where x is the input: when the input is less than or equal to 0 the output is 0, and when the input is greater than 0 the output equals the input. In short, ReLU suppresses the negative part to zero and passes the positive part through unchanged. In a neural network, ReLU introduces the non-linearity that allows the network to fit complex non-linear relationships, and compared with other activation functions such as Sigmoid or Tanh it is cheaper to compute and tends to converge faster;
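To make this concrete, a minimal sketch (the input values are arbitrary and not part of the tutorial code):
import torch
relu = torch.nn.ReLU()
x = torch.tensor([-2.0, -0.5, 0.0, 1.5, 3.0])
print(relu(x))   # negatives become 0, positives pass through: [0., 0., 0., 1.5, 3.]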
(3) What is a Dropout layer? Dropout is a technique for reducing overfitting in neural networks. During training, a Dropout layer randomly sets a fraction of the neuron outputs to 0, i.e. "drops" those neurons; this reduces the co-dependence between neurons and improves the generalization ability of the network;
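This is also why the training loop calls model.train() while predict calls model.eval(): dropout only zeroes activations in training mode and is a no-op during evaluation. A minimal sketch (the dropout probability of 0.5 is an arbitrary illustration):
import torch
drop = torch.nn.Dropout(p=0.5)
x = torch.ones(8)
drop.train()
print(drop(x))   # roughly half the entries are zeroed, the survivors are scaled to 2.0
drop.eval()
print(drop(x))   # all ones: dropout is disabled in eval mode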
(4)print("Epoch %d, cost = %f, acc = %.2f%%" % (i + 1, cost / num_batches, 100. * np.mean(predY == teY)))最后打印當(dāng)前訓(xùn)練的輪次,損失值和acc,上述的代碼輸出如下:
...
Epoch 91, cost = 0.011129, acc = 98.45%
Epoch 92, cost = 0.007644, acc = 98.58%
Epoch 93, cost = 0.011872, acc = 98.61%
Epoch 94, cost = 0.010658, acc = 98.58%
Epoch 95, cost = 0.007274, acc = 98.54%
Epoch 96, cost = 0.008183, acc = 98.43%
Epoch 97, cost = 0.009999, acc = 98.33%
Epoch 98, cost = 0.011613, acc = 98.36%
Epoch 99, cost = 0.007391, acc = 98.51%
Epoch 100, cost = 0.011122, acc = 98.59%
On the same classification data, the final accuracy is higher than that of the single-layer network (98.59% > 97.68%).
2. Convolutional Neural Network
A convolutional neural network (CNN) is a deep learning algorithm. Given an input matrix, a CNN can tell the important parts from the unimportant ones (by assigning weights). Compared with other approaches to classification, a CNN needs relatively little data preprocessing: with enough training it learns the relevant features of the matrix on its own. The complete code is as follows:
import numpy as np
import torch
from torch import optim
from data_util import load_mnist


class ConvNet(torch.nn.Module):
    def __init__(self, output_dim):
        super(ConvNet, self).__init__()
        # Convolutional feature extractor
        self.conv = torch.nn.Sequential()
        self.conv.add_module("conv_1", torch.nn.Conv2d(1, 10, kernel_size=5))
        self.conv.add_module("maxpool_1", torch.nn.MaxPool2d(kernel_size=2))
        self.conv.add_module("relu_1", torch.nn.ReLU())
        self.conv.add_module("conv_2", torch.nn.Conv2d(10, 20, kernel_size=5))
        self.conv.add_module("dropout_2", torch.nn.Dropout())
        self.conv.add_module("maxpool_2", torch.nn.MaxPool2d(kernel_size=2))
        self.conv.add_module("relu_2", torch.nn.ReLU())
        # Fully connected classifier
        self.fc = torch.nn.Sequential()
        self.fc.add_module("fc1", torch.nn.Linear(320, 50))
        self.fc.add_module("relu_3", torch.nn.ReLU())
        self.fc.add_module("dropout_3", torch.nn.Dropout())
        self.fc.add_module("fc2", torch.nn.Linear(50, output_dim))

    def forward(self, x):
        x = self.conv.forward(x)
        x = x.view(-1, 320)   # flatten the 20 x 4 x 4 feature maps
        return self.fc.forward(x)


def train(model, loss, optimizer, x_val, y_val):
    model.train()
    optimizer.zero_grad()
    fx = model.forward(x_val)
    output = loss.forward(fx, y_val)
    output.backward()
    optimizer.step()
    return output.item()


def predict(model, x_val):
    model.eval()
    output = model.forward(x_val)
    return output.data.numpy().argmax(axis=1)


def main():
    torch.manual_seed(42)
    trX, teX, trY, teY = load_mnist(onehot=False)
    trX = trX.reshape(-1, 1, 28, 28)
    teX = teX.reshape(-1, 1, 28, 28)
    trX = torch.from_numpy(trX).float()
    teX = torch.from_numpy(teX).float()
    trY = torch.tensor(trY)
    n_examples = len(trX)
    n_classes = 10
    model = ConvNet(output_dim=n_classes)
    loss = torch.nn.CrossEntropyLoss(reduction='mean')
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    batch_size = 100
    for i in range(100):
        cost = 0.
        num_batches = n_examples // batch_size
        for k in range(num_batches):
            start, end = k * batch_size, (k + 1) * batch_size
            cost += train(model, loss, optimizer,
                          trX[start:end], trY[start:end])
        predY = predict(model, teX)
        print("Epoch %d, cost = %f, acc = %.2f%%"
              % (i + 1, cost / num_batches, 100. * np.mean(predY == teY)))


if __name__ == "__main__":
    main()
(1) The code above defines a class named ConvNet that inherits from torch.nn.Module and represents a convolutional neural network. The __init__ method defines two submodules, conv and fc, for the convolutional part and the fully connected part. The conv submodule contains two convolutional layers (torch.nn.Conv2d), two max-pooling layers (torch.nn.MaxPool2d), two ReLU activations (torch.nn.ReLU), and one Dropout layer (torch.nn.Dropout). The fc submodule contains two linear layers (torch.nn.Linear), a ReLU activation, and a Dropout layer;
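The hard-coded 320 in x.view(-1, 320) follows from tracing the shapes through conv: a 28x28 input becomes 24x24 after the first 5x5 convolution, 12x12 after 2x2 pooling, 8x8 after the second convolution, and 4x4 after the second pooling, with 20 output channels, so 20 * 4 * 4 = 320 features. A quick sketch to verify this, assuming the ConvNet class defined above:
import torch
m = ConvNet(output_dim=10)
dummy = torch.randn(2, 1, 28, 28)   # a batch of 2 single-channel 28x28 images
print(m.conv(dummy).shape)          # torch.Size([2, 20, 4, 4]) -> 20 * 4 * 4 = 320
print(m(dummy).shape)               # torch.Size([2, 10])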
(2) Why use pooling layers? The pooling layer is an important building block of a CNN; its main purposes are listed below (a small illustrative sketch follows the list):
- Dimensionality reduction: pooling downsamples local regions of the input feature maps, shrinking their spatial size. This reduces the number of parameters in later layers, lowers the computational cost, and speeds up training;
- Translation invariance: pooling makes the network more robust to small shifts of the input image. When a feature moves slightly, the pooled output still gives a similar representation, which helps the model generalize and recognize the same feature at different positions and scales;
- Overfitting control: by shrinking the feature maps, pooling reduces the number of parameters and with it the risk of overfitting;
- Stronger feature representation: pooling aggregates the features within a local region, emphasizing the most salient information. The common variants are max pooling and average pooling, which take the maximum or the mean of each local region respectively;
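As a concrete illustration (a minimal sketch with a hand-made 4x4 feature map, not part of the tutorial code), MaxPool2d with kernel_size=2 halves each spatial dimension by keeping the maximum of every 2x2 block:
import torch
pool = torch.nn.MaxPool2d(kernel_size=2)
x = torch.tensor([[[[1., 2., 0., 1.],
                    [3., 4., 1., 0.],
                    [0., 1., 5., 6.],
                    [2., 0., 7., 8.]]]])   # shape (1, 1, 4, 4)
print(pool(x))                             # 2x2 output: [[4., 1.], [2., 8.]]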
(3)print("Epoch %d, cost = %f, acc = %.2f%%" % (i + 1, cost / num_batches, 100. * np.mean(predY == teY)))最后打印當(dāng)前訓(xùn)練的輪次,損失值和acc,上述的代碼輸出如下:
...
Epoch 91, cost = 0.047302, acc = 99.22%
Epoch 92, cost = 0.049026, acc = 99.22%
Epoch 93, cost = 0.048953, acc = 99.13%
Epoch 94, cost = 0.045235, acc = 99.12%
Epoch 95, cost = 0.045136, acc = 99.14%
Epoch 96, cost = 0.048240, acc = 99.02%
Epoch 97, cost = 0.049063, acc = 99.21%
Epoch 98, cost = 0.045373, acc = 99.23%
Epoch 99, cost = 0.046127, acc = 99.12%
Epoch 100, cost = 0.046864, acc = 99.10%
On the same classification data, the final accuracy is higher than that of the multilayer perceptron (99.10% > 98.59%).
3. LSTMNet
LSTMNet is a neural network built with a long short-term memory (LSTM) network. Its core idea is the "memory cell", a structure that can retain long-range dependencies to a certain extent. Each LSTM unit contains an input gate, a forget gate, and an output gate; these gates control how information flows through the memory cell so the network can learn when to store, update, or emit useful information.
import numpy as np
import torch
from torch import optim, nn
from data_util import load_mnist


class LSTMNet(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(LSTMNet, self).__init__()
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(input_dim, hidden_dim)
        self.linear = nn.Linear(hidden_dim, output_dim, bias=False)

    def forward(self, x):
        batch_size = x.size()[1]
        h0 = torch.zeros([1, batch_size, self.hidden_dim])
        c0 = torch.zeros([1, batch_size, self.hidden_dim])
        fx, _ = self.lstm.forward(x, (h0, c0))
        return self.linear.forward(fx[-1])   # classify on the last time step


def train(model, loss, optimizer, x_val, y_val):
    model.train()
    optimizer.zero_grad()
    fx = model.forward(x_val)
    output = loss.forward(fx, y_val)
    output.backward()
    optimizer.step()
    return output.item()


def predict(model, x_val):
    model.eval()
    output = model.forward(x_val)
    return output.data.numpy().argmax(axis=1)


def main():
    torch.manual_seed(42)
    trX, teX, trY, teY = load_mnist(onehot=False)
    train_size = len(trY)
    n_classes = 10
    seq_length = 28
    input_dim = 28
    hidden_dim = 128
    batch_size = 100
    epochs = 100
    # Treat each 28x28 image as a sequence of 28 rows of 28 pixels,
    # then move the sequence dimension to the front: (seq_len, batch, input_dim)
    trX = trX.reshape(-1, seq_length, input_dim)
    teX = teX.reshape(-1, seq_length, input_dim)
    trX = np.swapaxes(trX, 0, 1)
    teX = np.swapaxes(teX, 0, 1)
    trX = torch.from_numpy(trX).float()
    teX = torch.from_numpy(teX).float()
    trY = torch.tensor(trY)
    model = LSTMNet(input_dim, hidden_dim, n_classes)
    loss = torch.nn.CrossEntropyLoss(reduction='mean')
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    for i in range(epochs):
        cost = 0.
        num_batches = train_size // batch_size
        for k in range(num_batches):
            start, end = k * batch_size, (k + 1) * batch_size
            cost += train(model, loss, optimizer,
                          trX[:, start:end, :], trY[start:end])
        predY = predict(model, teX)
        print("Epoch %d, cost = %f, acc = %.2f%%" %
              (i + 1, cost / num_batches, 100. * np.mean(predY == teY)))


if __name__ == "__main__":
    main()
(1) The parts shared with the previous programs are not repeated here; the LSTMNet class itself works as follows (a shape sketch follows the list):
- self.lstm = nn.LSTM(input_dim, hidden_dim) creates an LSTM layer with input dimension input_dim and hidden dimension hidden_dim;
- self.linear = nn.Linear(hidden_dim, output_dim, bias=False) creates a linear (fully connected) layer with input dimension hidden_dim and output dimension output_dim, without a bias term;
- h0 = torch.zeros([1, batch_size, self.hidden_dim]) initializes the LSTM hidden state h0 as an all-zero tensor of shape [1, batch_size, hidden_dim];
- c0 = torch.zeros([1, batch_size, self.hidden_dim]) initializes the LSTM cell state c0 as an all-zero tensor of shape [1, batch_size, hidden_dim];
- fx, _ = self.lstm.forward(x, (h0, c0)) feeds the input x together with the initial hidden state h0 and cell state c0 into the LSTM layer and returns its output fx;
- return self.linear.forward(fx[-1]) feeds the LSTM output into the linear layer to produce the final output; fx[-1] takes the output of the last time step;
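Note that nn.LSTM (with the default batch_first=False) expects input of shape (seq_len, batch, input_dim), which is why main() swaps the first two axes of trX and teX; the output fx has shape (seq_len, batch, hidden_dim), so fx[-1] is the hidden state at the last time step. A minimal shape sketch (the dimensions mirror the tutorial's settings but the tensors are random placeholders):
import torch
from torch import nn
lstm = nn.LSTM(input_size=28, hidden_size=128)
x = torch.randn(28, 100, 28)      # (seq_len, batch, input_dim)
h0 = torch.zeros(1, 100, 128)
c0 = torch.zeros(1, 100, 128)
fx, _ = lstm(x, (h0, c0))
print(fx.shape)                   # torch.Size([28, 100, 128])
print(fx[-1].shape)               # torch.Size([100, 128]) -> fed into the linear layer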
(2)print("Epoch %d, cost = %f, acc = %.2f%%" % (i + 1, cost / num_batches, 100. * np.mean(predY == teY)))最后打印當(dāng)前訓(xùn)練的輪次,損失值和acc,上述的代碼輸出如下:
Epoch 91, cost = 0.000468, acc = 98.57%
Epoch 92, cost = 0.000452, acc = 98.57%
Epoch 93, cost = 0.000437, acc = 98.58%
Epoch 94, cost = 0.000422, acc = 98.57%
Epoch 95, cost = 0.000409, acc = 98.58%
Epoch 96, cost = 0.000396, acc = 98.58%
Epoch 97, cost = 0.000384, acc = 98.57%
Epoch 98, cost = 0.000372, acc = 98.56%
Epoch 99, cost = 0.000360, acc = 98.55%
Epoch 100, cost = 0.000349, acc = 98.55%
4. Helper Code
The data_util.py module behind from data_util import load_mnist, used in both articles, is listed below:
import gzip
import os
import urllib.request as request
from os import path

import numpy as np

DATASET_DIR = 'datasets/'
MNIST_FILES = ["train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz",
               "t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz"]


def download_file(url, local_path):
    dir_path = path.dirname(local_path)
    if not path.exists(dir_path):
        print("Creating the directory '%s' ..." % dir_path)
        os.makedirs(dir_path)
    print("Downloading from '%s' ..." % url)
    request.urlretrieve(url, local_path)


def download_mnist(local_path):
    url_root = "http://yann.lecun.com/exdb/mnist/"
    for f_name in MNIST_FILES:
        f_path = os.path.join(local_path, f_name)
        if not path.exists(f_path):
            download_file(url_root + f_name, f_path)


def one_hot(x, n):
    if type(x) == list:
        x = np.array(x)
    x = x.flatten()
    o_h = np.zeros((len(x), n))
    o_h[np.arange(len(x)), x] = 1
    return o_h


def load_mnist(ntrain=60000, ntest=10000, onehot=True):
    data_dir = os.path.join(DATASET_DIR, 'mnist/')
    if not path.exists(data_dir):
        download_mnist(data_dir)
    else:
        # check all files
        checks = [path.exists(os.path.join(data_dir, f)) for f in MNIST_FILES]
        if not np.all(checks):
            download_mnist(data_dir)
    with gzip.open(os.path.join(data_dir, 'train-images-idx3-ubyte.gz')) as fd:
        buf = fd.read()
        loaded = np.frombuffer(buf, dtype=np.uint8)
        trX = loaded[16:].reshape((60000, 28 * 28)).astype(float)
    with gzip.open(os.path.join(data_dir, 'train-labels-idx1-ubyte.gz')) as fd:
        buf = fd.read()
        loaded = np.frombuffer(buf, dtype=np.uint8)
        trY = loaded[8:].reshape((60000))
    with gzip.open(os.path.join(data_dir, 't10k-images-idx3-ubyte.gz')) as fd:
        buf = fd.read()
        loaded = np.frombuffer(buf, dtype=np.uint8)
        teX = loaded[16:].reshape((10000, 28 * 28)).astype(float)
    with gzip.open(os.path.join(data_dir, 't10k-labels-idx1-ubyte.gz')) as fd:
        buf = fd.read()
        loaded = np.frombuffer(buf, dtype=np.uint8)
        teY = loaded[8:].reshape((10000))
    trX /= 255.
    teX /= 255.
    trX = trX[:ntrain]
    trY = trY[:ntrain]
    teX = teX[:ntest]
    teY = teY[:ntest]
    if onehot:
        trY = one_hot(trY, 10)
        teY = one_hot(teY, 10)
    else:
        trY = np.asarray(trY)
        teY = np.asarray(teY)
    return trX, teX, trY, teY
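For reference, a minimal usage sketch of this helper as the three programs above call it (the shapes follow from the MNIST file layout, with onehot=False returning integer labels):
trX, teX, trY, teY = load_mnist(onehot=False)
print(trX.shape, trY.shape)   # (60000, 784) (60000,)
print(teX.shape, teY.shape)   # (10000, 784) (10000,)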