D2L学习笔记-线性神经网络

  1. 1. 线性回归的从零实现
  2. 2. 线性回归的简洁实现
  3. 3. Softmax回归的从零实现
  4. 4. Softmax回归的简洁实现

李沐动手学深度学习(PyTorch)课程学习笔记第二章:线性神经网络。

1. 线性回归的从零实现

为了简单起见,我们将根据带有噪声的线性模型构造一个人造数据集。我们的任务是使用这个有限样本的数据集来恢复这个模型的参数。

首先我们生成数据集:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import random
import torch
import matplotlib.pyplot as plt

# 生成数据集
def synthetic_data(w, b, num_examples):
"""生成y = Xw + b + 噪声"""
X = torch.normal(0, 1, (num_examples, len(w))) # 均值为0,方差为1的随机数,大小为(num_examples, len(w))
y = torch.matmul(X, w) + b # y = Xw + b
y += torch.normal(0, 0.01, y.shape) # 加入一个随机噪音
return X, y.reshape((-1, 1)) # y为列向量

true_w = torch.tensor([2, -3.4])
true_b = 4.2
features, labels = synthetic_data(true_w, true_b, 1000)

print('features:', features[0],'\nlabel:', labels[0])
# features: tensor([ 0.7764, -1.2998])
# label: tensor([10.1756])

# 通过生成第二个特征features[:, 1]和labels的散点图,可以直观观察到两者之间的线性关系
plt.plot(features[:, 1].detach().numpy(), labels.detach().numpy(), 'ro', ms=3)
plt.show()

训练模型时要对数据集进行遍历,每次抽取一小批量样本,并使用它们来更新我们的模型。由于这个过程是训练机器学习算法的基础,所以有必要定义一个函数,该函数能打乱数据集中的样本并以小批量方式获取数据。

在下面的代码中,我们定义一个 data_iter 函数,该函数接收批量大小、特征矩阵和标签向量作为输入,生成大小为 batch_size 的小批量。每个小批量包含一组特征和标签:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def data_iter(batch_size, features, labels):
num_examples = len(features)
indices = list(range(num_examples))
# 这些样本是随机读取的,没有特定的顺序,因此要打乱下标
random.shuffle(indices)
for i in range(0, num_examples, batch_size):
batch_indices = torch.tensor(indices[i:min(i + batch_size, num_examples)])
yield features[batch_indices], labels[batch_indices]

batch_size = 10

for X, y in data_iter(batch_size, features, labels):
print(X, '\n', y)
break

在我们开始用小批量随机梯度下降优化我们的模型参数之前,我们需要先有一些参数。在下面的代码中,我们通过从均值为0、标准差为0.01的正态分布中采样随机数来初始化权重,并将偏置初始化为0:

1
2
w = torch.normal(0, 0.01, size=(2, 1), requires_grad=True)
b = torch.zeros(1, requires_grad=True)

定义线性回归模型:

1
2
3
def linreg(X, w, b):
"""线性回归模型"""
return torch.matmul(X, w) + b

定义损失函数:

1
2
3
def squared_loss(y_hat, y):
"""均方损失"""
return (y_hat - y.reshape(y_hat.shape)) ** 2 / 2

定义优化算法:

1
2
3
4
5
6
def sgd(params, lr, batch_size):
"""小批量随机梯度下降"""
with torch.no_grad(): # 更新参数的时候不需要计算梯度
for param in params:
param -= lr * param.grad / batch_size
param.grad.zero_() # 将梯度清零

现在我们已经准备好了模型训练所有需要的要素,可以实现主要的训练过程部分了。理解这段代码至关重要,因为从事深度学习后,相同的训练过程几乎一遍又一遍地出现。在每次迭代中,我们读取一小批量训练样本,并通过我们的模型来获得一组预测。计算完损失后,我们开始反向传播,存储每个参数的梯度。最后,我们调用优化算法(随机梯度下降法SGD)来更新模型参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 超参数
lr, num_epochs = 0.03, 3
net = linreg
loss_function = squared_loss

for epoch in range(num_epochs):
for X, y in data_iter(batch_size, features, labels):
loss = loss_function(net(X, w, b), y) # X和y的小批量损失
# loss的形状是(batch_size, 1),而不是标量,将loss中的所有元素加到一起,并以此计算关于[w, b]的梯度
loss.sum().backward()
sgd([w, b], lr, batch_size) # 使用参数的梯度更新参数
with torch.no_grad():
train_loss = loss_function(net(features, w, b), labels)
print(f'epoch {epoch + 1}, loss {float(train_loss.mean()):f}')

2. 线性回归的简洁实现

首先生成数据集:

1
2
3
4
5
6
7
8
import numpy as np
import torch
from torch.utils import data
from d2l import torch as d2l

true_w = torch.tensor([2, -3.4])
true_b = 4.2
features, labels = d2l.synthetic_data(true_w, true_b, 1000)

读取数据集:

1
2
3
4
5
6
7
8
9
10
11
12
def load_array(data_arrays, batch_size, is_train=True):
"""构造一个PyTorch数据迭代器"""
dataset = data.TensorDataset(*data_arrays)
return data.DataLoader(dataset, batch_size, shuffle=is_train)

batch_size = 10
data_iter = load_array((features, labels), batch_size)

# 这里我们使用iter构造Python迭代器,并使用next从迭代器中获取第一项
feature, label = next(iter(data_iter))
print(feature)
print(label)

接下来我们定义模型,在 PyTorch 中,全连接层在 Linear 类中定义。值得注意的是,我们将两个参数传递到 nn.Linear 中。第一个指定输入特征形状,即2,第二个指定输出特征形状,输出特征形状为单个标量,因此为1:

1
2
3
4
5
6
7
from torch import nn

net = nn.Sequential(nn.Linear(2, 1))

# 初始化模型参数
net[0].weight.data.normal_(0, 0.01)
net[0].bias.data.fill_(0)

定义损失函数,计算均方误差使用的是 MSELoss 类,也称平方 L2 范数,默认情况下,它返回所有样本损失的平均值:

1
loss_function = nn.MSELoss()

定义优化算法:

1
optimizer = torch.optim.SGD(net.parameters(), lr=0.03)

训练过程代码与我们从零开始实现时所做的非常相似:

1
2
3
4
5
6
7
8
9
10
11
num_epochs = 3

for epoch in range(num_epochs):
for X, y in data_iter:
loss = loss_function(net(X), y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
with torch.no_grad():
train_loss = loss_function(net(features), labels)
print(f'epoch {epoch + 1}, loss {train_loss:f}')

3. Softmax回归的从零实现

首先读入 Fashion-MNIST 数据集,原始数据集中的每个样本都是28*28的图像。本节将展平每个图像,把它们看作长度为784的向量。在后面的章节中,我们将讨论能够利用图像空间结构的特征,但现在我们暂时只把每个像素位置看作一个特征。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import torch
import torchvision
from torch.utils import data
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from d2l import torch as d2l

def load_data_fashion_mnist(batch_size, resize=None):
"""下载Fashion-MNIST数据集,然后将其加载到内存中"""
trans = [transforms.ToTensor()]
if resize:
trans.insert(0, transforms.Resize(resize))
trans = transforms.Compose(trans)
mnist_train = torchvision.datasets.FashionMNIST(root="../data", train=True, transform=trans, download=True)
mnist_test = torchvision.datasets.FashionMNIST(root="../data", train=False, transform=trans, download=True)
return (data.DataLoader(mnist_train, batch_size, shuffle=True, num_workers=0),
data.DataLoader(mnist_test, batch_size, shuffle=False, num_workers=0))

batch_size = 256

train_iter, test_iter = load_data_fashion_mnist(batch_size)

初始化模型参数,在 Softmax 回归中,我们的输出与类别一样多。因为我们的数据集有10个类别,所以网络输出维度为10。因此,权重将构成一个784*10的矩阵,偏置将构成一个1*10的行向量:

1
2
3
4
5
num_inputs = 784
num_outputs = 10

W = torch.normal(0, 0.01, size=(num_inputs, num_outputs), requires_grad=True)
b = torch.zeros(num_outputs, requires_grad=True)

定义 Softmax 操作,注意,虽然这在数学上看起来是正确的,但我们在代码实现中有点草率。矩阵中的非常大或非常小的元素可能造成数值上溢或下溢,但我们没有采取措施来防止这点:

1
2
3
4
def softmax(X):
X_exp = torch.exp(X)
partition = X_exp.sum(1, keepdim=True)
return X_exp / partition # 这里应用了广播机制

定义 Softmax 操作后,我们可以实现 Softmax 回归模型。下面的代码定义了输入如何通过网络映射到输出。注意,将数据传递到模型之前,我们使用 reshape 函数将每张原始图像展平为向量:

1
2
def net(X):
return softmax(torch.matmul(X.reshape((-1, W.shape[0])), W) + b)

接下来我们定义损失函数,交叉熵采用真实标签的预测概率的负对数似然。这里我们不使用 Python 的 for 循环迭代预测(这往往是低效的),而是通过一个运算符选择所有元素。下面我们创建一个数据样本 y_hat,其中包含2个样本在3个类别的预测概率,以及它们对应的标签 y。有了 y,我们知道在第一个样本中,第一类是正确的预测;而在第二个样本中,第三类是正确的预测。然后使用 y 作为 y_hat 中概率的索引,我们选择第一个样本中第一个类的概率和第二个样本中第三个类的概率:

1
2
3
4
5
6
7
8
y = torch.tensor([0, 2])
y_hat = torch.tensor([[0.1, 0.3, 0.6], [0.3, 0.2, 0.5]])
print(y_hat[[0, 1], y]) # tensor([0.1000, 0.5000])

def cross_entropy(y_hat, y):
return -torch.log(y_hat[range(len(y_hat)), y])

print(cross_entropy(y_hat, y)) # tensor([2.3026, 0.6931])

为了计算精度,我们执行以下操作。首先,如果 y_hat 是矩阵,那么假定第二个维度存储每个类的预测分数。我们使用 argmax 获得每行中最大元素的索引来获得预测类别。然后我们将预测类别与真实 y 元素进行比较。由于等式运算符 == 对数据类型很敏感,因此我们将 y_hat 的数据类型转换为与 y 的数据类型一致。结果是一个包含0(错)和1(对)的张量。最后,我们求和会得到正确预测的数量:

1
2
3
4
5
6
7
8
def accuracy(y_hat, y):
"""计算预测正确的数量"""
if len(y_hat.shape) > 1 and y_hat.shape[1] > 1:
y_hat = y_hat.argmax(axis=1)
cmp = y_hat.type(y.dtype) == y # tensor([False, True])
return float(cmp.type(y.dtype).sum())

print(accuracy(y_hat, y) / len(y)) # 0.5

同样,对于任意数据迭代器 data_iter 可访问的数据集,我们可以评估在任意模型 net 的精度,这里定义一个实用程序类 Accumulator,用于对多个变量进行累加。在 evaluate_accuracy 函数中,我们在 Accumulator 实例中创建了2个变量,分别用于存储正确预测的数量和预测的总数量。当我们遍历数据集时,两者都将随着时间的推移而累加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def evaluate_accuracy(net, data_iter):
"""计算在指定数据集上模型的精度"""
if isinstance(net, torch.nn.Module): # 如果是用torch.nn实现的模型
net.eval() # 将模型先设置为评估模式
metric = Accumulator(2) # 正确预测数、预测总数
with torch.no_grad():
for X, y in data_iter:
metric.add(accuracy(net(X), y), y.numel())
return metric[0] / metric[1]

class Accumulator:
"""在n个变量上累加"""
def __init__(self, n):
self.data = [0.0] * n

def add(self, *args):
self.data = [a + float(b) for a, b in zip(self.data, args)]

def reset(self):
self.data = [0.0] * len(self.data)

def __getitem__(self, idx):
return self.data[idx]

接下来可以开始进行训练:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def train_epoch_ch3(net, train_iter, loss_function, updater):
"""训练模型一个迭代周期(定义见第3章)"""
# 如果net是用torch.nn实现的话先将模型设置为训练模式
if isinstance(net, torch.nn.Module):
net.train()
# 训练损失总和、训练准确度总和、样本数
metric = Accumulator(3)
for X, y in train_iter:
# 计算梯度并更新参数
y_hat = net(X)
loss = loss_function(y_hat, y)
if isinstance(updater, torch.optim.Optimizer):
# 使用PyTorch内置的优化器和损失函数
updater.zero_grad()
loss.mean().backward()
updater.step()
else:
# 使用定制的优化器和损失函数
loss.sum().backward()
updater(X.shape[0])
metric.add(float(loss.sum()), accuracy(y_hat, y), y.numel())
# 返回训练损失和训练精度
return metric[0] / metric[2], metric[1] / metric[2]

def train_ch3(net, train_iter, test_iter, loss_function, num_epochs, updater):
"""训练模型"""
writer = SummaryWriter('../logs/FashionMNIST_train_log')
for epoch in range(num_epochs):
train_metrics = train_epoch_ch3(net, train_iter, loss_function, updater)
test_acc = evaluate_accuracy(net, test_iter)
writer.add_scalar('train_loss', train_metrics[0], epoch + 1)
writer.add_scalar('train_acc', train_metrics[1], epoch + 1)
writer.add_scalar('test_acc', test_acc, epoch + 1)
train_loss, train_acc = train_metrics
writer.close()
assert train_loss < 0.5, train_loss
assert train_acc <= 1 and train_acc > 0.7, train_acc
assert test_acc <= 1 and test_acc > 0.7, test_acc

lr = 0.1

def updater(batch_size):
return d2l.sgd([W, b], lr, batch_size)

num_epochs = 10

train_ch3(net, train_iter, test_iter, cross_entropy, num_epochs, updater)

可以在项目路径下打开 Anaconda 的 PyTorch 环境,然后使用 TensorBoard 查看训练曲线:

1
tensorboard --logdir logs\FashionMNIST_train_log

4. Softmax回归的简洁实现

首先读取数据集:

1
2
3
4
5
6
7
8
9
10
11
12
import torch
import torchvision
from torch import nn
from torch.utils import data
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

mnist_train = torchvision.datasets.FashionMNIST(root="../data", train=True, transform=transforms.ToTensor(), download=True)
mnist_test = torchvision.datasets.FashionMNIST(root="../data", train=False, transform=transforms.ToTensor(), download=True)
train_iter = data.DataLoader(mnist_train, batch_size=256, shuffle=True, num_workers=0)
test_iter = data.DataLoader(mnist_test, batch_size=256, shuffle=False, num_workers=0)

定义模型,我们只需在 Sequential 中添加一个带有10个输出的全连接层,这10个输出分别表示对10个类别的预测概率:

1
2
# PyTorch不会隐式地调整输入的形状。因此,我们在线性层前定义了展平层(Flatten),来调整网络输入的形状
net = nn.Sequential(nn.Flatten(), nn.Linear(784, 10))

定义训练函数,由于之后很多模型的训练过程也是相似的,因此该函数可以复用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 以后较为通用的函数将定义到util.functions.py中
def train_classifier(net, train_iter, test_iter, num_epochs, lr, device, writer_path=None, save_path=None):
def init_weights(m):
if type(m) == nn.Linear or type(m) == nn.Conv2d:
# nn.init.normal_(m.weight, mean=0, std=0.01) # 以均值0和标准差0.01随机初始化权重
nn.init.xavier_uniform_(m.weight) # Xavier初始化

net.apply(init_weights)

print(f'---------- Training on {device} ----------')
net.to(device)
loss_function = nn.CrossEntropyLoss() # reduction默认为'mean'
loss_function.to(device)
optimizer = torch.optim.SGD(net.parameters(), lr=lr)
if writer_path is not None:
writer = SummaryWriter(writer_path)

best_acc = 0.0
for epoch in range(num_epochs):
net.train()
train_loss, train_acc = [], []
for x, y in tqdm(train_iter):
x, y = x.to(device), y.to(device)
y_hat = net(x)

loss = loss_function(y_hat, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()

y_hat = y_hat.argmax(axis=1)
acc = (y_hat.type(y.dtype) == y).float().mean()
train_loss.append(loss.item())
train_acc.append(acc)

train_loss = sum(train_loss) / len(train_loss)
train_acc = sum(train_acc) / len(train_acc)
print(f'[ Train | epoch: {epoch + 1:03d}/{num_epochs:03d} ] loss = {train_loss:.5f}, acc = {train_acc:.5f}')

net.eval()
valid_loss, valid_acc = [], []
with torch.no_grad():
for x, y in tqdm(test_iter):
x, y = x.to(device), y.to(device)
y_hat = net(x)
loss = loss_function(y_hat, y)
y_hat = y_hat.argmax(axis=1)
acc = (y_hat.type(y.dtype) == y).float().mean()
valid_loss.append(loss.item())
valid_acc.append(acc)

valid_loss = sum(valid_loss) / len(valid_loss)
valid_acc = sum(valid_acc) / len(valid_acc)
print(f"[ Valid | epoch: {epoch + 1:03d}/{num_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

if writer_path is not None:
writer.add_scalars('loss', {'train': train_loss,
'valid': valid_loss}, epoch + 1)
writer.add_scalars('acc', {'train': train_acc,
'valid': valid_acc}, epoch + 1)

if save_path is not None and valid_acc > best_acc:
best_acc = valid_acc
torch.save(net.state_dict(), save_path)
print('Saving model with acc {:.3f}'.format(best_acc))

if writer_path is not None:
writer.close()

最后设定超参数训练模型:

1
2
3
4
5
6
7
writer_path = '../logs/FashionMNIST_train_log'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
lr, num_epochs = 0.01, 10

train_classifier(net, train_iter, test_iter, num_epochs, lr, device, writer_path)
# [ Train | epoch: 010/010 ] loss = 0.60980, acc = 0.80254
# [ Valid | epoch: 010/010 ] loss = 0.62191, acc = 0.79395

下一章:多层感知机