李沐动手学深度学习(PyTorch)课程学习笔记第九章:现代循环神经网络。
1. 门控循环单元(GRU)
在通过时间反向传播 中,我们讨论了如何在循环神经网络中计算梯度,以及矩阵连续乘积可以导致梯度消失或梯度爆炸的问题。下面我们简单思考一下这种梯度异常在实践中的意义,我们可能会遇到以下的情况:
早期观测值对预测所有未来观测值具有非常重要的意义。考虑一个极端情况,其中第一个观测值包含一个校验和,目标是在序列的末尾辨别校验和是否正确。在这种情况下,第一个词元的影响至关重要。我们希望有某些机制能够在一个记忆元里存储重要的早期信息 。如果没有这样的机制,我们将不得不给这个观测值指定一个非常大的梯度,因为它会影响所有后续的观测值。
一些词元没有相关的观测值。例如,在对网页内容进行情感分析时,可能有一些辅助 HTML 代码与网页传达的情绪无关。我们希望有一些机制来跳过隐状态表示中的此类词元 。
序列的各个部分之间存在逻辑中断。例如,书的章节之间可能会有过渡存在,或者证券的熊市和牛市之间可能会有过渡存在。在这种情况下,最好有一种方法来重置我们的内部状态表示 。
在学术界已经提出了许多方法来解决这类问题。其中最早的方法是长短期记忆 (long-short-term memory,LSTM),我们将在下一节中讨论。门控循环单元 (gated recurrent unit,GRU)是一个稍微简化的变体,通常能够提供同等的效果,并且计算的速度明显更快。由于门控循环单元更简单,我们从它开始解读。
门控循环单元与普通的循环神经网络之间的关键区别在于:前者支持隐状态的门控 。这意味着模型有专门的机制来确定应该何时更新隐状态,以及应该何时重置隐状态。这些机制是可学习的,并且能够解决了上面列出的问题。例如,如果第一个词元非常重要,模型将学会在第一次观测之后不更新隐状态。同样,模型也可以学会跳过不相关的临时观测。最后,模型还将学会在需要的时候重置隐状态。下面我们将详细讨论各类门控。
我们首先介绍重置门 (reset gate)和更新门 (update gate)。我们把它们设计成 (0, 1)
区间中的向量,这样我们就可以进行凸组合 。重置门允许我们控制可能还想记住 的过去状态的数量;更新门将允许我们控制新状态中有多少个是旧状态的副本 。
我们从构造这些门控开始。重置门和更新门的输入是由当前时间步的输入和前一时间步的隐状态给出。两个门的输出是由使用 Sigmoid 激活函数的两个全连接层给出。门控循环单元的数学表达详见:门控循环单元(GRU) 。
1.1 门控循环单元的从零开始实现
为了更好地理解门控循环单元模型,我们从零开始实现它。首先,我们读取上一章中使用的时间机器数据集:
1 2 3 4 5 6 7 8 9 10 import mathimport torchfrom torch.utils.tensorboard import SummaryWriterfrom torch import nnfrom torch.nn import functional as Ffrom d2l import torch as d2lfrom tqdm import tqdmbatch_size, num_steps = 32 , 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
下一步是初始化模型参数。我们从标准差为0.01的高斯分布中提取权重,并将偏置项设为0,超参数 num_hiddens
定义隐藏单元的数量,实例化与更新门、重置门、候选隐状态和输出层相关的所有权重和偏置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def get_params (vocab_size, num_hiddens, device ): num_inputs = num_outputs = vocab_size def normal (shape ): return torch.randn(size=shape, device=device) * 0.01 def three (): return (normal((num_inputs, num_hiddens)), normal((num_hiddens, num_hiddens)), torch.zeros(num_hiddens, device=device)) W_xz, W_hz, b_z = three() W_xr, W_hr, b_r = three() W_xh, W_hh, b_h = three() W_hq = normal((num_hiddens, num_outputs)) b_q = torch.zeros(num_outputs, device=device) params = [W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q] for param in params: param.requires_grad_(True ) return params
现在我们将定义隐状态的初始化函数 init_gru_state
。与从零开始实现 RNN 中定义的 init_rnn_state
函数一样,此函数返回一个形状为 (批量大小, 隐藏单元个数)
的张量,张量的值全部为零:
1 2 def init_gru_state (batch_size, num_hiddens, device ): return (torch.zeros((batch_size, num_hiddens), device=device),)
现在我们准备定义门控循环单元模型,模型的架构与基本的循环神经网络单元是相同的,只是权重更新公式更为复杂:
1 2 3 4 5 6 7 8 9 10 11 12 def gru (inputs, state, params ): W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params H, = state outputs = [] for X in inputs: Z = torch.sigmoid((X @ W_xz) + (H @ W_hz) + b_z) R = torch.sigmoid((X @ W_xr) + (H @ W_hr) + b_r) H_tilda = torch.tanh((X @ W_xh) + ((R * H) @ W_hh) + b_h) H = Z * H + (1 - Z) * H_tilda Y = H @ W_hq + b_q outputs.append(Y) return torch.cat(outputs, dim=0 ), (H,)
训练和预测的工作方式与从零开始实现 RNN 完全相同。训练结束后,我们分别打印输出训练集的困惑度,以及前缀 time traveler
和 traveler
的预测序列上的困惑度:
1 2 3 4 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu' ) num_hiddens, num_epochs, lr = 256 , 500 , 1 net = d2l.RNNModelScratch(len (vocab), num_hiddens, device, get_params, init_gru_state, gru) train(net, train_iter, vocab, lr, num_epochs, device)
1.2 门控循环单元的简洁实现
高级 API 包含了前文介绍的所有配置细节,所以我们可以直接实例化门控循环单元模型。这段代码的运行速度要快得多,因为它使用的是编译好的运算符而不是 Python 来处理之前阐述的许多细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 class RNNModel (nn.Module): def __init__ (self, rnn_layer, vocab_size, **kwargs ): super (RNNModel, self).__init__(**kwargs) self.rnn = rnn_layer self.vocab_size = vocab_size self.num_hiddens = self.rnn.hidden_size if not self.rnn.bidirectional: self.num_directions = 1 self.linear = nn.Linear(self.num_hiddens, self.vocab_size) else : self.num_directions = 2 self.linear = nn.Linear(self.num_hiddens * 2 , self.vocab_size) def forward (self, inputs, state ): X = F.one_hot(inputs.T.long(), self.vocab_size) X = X.to(torch.float32) Y, state = self.rnn(X, state) output = self.linear(Y.reshape((-1 , Y.shape[-1 ]))) return output, state def begin_state (self, device, batch_size=1 ): if not isinstance (self.rnn, nn.LSTM): return torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device) else : return (torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device), torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device)) gru_layer = nn.GRU(len (vocab), num_hiddens) net = RNNModel(gru_layer, len (vocab)) net = net.to(device) def train_epoch (net, train_iter, loss_function, optimizer, device, use_random_iter ): state = None train_loss = [] for X, Y in tqdm(train_iter): if state is None or use_random_iter: state = net.begin_state(batch_size=X.shape[0 ], device=device) else : if isinstance (net, nn.Module) and not isinstance (state, tuple ): state.detach_() else : for s in state: s.detach_() y = Y.T.reshape(-1 ) X, y = X.to(device), y.to(device) loss_function.to(device) y_hat, state = net(X, state) loss = loss_function(y_hat, y.long()).mean() optimizer.zero_grad() loss.backward() d2l.grad_clipping(net, 1 ) optimizer.step() train_loss.append(loss) return math.exp(sum (train_loss) / len (train_loss)) def train (net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False ): loss_function = nn.CrossEntropyLoss() optimizer = torch.optim.SGD(net.parameters(), lr) pred = lambda prefix: d2l.predict_ch8(prefix, 50 , net, vocab, device) writer = SummaryWriter('../logs/GRU_train_log' ) for epoch in range (num_epochs): ppl = train_epoch(net, train_iter, loss_function, optimizer, device, use_random_iter) if (epoch + 1 ) % 10 == 0 : print (pred('time traveller' )) print (f'Perplexity: {ppl:.1 f} ' ) writer.add_scalar('train_loss' , ppl, epoch + 1 ) print (pred('time traveller' )) print (pred('traveller' )) writer.close() train(net, train_iter, vocab, lr, num_epochs, device)
2. 长短期记忆网络(LSTM)
长期以来,隐变量模型存在着长期信息保存和短期输入缺失的问题。解决这一问题的最早方法之一是长短期存储器(long short-term memory,LSTM)。它有许多与门控循环单元一样的属性。
可以说,长短期记忆网络的设计灵感来自于计算机的逻辑门。长短期记忆网络引入了记忆元(memory cell),或简称为单元(cell)。有些文献认为记忆元是隐状态的一种特殊类型,它们与隐状态具有相同的形状,其设计目的是用于记录附加的信息。为了控制记忆元,我们需要许多门。其中一个门用来从单元中输出条目,我们将其称为输出门 (output gate)。另外一个门用来决定何时将数据读入单元,我们将其称为输入门 (input gate)。我们还需要一种机制来重置单元的内容,由遗忘门 (forget gate)来管理,这种设计的动机与门控循环单元相同,能够通过专用机制决定什么时候记忆或忽略隐状态中的输入。
长短期记忆网络的数学表达详见:长短期记忆网络(LSTM) 。
2.1 长短期记忆网络的从零开始实现
现在,我们从零开始实现长短期记忆网络,我们首先加载时光机器数据集:
1 2 3 4 5 6 7 8 9 10 import mathimport torchfrom torch.utils.tensorboard import SummaryWriterfrom torch import nnfrom torch.nn import functional as Ffrom d2l import torch as d2lfrom tqdm import tqdmbatch_size, num_steps = 32 , 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
接下来,我们需要定义和初始化模型参数。如前所述,超参数 num_hiddens
定义隐藏单元的数量。我们按照标准差0.01的高斯分布初始化权重,并将偏置项设为0:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def get_lstm_params (vocab_size, num_hiddens, device ): num_inputs = num_outputs = vocab_size def normal (shape ): return torch.randn(size=shape, device=device) * 0.01 def three (): return (normal((num_inputs, num_hiddens)), normal((num_hiddens, num_hiddens)), torch.zeros(num_hiddens, device=device)) W_xi, W_hi, b_i = three() W_xf, W_hf, b_f = three() W_xo, W_ho, b_o = three() W_xc, W_hc, b_c = three() W_hq = normal((num_hiddens, num_outputs)) b_q = torch.zeros(num_outputs, device=device) params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q] for param in params: param.requires_grad_(True ) return params
在初始化函数中,长短期记忆网络的隐状态需要返回一个额外的记忆元,单元的值为0,形状为 (批量大小, 隐藏单元数)
。因此,我们得到以下的状态初始化:
1 2 3 def init_lstm_state (batch_size, num_hiddens, device ): return (torch.zeros((batch_size, num_hiddens), device=device), torch.zeros((batch_size, num_hiddens), device=device))
实际模型的定义与我们前面讨论的一样:提供三个门和一个额外的记忆元。请注意,只有隐状态 H
才会传递到输出层,而记忆元 C
不直接参与输出计算:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def lstm (inputs, state, params ): [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q] = params (H, C) = state outputs = [] for X in inputs: I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i) F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f) O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o) C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c) C = F * C + I * C_tilda H = O * torch.tanh(C) Y = (H @ W_hq) + b_q outputs.append(Y) return torch.cat(outputs, dim=0 ), (H, C)
让我们通过实例化从零实现 RNN 章节中引入的 RNNModelScratch
类来训练一个长短期记忆网络,就如我们在上一节中所做的一样:
1 2 3 4 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu' ) num_hiddens, num_epochs, lr = 256 , 500 , 1 net = d2l.RNNModelScratch(len (vocab), num_hiddens, device, get_lstm_params, init_lstm_state, lstm) train(net, train_iter, vocab, lr, num_epochs, device)
2.2 长短期记忆网络的简洁实现
使用高级 API,我们可以直接实例化 LSTM 模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 class RNNModel (nn.Module): def __init__ (self, rnn_layer, vocab_size, **kwargs ): super (RNNModel, self).__init__(**kwargs) self.rnn = rnn_layer self.vocab_size = vocab_size self.num_hiddens = self.rnn.hidden_size if not self.rnn.bidirectional: self.num_directions = 1 self.linear = nn.Linear(self.num_hiddens, self.vocab_size) else : self.num_directions = 2 self.linear = nn.Linear(self.num_hiddens * 2 , self.vocab_size) def forward (self, inputs, state ): X = F.one_hot(inputs.T.long(), self.vocab_size) X = X.to(torch.float32) Y, state = self.rnn(X, state) output = self.linear(Y.reshape((-1 , Y.shape[-1 ]))) return output, state def begin_state (self, device, batch_size=1 ): if not isinstance (self.rnn, nn.LSTM): return torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device) else : return (torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device), torch.zeros((self.num_directions * self.rnn.num_layers, batch_size, self.num_hiddens), device=device)) lstm_layer = nn.LSTM(len (vocab), num_hiddens) net = RNNModel(lstm_layer, len (vocab)) net = net.to(device) def train_epoch (net, train_iter, loss_function, optimizer, device, use_random_iter ): state = None train_loss = [] for X, Y in tqdm(train_iter): if state is None or use_random_iter: state = net.begin_state(batch_size=X.shape[0 ], device=device) else : if isinstance (net, nn.Module) and not isinstance (state, tuple ): state.detach_() else : for s in state: s.detach_() y = Y.T.reshape(-1 ) X, y = X.to(device), y.to(device) loss_function.to(device) y_hat, state = net(X, state) loss = loss_function(y_hat, y.long()).mean() optimizer.zero_grad() loss.backward() d2l.grad_clipping(net, 1 ) optimizer.step() train_loss.append(loss) return math.exp(sum (train_loss) / len (train_loss)) def train (net, train_iter, vocab, lr, num_epochs, device, use_random_iter=False ): loss_function = nn.CrossEntropyLoss() optimizer = torch.optim.SGD(net.parameters(), lr) pred = lambda prefix: d2l.predict_ch8(prefix, 50 , net, vocab, device) writer = SummaryWriter('../logs/LSTM_train_log' ) for epoch in range (num_epochs): ppl = train_epoch(net, train_iter, loss_function, optimizer, device, use_random_iter) if (epoch + 1 ) % 10 == 0 : print (pred('time traveller' )) print (f'Perplexity: {ppl:.1 f} ' ) writer.add_scalar('train_loss' , ppl, epoch + 1 ) print (pred('time traveller' )) print (pred('traveller' )) writer.close() train(net, train_iter, vocab, lr, num_epochs, device)
3. 深度循环神经网络
到目前为止,我们只讨论了具有一个单向隐藏层的循环神经网络。其中,隐变量和观测值与具体的函数形式的交互方式是相当随意的。只要交互类型建模具有足够的灵活性,这就不是一个大问题。然而,对一个单层来说,这可能具有相当的挑战性。之前在线性模型中,我们通过添加更多的层来解决这个问题。而在循环神经网络中,我们首先需要确定如何添加更多的层,以及在哪里添加额外的非线性,因此这个问题有点棘手。
事实上,我们可以将多层循环神经网络堆叠在一起,通过对几个简单层的组合,产生了一个灵活的机制。特别是,数据可能与不同层的堆叠有关。例如,我们可能希望保持有关金融市场状况(熊市或牛市)的宏观数据可用,而微观数据只记录较短期的时间动态。
深度循环神经网络的数学表达详见:深度循环神经网络 。
实现多层循环神经网络所需的许多逻辑细节在高级 API 中都是现成的。简单起见,我们仅示范使用此类内置函数的实现方式。以长短期记忆网络模型为例,该代码与上一节中使用的代码非常相似,实际上唯一的区别是我们指定了层的数量,而不是使用单一层这个默认值。像往常一样,我们从加载数据集开始:
1 2 3 4 5 6 7 8 9 10 import mathimport torchfrom torch.utils.tensorboard import SummaryWriterfrom torch import nnfrom torch.nn import functional as Ffrom d2l import torch as d2lfrom tqdm import tqdmbatch_size, num_steps = 32 , 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
像选择超参数这类架构决策也跟上一节中的决策非常相似。因为我们有不同的词元,所以输入和输出都选择相同数量,即 vocab_size
。隐藏单元的数量仍然是256。唯一的区别是,我们现在通过 num_layers
的值来设定隐藏层数:
1 2 3 4 5 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu' ) num_inputs, num_hiddens, num_layers = len (vocab), 256 , 2 lstm_layer = nn.LSTM(input_size=num_inputs, hidden_size=num_hiddens, num_layers=num_layers) net = d2l.RNNModel(lstm_layer, len (vocab)) net = net.to(device)
最后和上一节一样训练模型看看效果:
1 2 3 4 5 num_epochs, lr = 500 , 2 train(net, train_iter, vocab, lr * 1.0 , num_epochs, device)
4. 双向循环神经网络
在双向循环神经网络中,每个时间步的隐状态由当前时间步的前后数据同时决定,通过反向更新的隐藏层来利用反向时间信息,通常用来对序列抽取特征、填空,而不是预测未来 。
双向循环神经网络的数学表达详见:双向循环神经网络 。
由于双向循环神经网络使用了过去的和未来的数据,所以我们不能盲目地将这一语言模型应用于任何预测任务。尽管模型产出的困惑度是合理的,该模型预测未来词元的能力却可能存在严重缺陷。我们用下面的示例代码引以为戒,以防在错误的环境中使用它们:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import torchfrom torch import nnfrom d2l import torch as d2lbatch_size, num_steps = 32 , 35 train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu' ) num_inputs, num_hiddens, num_layers = len (vocab), 256 , 2 lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers, bidirectional=True ) net = d2l.RNNModel(lstm_layer, len (vocab)) net = net.to(device) num_epochs, lr = 500 , 1 train(net, train_iter, vocab, lr, num_epochs, device)
5. 机器翻译与数据集
语言模型是自然语言处理的关键,而机器翻译是语言模型最成功的基准测试。因为机器翻译正是将输入序列转换成输出序列的序列转换模型 (sequence transduction)的核心问题。
与语言模型那一节中的语料库是单一语言的语言模型问题存在不同,机器翻译的数据集是由源语言和目标语言的文本序列对组成的。因此,我们需要一种完全不同的方法来预处理机器翻译数据集,而不是复用语言模型的预处理程序。
首先,下载一个由双语句子对组成的“英-法”数据集,数据集中的每一行都是制表符分隔的文本序列对,序列对由英文文本序列和翻译后的法语文本序列组成。请注意,每个文本序列可以是一个句子,也可以是包含多个句子的一个段落。在这个将英语翻译成法语的机器翻译问题中,英语是源语言(source language),法语是目标语言(target language)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import osimport torchimport matplotlib.pyplot as pltfrom d2l import torch as d2ld2l.DATA_HUB['fra-eng' ] = (d2l.DATA_URL + 'fra-eng.zip' , '94646ad1522d915e7b0f9296181140edcf86a4f5' ) def read_data_nmt (): """载入“英语-法语”数据集""" data_dir = d2l.download_extract('fra-eng' ) with open (os.path.join(data_dir, 'fra.txt' ), 'r' , encoding='utf-8' ) as f: return f.read() raw_text = read_data_nmt() print (raw_text[:75 ])
下载数据集后,原始文本数据需要经过几个预处理步骤。例如,我们用空格代替不间断空格(non-breaking space),使用小写字母替换大写字母,并在单词和标点符号之间插入空格:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def preprocess_nmt (text ): """预处理“英语-法语”数据集""" def no_space (char, prev_char ): return char in set (',.!?' ) and prev_char != ' ' text = text.replace('\u202f' , ' ' ).replace('\xa0' , ' ' ).lower() out = [' ' + char if i > 0 and no_space(char, text[i - 1 ]) else char for i, char in enumerate (text)] return '' .join(out) text = preprocess_nmt(raw_text) print (text[:80 ])
与之前的字符级词元化不同,在机器翻译中,我们更喜欢单词级词元化(最先进的模型可能使用更高级的词元化技术)。下面的 tokenize_nmt
函数对前 num_examples
个文本序列对进行词元化,其中每个词元要么是一个词,要么是一个标点符号。此函数返回两个词元列表:source
和 target
,source[i]
是源语言(这里是英语)第 i
个文本序列的词元列表,target[i]
是目标语言(这里是法语)第 i
个文本序列的词元列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def tokenize_nmt (text, num_examples=None ): """词元化“英语-法语”数据集""" source, target = [], [] for i, line in enumerate (text.split('\n' )): if num_examples and i > num_examples: break parts = line.split('\t' ) if len (parts) == 2 : source.append(parts[0 ].split(' ' )) target.append(parts[1 ].split(' ' )) return source, target source, target = tokenize_nmt(text) print (source[:3 ], target[:3 ])
让我们绘制每个文本序列所包含的词元数量的直方图。在这个简单的“英-法”数据集中,大多数文本序列的词元数量少于20个:
1 2 3 4 5 6 7 8 9 10 11 12 13 def show_list_len_pair_hist (legend, title, xlabel, ylabel, xlist, ylist ): """绘制列表长度对的直方图""" plt.figure(figsize=(8 , 6 ), dpi=150 ) _, _, patches = plt.hist([[len (l) for l in xlist], [len (l) for l in ylist]], edgecolor='r' , alpha=0.5 ) plt.title(title) plt.xlabel(xlabel) plt.ylabel(ylabel) plt.legend(legend) plt.show() show_list_len_pair_hist(['source' , 'target' ], 'The length of list' , '# tokens per sequence' , 'count' , source, target)
由于机器翻译数据集由语言对组成,因此我们可以分别为源语言和目标语言构建两个词表。使用单词级词元化时,词表大小将明显大于使用字符级词元化时的词表大小。为了缓解这一问题,这里我们将出现次数少于2次的低频率词元视为相同的未知(<unk>
)词元。除此之外,我们还指定了额外的特定词元,例如在小批量时用于将序列填充到相同长度的填充词元(<pad>
),以及序列的开始词元(<bos>
)和结束词元(<eos>
)。这些特殊词元在自然语言处理任务中比较常用。
1 2 src_vocab = d2l.Vocab(source, min_freq=2 , reserved_tokens=['<pad>' , '<bos>' , '<eos>' ]) print (len (src_vocab))
回想一下,语言模型中的序列样本都有一个固定的长度,无论这个样本是一个句子的一部分还是跨越了多个句子的一个片断。这个固定长度是由语言模型中的 num_steps
(时间步数或词元数量)参数指定的。在机器翻译中,每个样本都是由源和目标组成的文本序列对,其中的每个文本序列可能具有不同的长度。
为了提高计算效率,我们仍然可以通过截断 (truncation)和填充 (padding)方式实现一次只处理一个小批量的文本序列。假设同一个小批量中的每个序列都应该具有相同的长度 num_steps
,那么如果文本序列的词元数目少于 num_steps
时,我们将继续在其末尾添加特定的 <pad>
词元,直到其长度达到 num_steps
;反之,我们将截断文本序列时,只取其前 num_steps
个词元,并且丢弃剩余的词元。这样,每个文本序列将具有相同的长度,以便以相同形状的小批量进行加载。下面的 truncate_pad
函数将截断或填充文本序列:
1 2 3 4 5 6 7 def truncate_pad (line, num_steps, padding_token ): """截断或填充文本序列""" if len (line) > num_steps: return line[:num_steps] return line + [padding_token] * (num_steps - len (line)) print (truncate_pad(src_vocab[source[0 ]], 10 , src_vocab['<pad>' ]))
现在我们定义一个函数,可以将文本序列转换成小批量数据集用于训练。我们将特定的 <eos>
词元添加到所有序列的末尾,用于表示序列的结束。当模型通过一个词元接一个词元地生成序列进行预测时,生成的 <eos>
词元说明完成了序列输出工作。此外,我们还记录了每个文本序列的长度,统计长度时排除了填充词元,在稍后将要介绍的一些模型会需要这个长度信息。
1 2 3 4 5 6 7 def build_array_nmt (lines, vocab, num_steps ): """将机器翻译的文本序列转换成小批量""" lines = [vocab[l] for l in lines] lines = [l + [vocab['<eos>' ]] for l in lines] array = torch.tensor([truncate_pad(l, num_steps, vocab['<pad>' ]) for l in lines]) valid_len = (array != vocab['<pad>' ]).type (torch.int32).sum (1 ) return array, valid_len
最后,我们定义 load_data_nmt
函数来返回数据迭代器,以及源语言和目标语言的两种词表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def load_data_nmt (batch_size, num_steps, num_examples=600 ): """返回翻译数据集的迭代器和词表""" text = preprocess_nmt(read_data_nmt()) source, target = tokenize_nmt(text, num_examples) src_vocab = d2l.Vocab(source, min_freq=2 , reserved_tokens=['<pad>' , '<bos>' , '<eos>' ]) tgt_vocab = d2l.Vocab(target, min_freq=2 , reserved_tokens=['<pad>' , '<bos>' , '<eos>' ]) src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps) tgt_array, tgt_valid_len = build_array_nmt(target, tgt_vocab, num_steps) data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len) data_iter = d2l.load_array(data_arrays, batch_size) return data_iter, src_vocab, tgt_vocab train_iter, src_vocab, tgt_vocab = load_data_nmt(batch_size=2 , num_steps=8 ) for X, X_valid_len, Y, Y_valid_len in train_iter: print ('X:' , X.type (torch.int32)) print ('X的有效长度:' , X_valid_len) print ('Y:' , Y.type (torch.int32)) print ('Y的有效长度:' , Y_valid_len) break
6. 编码器-解码器架构
正如我们在上一节中所讨论的,机器翻译是序列转换模型的一个核心问题,其输入和输出都是长度可变的序列。为了处理这种类型的输入和输出,我们可以设计一个包含两个主要组件的架构:第一个组件是一个编码器 (encoder):它接受一个长度可变 的序列作为输入,并将其转换为具有固定形状 的编码状态。第二个组件是解码器 (decoder):它将固定形状 的编码状态映射到长度可变 的序列。这被称为编码器-解码器(encoder-decoder)架构,示意图可见:编码器-解码器架构 。
由于“编码器-解码器”架构是形成后续章节中不同序列转换模型的基础,因此本节将把这个架构转换为接口方便后面的代码实现。
在编码器接口中,我们只指定长度可变的序列作为编码器的输入 X
。任何继承这个 Encoder
基类的模型将完成代码实现:
1 2 3 4 5 6 7 8 9 from torch import nnclass Encoder (nn.Module): """编码器-解码器架构的基本编码器接口""" def __init__ (self, **kwargs ): super (Encoder, self).__init__(**kwargs) def forward (self, X, *args ): raise NotImplementedError
在下面的解码器接口中,我们新增一个 init_state
函数,用于将编码器的输出(enc_outputs)转换为编码后的状态。注意,此步骤可能需要额外的输入,例如输入序列的有效长度。为了逐个地生成长度可变的词元序列,解码器在每个时间步都会将输入(例如在前一时间步生成的词元)和编码后的状态映射成当前时间步的输出词元:
1 2 3 4 5 6 7 8 9 10 class Decoder (nn.Module): """编码器-解码器架构的基本解码器接口""" def __init__ (self, **kwargs ): super (Decoder, self).__init__(**kwargs) def init_state (self, enc_outputs, *args ): raise NotImplementedError def forward (self, X, state ): raise NotImplementedError
总而言之,“编码器-解码器”架构包含了一个编码器和一个解码器,并且还拥有可选的额外的参数。在前向传播中,编码器的输出用于生成编码状态,这个状态又被解码器作为其输入的一部分:
1 2 3 4 5 6 7 8 9 10 11 class EncoderDecoder (nn.Module): """编码器-解码器架构的基类""" def __init__ (self, encoder, decoder, **kwargs ): super (EncoderDecoder, self).__init__(**kwargs) self.encoder = encoder self.decoder = decoder def forward (self, enc_X, dec_X, *args ): enc_outputs = self.encoder(enc_X, *args) dec_state = self.decoder.init_state(enc_outputs, *args) return self.decoder(dec_X, dec_state)
7. 序列到序列学习(seq2seq)
遵循编码器-解码器架构的设计原则,循环神经网络编码器使用长度可变的序列作为输入,将其转换为固定形状的隐状态。换言之,输入序列的信息被编码到循环神经网络编码器的隐状态中。为了连续生成输出序列的词元,独立的循环神经网络解码器是基于输入序列的编码信息和输出序列已经看见的或者生成的词元来预测下一个词元。在机器翻译中使用两个循环神经网络进行序列到序列学习的图示以及理论介绍可见:序列到序列学习(seq2seq) 。
在序列到序列学习中,特定的 <eos>
表示序列结束词元。一旦输出序列生成此词元,模型就会停止预测。在循环神经网络解码器的初始化时间步,有两个特定的设计决定:首先,特定的 <bos>
表示序列开始词元,它是解码器的输入序列的第一个词元。其次,使用循环神经网络编码器最终的隐状态来初始化解码器的隐状态。
从技术上讲,编码器将长度可变的输入序列转换成形状固定的上下文变量,并且将输入序列的信息在该上下文变量中进行编码。
现在,让我们实现循环神经网络编码器。注意,我们使用了嵌入层(embedding layer)来获得输入序列中每个词元的特征向量。嵌入层的权重是一个矩阵,其行数等于输入词表的大小(vocab_size),其列数等于特征向量的维度(embed_size)。另外,本文选择了一个多层门控循环单元来实现编码器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import collectionsimport mathimport torchfrom torch import nnfrom d2l import torch as d2lclass Seq2SeqEncoder (d2l.Encoder): """用于序列到序列学习的循环神经网络编码器""" def __init__ (self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0. , **kwargs ): super (Seq2SeqEncoder, self).__init__(**kwargs) self.embedding = nn.Embedding(vocab_size, embed_size) self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout) def forward (self, X, *args ): X = self.embedding(X) X = X.permute(1 , 0 , 2 ) output, state = self.rnn(X) return output, state
下面,我们实例化上述编码器的实现:我们使用一个两层门控循环单元编码器,其隐藏单元数为16。给定一小批量的输入序列 X
(批量大小为4,时间步为7)。在完成所有时间步后,最后一层的隐状态的输出是一个张量(output
由编码器的循环层返回),其形状为 (时间步数, 批量大小, 隐藏单元数)
。
由于这里使用的是门控循环单元,所以在最后一个时间步的多层隐状态的形状是 (隐藏层的数量, 批量大小, 隐藏单元的数量)
。如果使用长短期记忆网络,state
中还将包含记忆单元信息。
1 2 3 4 5 encoder = Seq2SeqEncoder(vocab_size=10 , embed_size=8 , num_hiddens=16 , num_layers=2 ) encoder.eval () X = torch.zeros((4 , 7 ), dtype=torch.long) output, state = encoder(X) print (output.shape, state.shape)
当实现解码器时,我们直接使用编码器最后一个时间步的隐状态来初始化解码器的隐状态。这就要求使用循环神经网络实现的编码器和解码器具有相同数量的层和隐藏单元 。为了进一步包含经过编码的输入序列的信息,上下文变量在所有的时间步与解码器的输入进行拼接(concatenate)。为了预测输出词元的概率分布,在循环神经网络解码器的最后一层使用全连接层来变换隐状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Seq2SeqDecoder (d2l.Decoder): """用于序列到序列学习的循环神经网络解码器""" def __init__ (self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0. , **kwargs ): super (Seq2SeqDecoder, self).__init__(**kwargs) self.embedding = nn.Embedding(vocab_size, embed_size) self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout) self.dense = nn.Linear(num_hiddens, vocab_size) def init_state (self, enc_outputs, *args ): return enc_outputs[1 ] def forward (self, X, state ): X = self.embedding(X).permute(1 , 0 , 2 ) context = state[-1 ].repeat(X.shape[0 ], 1 , 1 ) X_and_context = torch.cat((X, context), 2 ) output, state = self.rnn(X_and_context, state) output = self.dense(output).permute(1 , 0 , 2 ) return output, state
下面,我们用与前面提到的编码器中相同的超参数来实例化解码器。如我们所见,解码器的输出形状变为 (批量大小, 时间步数, 词表大小)
,其中张量的最后一个维度存储预测的词元分布。
1 2 3 4 5 decoder = Seq2SeqDecoder(vocab_size=10 , embed_size=8 , num_hiddens=16 , num_layers=2 ) decoder.eval () state = decoder.init_state(encoder(X)) output, state = decoder(X, state) print (output.shape, state.shape)
在每个时间步,解码器预测了输出词元的概率分布。类似于语言模型,可以使用 Softmax 来获得分布,并通过计算交叉熵损失函数来进行优化。回想一下我们将特定的填充词元添加到序列的末尾,因此不同长度的序列可以以相同形状的小批量加载。但是,我们应该将填充词元的预测排除在损失函数的计算之外。
为此,我们可以使用下面的 sequence_mask
函数通过零值化 屏蔽不相关的项,以便后面任何不相关预测的计算都是与零的乘积,结果都等于零。例如,如果两个序列的有效长度(不包括填充词元)分别为1和2,则第一个序列的第一项和第二个序列的前两项之后的剩余项将被清除为零:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 print (torch.arange(3 )[None , :]) print (torch.arange(3 )[None , :] < torch.tensor([1 , 2 ])[:, None ])def sequence_mask (X, valid_len, value=0 ): """在序列中屏蔽不相关的项""" maxlen = X.size(1 ) mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None , :] < valid_len[:, None ] X[~mask] = value return X X = torch.tensor([[1 , 2 , 3 ], [4 , 5 , 6 ]]) print (sequence_mask(X, torch.tensor([1 , 2 ])))
现在,我们可以通过扩展 Softmax 交叉熵损失函数来遮蔽不相关的预测。最初,所有预测词元的掩码都设置为1。一旦给定了有效长度,与填充词元对应的掩码将被设置为0。最后,将所有词元的损失乘以掩码,以过滤掉损失中填充词元产生的不相关预测。
1 2 3 4 5 6 7 8 9 10 11 12 class MaskedSoftmaxCELoss (nn.CrossEntropyLoss): """带遮蔽的softmax交叉熵损失函数""" def forward (self, pred, label, valid_len ): weights = torch.ones_like(label) weights = sequence_mask(weights, valid_len) self.reduction='none' unweighted_loss = super (MaskedSoftmaxCELoss, self).forward(pred.permute(0 , 2 , 1 ), label) weighted_loss = (unweighted_loss * weights).mean(dim=1 ) return weighted_loss
我们可以创建三个相同的序列来进行代码健全性检查,然后分别指定这些序列的有效长度为4、2和0。结果就是,第一个序列的损失应为第二个序列的两倍,而第三个序列的损失应为零。
1 2 loss = MaskedSoftmaxCELoss() print (loss(torch.ones(3 , 4 , 10 ), torch.ones((3 , 4 ), dtype=torch.long), torch.tensor([4 , 2 , 0 ])))
在下面的循环训练过程中,特定的序列开始词元(<bos>
)和原始的输出序列(不包括序列结束词元 <eos>
)拼接在一起作为解码器的输入。这被称为强制教学 (teacher forcing),因为原始的输出序列(词元的标签)被送入解码器。或者,将来自上一个时间步的预测得到的词元作为解码器的当前输入。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 def train_seq2seq (net, data_iter, lr, num_epochs, tgt_vocab, device ): """训练序列到序列模型""" def xavier_init_weights (m ): if type (m) == nn.Linear: nn.init.xavier_uniform_(m.weight) if type (m) == nn.GRU: for param in m._flat_weights_names: if "weight" in param: nn.init.xavier_uniform_(m._parameters[param]) net.apply(xavier_init_weights) net.to(device) optimizer = torch.optim.Adam(net.parameters(), lr=lr) loss_function = MaskedSoftmaxCELoss() net.train() writer = SummaryWriter('../logs/seq2seq_train_log' ) for epoch in range (num_epochs): timer = d2l.Timer() num_tokens = 0 train_loss = [] for batch in tqdm(data_iter): X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch] bos = torch.tensor([tgt_vocab['<bos>' ]] * Y.shape[0 ], device=device).reshape(-1 , 1 ) dec_input = torch.cat([bos, Y[:, :-1 ]], 1 ) Y_hat, _ = net(X, dec_input, X_valid_len) loss = loss_function(Y_hat, Y, Y_valid_len).mean() optimizer.zero_grad() loss.backward() d2l.grad_clipping(net, 1 ) optimizer.step() train_loss.append(loss) num_tokens += Y_valid_len.sum () if (epoch + 1 ) % 10 == 0 : train_loss = sum (train_loss) / len (train_loss) writer.add_scalar('train_loss' , train_loss, epoch + 1 ) print (f'loss {train_loss:.3 f} , {num_tokens / timer.stop():.1 f} tokens/sec on {str (device)} ' ) writer.close() embed_size, num_hiddens, num_layers, dropout = 32 , 32 , 2 , 0.1 batch_size, num_steps, lr, num_epochs = 64 , 10 , 0.005 , 300 device = torch.device('cuda' if torch.cuda.is_available() else 'cpu' ) train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps) encoder = Seq2SeqEncoder(len (src_vocab), embed_size, num_hiddens, num_layers, dropout) decoder = Seq2SeqDecoder(len (tgt_vocab), embed_size, num_hiddens, num_layers, dropout) net = d2l.EncoderDecoder(encoder, decoder) train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
为了采用一个接着一个词元的方式预测输出序列,每个解码器当前时间步的输入都将来自于前一时间步的预测词元。与训练类似,序列开始词元(<bos>
)在初始时间步被输入到解码器中,当输出序列的预测遇到序列结束词元(<eos>
)时,预测就结束了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def predict_seq2seq (net, src_sentence, src_vocab, tgt_vocab, num_steps, device, save_attention_weights=False ): """序列到序列模型的预测""" net.eval () src_tokens = src_vocab[src_sentence.lower().split(' ' )] + [src_vocab['<eos>' ]] enc_valid_len = torch.tensor([len (src_tokens)], device=device) src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>' ]) enc_X = torch.unsqueeze(torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0 ) enc_outputs = net.encoder(enc_X, enc_valid_len) dec_state = net.decoder.init_state(enc_outputs, enc_valid_len) dec_X = torch.unsqueeze(torch.tensor([tgt_vocab['<bos>' ]], dtype=torch.long, device=device), dim=0 ) output_seq, attention_weight_seq = [], [] for _ in range (num_steps): Y, dec_state = net.decoder(dec_X, dec_state) dec_X = Y.argmax(dim=2 ) pred = dec_X.squeeze(dim=0 ).type (torch.int32).item() if save_attention_weights: attention_weight_seq.append(net.decoder.attention_weights) if pred == tgt_vocab['<eos>' ]: break output_seq.append(pred) return ' ' .join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
我们可以通过与真实的标签序列进行比较来评估预测序列。虽然BLEU(bilingual evaluation understudy)最先是用于评估机器翻译的结果,但现在它已经被广泛用于测量许多应用的输出序列的质量。BLEU的详细介绍可见:序列到序列学习(seq2seq) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def bleu (pred_seq, label_seq, k ): """计算BLEU""" pred_tokens, label_tokens = pred_seq.split(' ' ), label_seq.split(' ' ) len_pred, len_label = len (pred_tokens), len (label_tokens) score = math.exp(min (0 , 1 - len_label / len_pred)) for n in range (1 , k + 1 ): num_matches, label_subs = 0 , collections.defaultdict(int ) for i in range (len_label - n + 1 ): label_subs[' ' .join(label_tokens[i: i + n])] += 1 for i in range (len_pred - n + 1 ): if label_subs[' ' .join(pred_tokens[i: i + n])] > 0 : num_matches += 1 label_subs[' ' .join(pred_tokens[i: i + n])] -= 1 score *= math.pow (num_matches / (len_pred - n + 1 ), math.pow (0.5 , n)) return score engs = ['go .' , "i lost ." , 'he\'s calm .' , 'i\'m home .' ] fras = ['va !' , 'j\'ai perdu .' , 'il est calme .' , 'je suis chez moi .' ] for eng, fra in zip (engs, fras): translation, attention_weight_seq = predict_seq2seq(net, eng, src_vocab, tgt_vocab, num_steps, device) print (f'{eng} => {translation} , bleu {bleu(translation, fra, k=2 ):.3 f} ' )
8. 束搜索
束搜索为预测输出序列的一个算法,详细介绍可见:束搜索 。