D2L学习笔记-循环神经网络

  1. 1. 序列模型
  2. 2. 文本预处理
  3. 3. 语言模型和数据集
  4. 4. 循环神经网络
    1. 4.1 循环神经网络的从零开始实现
    2. 4.2 循环神经网络的简洁实现

李沐动手学深度学习(PyTorch)课程学习笔记第八章:循环神经网络。

1. 序列模型

由于涉及较多数学公式,序列模型的讲解可以转至:序列模型

首先,我们生成一些数据:使用正弦函数和一些可加性噪声来生成序列数据,时间步为 1, 2, ..., 1000

1
2
3
4
5
6
7
8
9
10
11
import torch
import matplotlib.pyplot as plt
from torch import nn
from d2l import torch as d2l
from tqdm import tqdm

T = 1000 # 总共产生1000个点
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,)) # 0~10大概为一个半周期(3*PI),并加入噪声
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 4))
plt.show()

接下来,我们将这个序列转换为模型的特征-标签(feature-label)对。基于嵌入维度 𝜏,我们将数据映射为数据对 𝑦_𝑡 = 𝑥_𝑡𝐱_𝑡 = [𝑥_{𝑡 - 𝜏}, ...,𝑥_{𝑡 - 1}],这比我们提供的数据样本少了 𝜏 个,因为我们没有足够的历史记录来描述前 𝜏 个数据样本。一个简单的解决办法是:如果拥有足够长的序列就丢弃这几项;另一个方法是用零填充序列。在这里,我们仅使用前600个特征-标签对进行训练:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
tau = 4
features = torch.zeros((T - tau, tau)) # 一共996个样本,每个样本的特征长度为4
for i in range(tau):
features[:, i] = x[i:T - tau + i] # 按列填充
labels = x[tau:].reshape((-1, 1)) # labels的元素在x中的下标为[4, 5, 6, ...],即0~3预测4,1~4预测5
# features的元素在x中的下标:
# [0, 1, 2, 3]
# [1, 2, 3, 4]
# ...
# [T - tau, T - tau + 1, T - tau + 2, T - tau + 3]

batch_size, n_train = 16, 600
# 只有前n_train个样本用于训练
train_iter = d2l.load_array((features[:n_train], labels[:n_train]), batch_size, is_train=True)

在这里,我们使用一个相当简单的架构训练模型:一个拥有两个全连接层的多层感知机,ReLU 激活函数和平方损失:

1
2
3
4
5
6
7
8
def init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)

net = nn.Sequential(nn.Linear(4, 10), nn.ReLU(), nn.Linear(10, 1))
net.apply(init_weights)

loss_function = nn.MSELoss(reduction='none') # 注意:MSELoss计算平方误差时不带系数1/2

现在准备训练模型,实现下面的训练代码的方式与前面几章中的循环训练基本相同。因此,我们不会深入探讨太多细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def train(net, train_iter, loss_function, num_epochs, lr, device):
print('training on', device)
net.to(device)
loss_function.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)

for epoch in range(num_epochs):
net.train()
train_loss = []
for X, y in tqdm(train_iter):
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
loss = loss_function(net(X), y)
loss.mean().backward()
optimizer.step()

train_loss.append(loss.mean())
train_loss = sum(train_loss) / len(train_loss)
print(f"[ Train | {epoch + 1:03d}/{num_epochs:03d} ] loss = {train_loss:.5f}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
lr, num_epochs = 0.01, 5

train(net, train_iter, loss_function, 5, 0.01, device)

由于训练损失很小,因此我们期望模型能有很好的工作效果。让我们看看这在实践中意味着什么。首先是检查模型预测下一个时间步的能力,也就是单步预测(one-step-ahead prediction):

1
2
3
4
onestep_preds = net(features.to(device))
d2l.plot([time, time[tau:]], [x.detach().numpy(), onestep_preds.cpu().detach().numpy()],
'time', 'x', legend=['data', '1-step preds'], xlim=[1, 1000], figsize=(6, 4))
plt.show()

正如我们所料,单步预测效果不错。即使这些预测的时间步超过了604(n_train + tau),其结果看起来仍然是可信的。然而有一个小问题:如果数据观察序列的时间步只到604,我们需要一步一步地向前迈进,换句话说,我们必须使用我们自己的预测(而不是原始数据)来进行多步预测。让我们看看效果如何:

1
2
3
4
5
6
7
8
9
10
multistep_preds = torch.zeros(T)
multistep_preds[:n_train + tau] = x[:n_train + tau]
for i in range(n_train + tau, T):
multistep_preds[i] = net(multistep_preds[i - tau:i].reshape((1, -1)).to(device))

d2l.plot([time, time[tau:], time[n_train + tau:]],
[x.detach().numpy(), onestep_preds.cpu().detach().numpy(),
multistep_preds[n_train + tau:].cpu().detach().numpy()], 'time', 'x',
legend=['data', '1-step preds', 'multi-step preds'], xlim=[1, 1000], figsize=(6, 4))
plt.show()

如上面的例子所示,绿线的预测显然并不理想。经过几个预测步骤之后,预测的结果很快就会衰减到一个常数。为什么这个算法效果这么差呢?事实是由于误差的累积:假设在步骤1之后,我们积累了一些误差,于是步骤2的输入被扰动了,后面的预测误差依此类推。因此误差可能会相当快地偏离真实的观测结果。例如,未来24小时的天气预报往往相当准确,但超过这一点,精度就会迅速下降。我们将在本章及后续章节中讨论如何改进这一点。

基于 k = 1, 4, 16, 64,通过对整个序列预测的计算,让我们更仔细地看一下 k 步预测的困难:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
max_steps = 64
features = torch.zeros((T - tau - max_steps + 1, tau + max_steps))
# 列i(i<tau)是来自x的观测,其时间步从(i)到(i+T-tau-max_steps+1)
for i in range(tau):
features[:, i] = x[i:i + T - tau - max_steps + 1]

# 列i(i>=tau)是来自(i-tau+1)步的预测,其时间步从(i)到(i+T-tau-max_steps+1)
for i in range(tau, tau + max_steps):
features[:, i] = net(features[:, i - tau:i].to(device)).reshape(-1)

steps = (1, 4, 16, 64)
d2l.plot([time[tau + i - 1:T - max_steps + i] for i in steps],
[features[:, tau + i - 1].cpu().detach().numpy() for i in steps], 'time', 'x',
legend=[f'{i}-step preds' for i in steps], xlim=[5, 1000], figsize=(6, 4))
plt.show()

2. 文本预处理

对于序列数据处理问题,我们在上一节中评估了所需的统计工具和预测时面临的挑战。这样的数据存在许多种形式,文本是最常见例子之一。例如,一篇文章可以被简单地看作一串单词序列,甚至是一串字符序列。本节中,我们将解析文本的常见预处理步骤。这些步骤通常包括:

  • 将文本作为字符串加载到内存中。
  • 将字符串拆分为词元(如单词和字符)。
  • 建立一个词表,将拆分的词元映射到数字索引。
  • 将文本转换为数字索引序列,方便模型操作。

首先,我们从 H.G.Well 的时光机器中加载文本。这是一个相当小的语料库,只有30000多个单词,但足够我们小试牛刀,而现实中的文档集合可能会包含数十亿个单词。下面的函数将数据集读取到由多条文本行组成的列表中,其中每条文本行都是一个字符串。为简单起见,我们在这里忽略了标点符号和字母大写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import collections
import re
from d2l import torch as d2l

d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt', '090b5e7e70c295757f55df93cb0a180b9691891a')
d2l.download('time_machine') # 默认路径在../data

def read_time_machine():
"""将时间机器数据集加载到文本行的列表中,同时将非大小写字母外的所有字符替换为空格"""
with open('../data/timemachine.txt', 'r') as f:
lines = f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'文本总行数: {len(lines)}') # 文本总行数: 3221
print(lines[0]) # the time machine by h g wells
print(lines[10]) # twinkled and his usually pale face was flushed and animated the

下面的 tokenize 函数将文本行列表(lines)作为输入,列表中的每个元素是一个文本序列(如一条文本行)。每个文本序列又被拆分成一个词元列表,词元(token)是文本的基本单位。最后,返回一个由词元列表组成的列表,其中的每个词元都是一个字符串(string):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def tokenize(lines, token='word'):
"""将文本行拆分为单词或字符词元"""
if token == 'word':
return [line.split() for line in lines]
elif token == 'char':
return [list(line) for line in lines] # list(str)能将字符串中的每个字符分隔开形成list
else:
print('错误!未知词元类型:' + token)

tokens = tokenize(lines)
# tokens = tokenize(lines, token='char')
for i in range(10):
print(tokens[i])
# ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
# []
# []
# []
# []
# ['i']
# []
# []
# ['the', 'time', 'traveller', 'for', 'so', 'it', 'will', 'be', 'convenient', 'to', 'speak', 'of', 'him']
# ['was', 'expounding', 'a', 'recondite', 'matter', 'to', 'us', 'his', 'grey', 'eyes', 'shone', 'and']

词元的类型是字符串,而模型需要的输入是数字,因此这种类型不方便模型使用。现在,让我们构建一个字典,通常也叫做词表(vocabulary),用来将字符串类型的词元映射到从0开始的数字索引中。我们先将训练集中的所有文档合并在一起,对它们的唯一词元进行统计,得到的统计结果称之为语料(corpus)。然后根据每个唯一词元的出现频率,为其分配一个数字索引。很少出现的词元通常被移除,这可以降低复杂性。另外,语料库中不存在或已删除的任何词元都将映射到一个特定的未知词元 <unk>。我们可以选择增加一个列表,用于保存那些被保留的词元,例如:填充词元(<pad>)、序列开始词元(<bos>)、序列结束词元(<eos>):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Vocab:
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
if tokens is None:
tokens = []
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率从大到小排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1], reverse=True)
# 构建索引到词元与词元到索引的映射,未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq: # 如果token出现的次数少于min_freq次则直接丢弃
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1

def __len__(self):
return len(self.idx_to_token)

def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk) # tokens不存在则返回0
return [self.__getitem__(token) for token in tokens]

def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]

@property
def unk(self): # 未知词元的索引为0
return 0

@property
def token_freqs(self):
return self._token_freqs

def count_corpus(tokens):
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
tokens = [token for line in tokens for token in line] # 将词元列表展平成一个列表
return collections.Counter(tokens)

我们首先使用时光机器数据集作为语料库来构建词表,然后打印前几个高频词元及其索引:

1
2
3
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10]) # 注意不加item()的话只会将key转成list
# [('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4), ('a', 5), ('to', 6), ('was', 7), ('in', 8), ('that', 9)]

现在,我们可以将每一条文本行转换成一个数字索引列表:

1
2
print('文本:', tokens[0])  # 文本: ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
print('索引:', vocab[tokens[0]]) # 索引: [1, 19, 50, 40, 2183, 2184, 400]

在使用上述函数时,我们将所有功能打包到 load_corpus_time_machine 函数中,该函数返回 corpus(词元索引列表)和 vocab(时光机器语料库的词表)。我们在这里所做的改变是:

  • 为了简化后面章节中的训练,我们使用字符(而不是单词)实现文本词元化;
  • 时光机器数据集中的每个文本行不一定是一个句子或一个段落,还可能是一个单词,因此返回的 corpus 仅处理为单个列表,而不是使用多词元列表构成的一个列表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def load_corpus_time_machine(max_tokens=-1):
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab

corpus, vocab = load_corpus_time_machine()
print(len(corpus), len(vocab)) # 170580 28
print('索引:', corpus[:10]) # 索引: [3, 9, 2, 1, 3, 5, 13, 2, 1, 13]
print('文本:', vocab.to_tokens(corpus[:10])) # 文本: ['t', 'h', 'e', ' ', 't', 'i', 'm', 'e', ' ', 'm']

3. 语言模型和数据集

由于涉及较多数学公式,语言模型的讲解可以转至:语言模型和数据集

根据上一节中介绍的时光机器数据集构建词表,并打印前10个最常用的(频率最高的)单词:

1
2
3
4
5
6
7
8
9
10
11
import random
import torch
import matplotlib.pyplot as plt
from d2l import torch as d2l

tokens = d2l.tokenize(d2l.read_time_machine())
# 因为每个文本行不一定是一个句子或一个段落,因此我们把所有文本行拼接到一起
corpus = [token for line in tokens for token in line]
print(corpus[:10]) # ['the', 'time', 'machine', 'by', 'h', 'g', 'wells', 'i', 'the', 'time']
vocab = d2l.Vocab(corpus)
print(vocab.token_freqs[:10]) # [('the', 2261), ('i', 1267), ('and', 1245), ('of', 1155), ...]

正如我们所看到的,最流行的词看起来很无聊,这些词通常被称为停用词(stop words),因此可以被过滤掉。尽管如此,它们本身仍然是有意义的,我们仍然会在模型中使用它们。此外,还有个明显的问题是词频衰减的速度相当地快。例如,最常用单词的词频对比,第10个还不到第1个的1/5。为了更好地理解,我们可以画出的词频图:

1
2
3
freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)', xscale='log', yscale='log')
plt.show()

通过词频图我们可以发现:词频以一种明确的方式迅速衰减。将前几个单词作为例外消除后,剩余的所有单词大致遵循双对数坐标图上的一条直线。这意味着单词的频率满足齐普夫定律(Zipf’s law)。这告诉我们想要通过计数统计和平滑来建模单词是不可行的,因为这样建模的结果会大大高估尾部单词的频率,也就是所谓的不常用单词。那么其他的词元组合,比如二元语法、三元语法等等,又会如何呢?我们来看看二元语法的频率是否与一元语法的频率表现出相同的行为方式:

1
2
3
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]  # 遍历所有连续的两个词元
bigram_vocab = d2l.Vocab(bigram_tokens)
print(bigram_vocab.token_freqs[:10]) # [(('of', 'the'), 309), (('in', 'the'), 169), ...]

这里值得注意:在十个最频繁的词对中,有九个是由两个停用词组成的,只有一个与 the time 有关。我们再进一步看看三元语法的频率是否表现出相同的行为方式:

1
2
3
trigram_tokens = [triple for triple in zip(corpus[:-2], corpus[1:-1], corpus[2:])]  # 遍历所有连续的三个词元
trigram_vocab = d2l.Vocab(trigram_tokens)
print(trigram_vocab.token_freqs[:10]) # [(('the', 'time', 'traveller'), 59), (('the', 'time', 'machine'), 30), ...]

最后,我们直观地对比三种模型中的词元频率:一元语法、二元语法和三元语法:

1
2
3
4
5
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x', ylabel='frequency: n(x)',
xscale='log', yscale='log', legend=['unigram', 'bigram', 'trigram'])
plt.show()

由于序列数据本质上是连续的,因此我们在处理数据时需要解决这个问题。在第一节中我们以一种相当特别的方式做到了这一点:当序列变得太长而不能被模型一次性全部处理时,我们可能希望拆分这样的序列方便模型读取。

在介绍该模型之前,我们看一下总体策略。假设我们将使用神经网络来训练语言模型,模型中的网络一次处理具有预定义长度(例如 𝑛 个时间步)的一个小批量序列。现在的问题是如何随机生成一个小批量数据的特征和标签以供读取。

首先,由于文本序列可以是任意长的,例如整本《时光机器》(The Time Machine),于是任意长的序列可以被我们划分为具有相同时间步数的子序列。当训练我们的神经网络时,这样的小批量子序列将被输入到模型中。假设网络一次只处理具有 𝑛 个时间步的子序列,那么可以从指定的起始位置开始截取连续的长度为 𝑛 的子序列,因为我们可以选择任意偏移量来指示初始位置,所以我们有相当大的自由度。

如果我们只选择一个偏移量,那么用于训练网络的、所有可能的子序列的覆盖范围将是有限的。因此,我们可以从随机偏移量开始划分序列,以同时获得覆盖性(coverage)和随机性(randomness)。下面,我们将描述如何实现随机采样(random sampling)和顺序分区(sequential partitioning)策略。

在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列。在迭代过程中,来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻。对于语言建模,目标是基于到目前为止我们看到的词元来预测下一个词元,因此标签是移位了一个词元的原始序列。

下面的代码每次可以从数据中随机生成一个小批量。在这里,参数 batch_size 指定了每个小批量中子序列样本的数目,参数 num_steps 是每个子序列中预定义的时间步数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def seq_data_iter_random(corpus, batch_size, num_steps):
"""使用随机抽样生成一个小批量子序列"""
# 从随机偏移量开始对序列进行分区,随机范围包括num_steps-1
corpus = corpus[random.randint(0, num_steps - 1):] # 截取随机起始位置之后的部分
# 减去1,是因为我们需要考虑标签,要留至少一个数据作为最后一组预测的标签
num_subseqs = (len(corpus) - 1) // num_steps # 分区数
# 长度为num_steps的子序列的起始索引
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 在随机抽样的迭代过程中,来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
random.shuffle(initial_indices)

def data(pos):
# 返回从pos位置开始的长度为num_steps的序列
return corpus[pos:pos + num_steps]

num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# 在这里,initial_indices包含子序列的随机起始索引
initial_indices_per_batch = initial_indices[i:i + batch_size]
X = [data(j) for j in initial_indices_per_batch]
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)

下面我们生成一个从0到34的序列。假设批量大小为2,时间步数为5,这意味着可以生成6个特征-标签子序列对。如果设置小批量大小为2,我们只能得到3个小批量:

1
2
3
4
5
6
7
8
9
my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
print('X:', X, '\nY:', Y)
# X: tensor([[ 7, 8, 9, 10, 11], [17, 18, 19, 20, 21]])
# Y: tensor([[ 8, 9, 10, 11, 12], [18, 19, 20, 21, 22]])
# X: tensor([[22, 23, 24, 25, 26], [27, 28, 29, 30, 31]])
# Y: tensor([[23, 24, 25, 26, 27], [28, 29, 30, 31, 32]])
# X: tensor([[ 2, 3, 4, 5, 6], [12, 13, 14, 15, 16]])
# Y: tensor([[ 3, 4, 5, 6, 7], [13, 14, 15, 16, 17]])

在迭代过程中,除了对原始序列可以随机抽样外,我们还可以保证两个相邻的小批量中的子序列在原始序列上也是相邻的。这种策略在基于小批量的迭代过程中保留了拆分的子序列的顺序,因此称为顺序分区:

1
2
3
4
5
6
7
8
9
10
11
12
13
def seq_data_iter_sequential(corpus, batch_size, num_steps):
"""使用顺序分区生成一个小批量子序列"""
# 从随机偏移量开始划分序列
offset = random.randint(0, num_steps)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = torch.tensor(corpus[offset:offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1:offset + 1 + num_tokens])
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps # batch的数量
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i:i + num_steps]
Y = Ys[:, i:i + num_steps]
yield X, Y

基于相同的设置,通过顺序分区读取每个小批量的子序列的特征 X 和标签 Y。通过将它们打印出来可以发现:迭代期间来自两个相邻的小批量中的子序列在原始序列中确实是相邻的:

1
2
3
4
5
6
7
8
for X, Y in seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
print('X:', X, '\nY:', Y)
# X: tensor([[ 2, 3, 4, 5, 6], [18, 19, 20, 21, 22]])
# Y: tensor([[ 3, 4, 5, 6, 7], [19, 20, 21, 22, 23]])
# X: tensor([[ 7, 8, 9, 10, 11], [23, 24, 25, 26, 27]])
# Y: tensor([[ 8, 9, 10, 11, 12], [24, 25, 26, 27, 28]])
# X: tensor([[12, 13, 14, 15, 16], [28, 29, 30, 31, 32]])
# Y: tensor([[13, 14, 15, 16, 17], [29, 30, 31, 32, 33]])

现在,我们将上面的两个采样函数包装到一个类中,以便稍后可以将其用作数据迭代器:

1
2
3
4
5
6
7
8
9
10
11
12
class SeqDataLoader:
"""加载序列数据的迭代器"""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = d2l.seq_data_iter_random
else:
self.data_iter_fn = d2l.seq_data_iter_sequential
self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps

def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)

最后,我们定义了一个函数 load_data_time_machine,它同时返回数据迭代器和词表,因此可以与其他带有 load_data 前缀的函数(如 d2l.load_data_fashion_mnist)类似地使用:

1
2
3
4
def load_data_time_machine(batch_size, num_steps, use_random_iter=False, max_tokens=10000):
"""返回时光机器数据集的迭代器和词表"""
data_iter = SeqDataLoader(batch_size, num_steps, use_random_iter, max_tokens)
return data_iter, data_iter.vocab

4. 循环神经网络

由于涉及较多数学公式,循环神经网络的理论部分可以转至:循环神经网络

4.1 循环神经网络的从零开始实现

本节将从头开始基于循环神经网络实现字符级语言模型。这样的模型将在 H.G.Wells 的时光机器数据集上训练。和前面上一节中介绍过的一样,我们先读取数据集:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import math
import torch
from torch.utils.tensorboard import SummaryWriter
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
from tqdm import tqdm

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps, max_tokens=10000)
print(len(train_iter.corpus)) # 10000
for X, y in train_iter:
print(X.shape, y.shape) # torch.Size([32, 35]) torch.Size([32, 35])
break

回想一下,在 train_iter 中,每个词元都表示为一个数字索引,将这些索引直接输入神经网络可能会使学习变得困难。我们通常将每个词元表示为更具表现力的特征向量。最简单的表示称为独热编码(one-hot encoding)。

简言之,将每个索引映射为相互不同的单位向量:假设词表中不同词元的数目为N(即 len(vocab)),词元索引的范围为0~N-1。如果词元的索引是整数 i,那么我们将创建一个长度为N的全0向量,并将第 i 处的元素设置为1。此向量是原始词元的一个独热向量。索引为0和2的独热向量如下所示:

1
2
3
print(F.one_hot(torch.tensor([0, 2]), len(vocab)))
# tensor([[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
# [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

我们每次采样的小批量数据形状是二维张量:(批量大小, 时间步数)one_hot 函数将这样一个小批量数据转换成三维张量,张量的最后一个维度等于词表大小(len(vocab))。我们经常转换输入的维度,以便获得形状为 (时间步数, 批量大小, 词表大小) 的输出。这将使我们能够更方便地通过最外层的维度,一步一步地更新小批量数据的隐状态:

1
2
X = torch.arange(10).reshape((2, 5))
print(F.one_hot(X.T, 28).shape) # torch.Size([5, 2, 28])

接下来,我们初始化循环神经网络模型的模型参数。隐藏单元数 num_hiddens 是一个可调的超参数。当训练语言模型时,输入和输出来自相同的词表(输出可以看成多分类问题,即输出表示对每个词元的预测概率)。因此,它们具有相同的维度,即词表的大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def get_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size

def normal(shape):
return torch.randn(size=shape, device=device) * 0.01

# 隐藏层参数
W_xh = normal((num_inputs, num_hiddens))
W_hh = normal((num_hiddens, num_hiddens))
b_h = torch.zeros(num_hiddens, device=device)
# 输出层参数
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device=device)
# 附加梯度
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params

为了定义循环神经网络模型,我们首先需要一个 init_rnn_state 函数在初始化时返回隐状态。这个函数的返回是一个张量,张量全用0填充,形状为 (批量大小, 隐藏单元数)。在后面的章节中我们将会遇到隐状态包含多个变量的情况,而使用元组可以更容易地处理些:

1
2
def init_rnn_state(batch_size, num_hiddens, device):
return (torch.zeros((batch_size, num_hiddens), device=device),)

下面的 rnn 函数定义了如何在一个时间步内计算隐状态和输出。循环神经网络模型通过 inputs 最外层的维度实现循环,以便逐时间步更新小批量数据的隐状态H。此外,这里使用 tanh 函数作为激活函数,当元素在实数上满足均匀分布时,tanh 函数的平均值为0:

1
2
3
4
5
6
7
8
9
10
11
def rnn(inputs, state, params):
# inputs.shape: (时间步数量, 批量大小, 词表大小)
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
# X.shape: (批量大小, 词表大小)
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H,)

定义了所有需要的函数之后,接下来我们创建一个类来包装这些函数,并存储从零开始实现的循环神经网络模型的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
class RNNModelScratch:
"""从零开始实现的循环神经网络模型"""
def __init__(self, vocab_size, num_hiddens, device, get_params, init_state, forward_fn):
self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
self.params = get_params(vocab_size, num_hiddens, device)
self.init_state, self.forward_fn = init_state, forward_fn

def __call__(self, X, state):
X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
return self.forward_fn(X, state, self.params)

def begin_state(self, batch_size, device):
return self.init_state(batch_size, self.num_hiddens, device)

让我们检查输出是否具有正确的形状。例如,隐状态的维数是否保持不变:

1
2
3
4
5
6
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, device, get_params, init_rnn_state, rnn)
state = net.begin_state(X.shape[0], device)
Y, new_state = net(X.to(device), state)
print(Y.shape, len(new_state), new_state[0].shape) # torch.Size([10, 28]) 1 torch.Size([2, 512])

我们可以看到输出形状是 (时间步数 * 批量大小, 词表大小),而隐状态形状保持不变,即 (批量大小, 隐藏单元数)

让我们首先定义预测函数来生成 prefix 之后的新字符,其中的 prefix 是一个用户提供的包含多个字符的字符串。在循环遍历 prefix 中的开始字符时,我们不断地将隐状态传递到下一个时间步,但是不生成任何输出。这被称为预热(warm-up)期,因为在此期间模型会自我更新(例如,更新隐状态),但不会进行预测。预热期结束后,隐状态的值通常比刚开始的初始值更适合预测,从而预测字符并输出它们:

1
2
3
4
5
6
7
8
9
10
11
12
def predict(prefix, num_preds, net, vocab, device):
"""在prefix后面生成新字符"""
state = net.begin_state(batch_size=1, device=device)
outputs = [vocab[prefix[0]]]
get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
for y in prefix[1:]: # 预热期
_, state = net(get_input(), state)
outputs.append(vocab[y])
for _ in range(num_preds): # 预测num_preds步
y, state = net(get_input(), state)
outputs.append(int(y.argmax(dim=1).reshape(1)))
return ''.join([vocab.idx_to_token[i] for i in outputs])

现在我们可以测试 predict 函数。我们将前缀指定为 time traveller,并基于这个前缀生成10个后续字符。鉴于我们还没有训练网络,它会生成荒谬的预测结果:

1
print(predict('time traveller ', 10, net, vocab, device))  # time traveller gxgtlsryyy

梯度裁剪的理论可转至:循环神经网络的从零开始实现

下面我们定义一个函数来裁剪模型的梯度,模型是从零开始实现的模型或由高级 API 构建的模型。我们在此计算了所有模型参数的梯度的范数:

1
2
3
4
5
6
7
8
9
10
def grad_clipping(net, theta):
"""裁剪梯度"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm

在训练模型之前,让我们定义一个函数在一个迭代周期内训练模型。它与我们训练 Softmax 模型的方式有三个不同之处:

  • 序列数据的不同采样方法(随机采样和顺序分区)将导致隐状态初始化的差异。
  • 我们在更新模型参数之前裁剪梯度。这样的操作的目的是,即使训练过程中某个点上发生了梯度爆炸,也能保证模型不会发散。
  • 我们用困惑度来评价模型。这样的度量确保了不同长度的序列具有可比性。

具体来说,当使用顺序分区时,我们只在每个迭代周期的开始位置初始化隐状态。由于下一个小批量数据中的第 i 个子序列样本与当前第 i 个子序列样本相邻,因此当前小批量数据最后一个样本的隐状态,将用于初始化下一个小批量数据第一个样本的隐状态。这样,存储在隐状态中的序列的历史信息可以在一个迭代周期内流经相邻的子序列。然而,在任何一点隐状态的计算,都依赖于同一迭代周期中前面所有的小批量数据,这使得梯度计算变得复杂。为了降低计算量,在处理任何一个小批量数据之前,我们先分离梯度,使得隐状态的梯度计算总是限制在一个小批量数据的时间步内。

当使用随机抽样时,因为每个样本都是在一个随机位置抽样的,因此需要为每个迭代周期重新初始化隐状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def train_epoch(net, train_iter, loss_function, optimizer, device, use_random_iter):
"""训练网络一个迭代周期(定义见第8章)"""
state = None
train_loss = []
for X, Y in tqdm(train_iter):
if state is None or use_random_iter:
# 在第一次迭代或使用随机抽样时初始化state
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
if isinstance(net, nn.Module) and not isinstance(state, tuple):
# state对于nn.GRU是个张量
state.detach_()
else:
# state对于nn.LSTM或对于我们从零开始实现的模型是个元组
for s in state:
s.detach_()
y = Y.T.reshape(-1)
X, y = X.to(device), y.to(device)
y_hat, state = net(X, state)
loss = loss_function(y_hat, y.long()).mean()
if isinstance(optimizer, torch.optim.Optimizer):
optimizer.zero_grad()
loss.backward()
grad_clipping(net, 1)
optimizer.step()
else:
loss.backward()
grad_clipping(net, 1)
optimizer(batch_size=1)
train_loss.append(loss) # 因为已经调用了mean函数
return math.exp(sum(train_loss) / len(train_loss)) # 返回困惑度

循环神经网络模型的训练函数既支持从零开始实现,也可以使用高级 API 来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def train(net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False):
"""训练模型(定义见第8章)"""
loss_function = nn.CrossEntropyLoss()
# 初始化
if isinstance(net, nn.Module):
optimizer = torch.optim.SGD(net.parameters(), lr)
else:
optimizer = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
pred = lambda prefix: predict(prefix, 50, net, vocab, device)
# 训练和预测
writer = SummaryWriter('../logs/RNN_scratch_train_log')

for epoch in range(num_epochs):
ppl = train_epoch(net, train_iter, loss_function, optimizer, device, use_random_iter)
if (epoch + 1) % 10 == 0:
print(pred('time traveller'))
print(f'Perplexity: {ppl:.1f}')
writer.add_scalar('train_loss', ppl, epoch + 1)

print(pred('time traveller'))
print(pred('traveller'))
writer.close()

现在,我们训练循环神经网络模型。因为我们在数据集中只使用了10000个词元,所以模型需要更多的迭代周期来更好地收敛:

1
2
3
4
5
num_epochs, lr = 500, 1
train(net, train_iter, vocab, lr, num_epochs, device)
# Perplexity: 1.0
# time travelleryou can show black is white by argument said filby
# travelleryou can show black is white by argument said filby

4.2 循环神经网络的简洁实现

虽然从零开始实现循环神经网络对了解网络的实现方式具有指导意义,但并不方便。本节将展示如何使用深度学习框架的高级 API 提供的函数更有效地实现相同的语言模型。我们仍然从读取时光机器数据集开始:

1
2
3
4
5
6
7
8
9
10
import math
import torch
from torch.utils.tensorboard import SummaryWriter
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
from tqdm import tqdm

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

高级 API 提供了循环神经网络的实现。我们构造一个具有256个隐藏单元的单隐藏层的循环神经网络层 rnn_layer。事实上,我们还没有讨论多层循环神经网络的意义(这将在深度循环神经网络中介绍)。现在仅需要将多层理解为一层循环神经网络的输出被用作下一层循环神经网络的输入就足够了:

1
2
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)

我们使用张量来初始化隐状态,它的形状是 (隐藏层数, 批量大小, 隐藏单元数)

1
2
state = torch.zeros((1, batch_size, num_hiddens))
print(state.shape) # torch.Size([1, 32, 256])

通过一个隐状态和一个输入,我们就可以用更新后的隐状态计算输出。需要强调的是,rnn_layer 的输出(Y)不涉及输出层的计算:它是指每个时间步的隐状态,这些隐状态可以用作后续输出层的输入:

1
2
3
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
print(Y.shape, state_new.shape) # torch.Size([35, 32, 256]) torch.Size([1, 32, 256])

我们为一个完整的循环神经网络模型定义了一个 RNNModel 类。注意,rnn_layer 只包含隐藏的循环层,我们还需要创建一个单独的输出层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class RNNModel(nn.Module):
"""循环神经网络模型"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = self.rnn.hidden_size
# 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
if not self.rnn.bidirectional:
self.num_directions = 1
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

def forward(self, inputs, state):
X = F.one_hot(inputs.T.long(), self.vocab_size)
X = X.to(torch.float32)
Y, state = self.rnn(X, state)
# 全连接层首先将Y的形状改为:(时间步数 * 批量大小, 隐藏单元数)
# 它的输出形状是:(时间步数 * 批量大小, 词表大小)
output = self.linear(Y.reshape((-1, Y.shape[-1]))) # (35 * 32, 256) -> (35 * 32, 28)
return output, state

def begin_state(self, device, batch_size=1):
if not isinstance(self.rnn, nn.LSTM):
# nn.GRU以张量作为隐状态
return torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device)
else:
# nn.LSTM以元组作为隐状态
return (torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device),
torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device))

在训练模型之前,让我们基于一个具有随机权重的模型进行预测,d2l.predict_ch8 函数与上一节中的 predict 函数相同:

1
2
3
4
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
print(d2l.predict_ch8('time traveller', 10, net, vocab, device)) # time travellerxhhhhhhhhh

很明显,这种模型根本不能输出好的结果。接下来,我们使用上一节中定义的超参数训练模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def train_epoch(net, train_iter, loss_function, optimizer, device, use_random_iter):
state = None
train_loss = []
for X, Y in tqdm(train_iter):
if state is None or use_random_iter:
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
if isinstance(net, nn.Module) and not isinstance(state, tuple):
state.detach_()
else:
for s in state:
s.detach_()
y = Y.T.reshape(-1)
X, y = X.to(device), y.to(device)
loss_function.to(device)
y_hat, state = net(X, state)
loss = loss_function(y_hat, y.long()).mean()
optimizer.zero_grad()
loss.backward()
d2l.grad_clipping(net, 1) # 与上一节中的grad_clipping函数相同
optimizer.step()
train_loss.append(loss) # 因为已经调用了mean函数
return math.exp(sum(train_loss) / len(train_loss))

def train(net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False):
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr)
pred = lambda prefix: d2l.predict_ch8(prefix, 50, net, vocab, device)

writer = SummaryWriter('../logs/RNN_scratch_train_log')

for epoch in range(num_epochs):
ppl = train_epoch(net, train_iter, loss_function, optimizer, device, use_random_iter)
if (epoch + 1) % 10 == 0:
print(pred('time traveller'))
print(f'Perplexity: {ppl:.1f}')
writer.add_scalar('train_loss', ppl, epoch + 1)

print(pred('time traveller'))
print(pred('traveller'))
writer.close()

num_epochs, lr = 500, 1
train(net, train_iter, vocab, lr, num_epochs, device)
# Perplexity: 1.3
# time traveller for so ig will aboca thoursugli gpseknop how stac
# travelleryou can space of the simestiok satt or al and wisc