Preface

References

Aston Zhang, Zachary C. Lipton, Mu Li, Alexander J. Smola. Dive into Deep Learning. 2020.

Introduction - Dive-into-DL-PyTorch (tangshusen.me)

Modern Recurrent Neural Networks

source code: NJU-ymhui/DeepLearning: Deep Learning with pytorch (github.com)

use git to clone: https://github.com/NJU-ymhui/DeepLearning.git

/ModernRNN

GRU_self.py GRU_lib.py LSTM_self.py LSTM_lib.py deep_rnn_lib.py machine_translation.py encoder_decoder.py

seq2seq.py

Gated Recurrent Units (GRU)

The gated recurrent unit (GRU) is a slightly simplified variant of the long short-term memory (LSTM) network, so we introduce it first.

The key difference between a GRU and a vanilla recurrent neural network is that the former supports gating of the hidden state: the model has dedicated, learnable mechanisms that decide when the hidden state should be updated and when it should be reset.

For details on the update gate, reset gate, hidden state, and candidate hidden state, see 10.2. Gated Recurrent Units (GRU) — Dive into Deep Learning 1.0.3 documentation (d2l.ai).
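For quick reference, the computations performed by the scratch implementation below are, in d2l's notation (with \(\odot\) denoting elementwise multiplication):

\[
\begin{aligned}
\mathbf{Z}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xz} + \mathbf{H}_{t-1} \mathbf{W}_{hz} + \mathbf{b}_z), \\
\mathbf{R}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xr} + \mathbf{H}_{t-1} \mathbf{W}_{hr} + \mathbf{b}_r), \\
\tilde{\mathbf{H}}_t &= \tanh(\mathbf{X}_t \mathbf{W}_{xh} + (\mathbf{R}_t \odot \mathbf{H}_{t-1}) \mathbf{W}_{hh} + \mathbf{b}_h), \\
\mathbf{H}_t &= \mathbf{Z}_t \odot \mathbf{H}_{t-1} + (1 - \mathbf{Z}_t) \odot \tilde{\mathbf{H}}_t.
\end{aligned}
\]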

Below we focus on how to implement the GRU.

Implementing a GRU from Scratch

We again use The Time Machine dataset.

code

import torch
from d2l import torch as d2l


# Initialize model parameters
def get_params(vocab_size, num_hidden, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    def three():
        return (
            normal((num_inputs, num_hidden)),
            normal((num_hidden, num_hidden)),
            torch.zeros(num_hidden, device=device)
        )

    W_xz, W_hz, b_z = three()  # update gate parameters
    W_xr, W_hr, b_r = three()  # reset gate parameters
    W_xh, W_hh, b_h = three()  # candidate hidden state parameters
    # output layer parameters
    W_hq = normal((num_hidden, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # attach gradients, i.e. mark every parameter as requiring autograd
    params = [W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params


# Define the model
def init_gru_state(batch_size, num_hidden, device):
    return torch.zeros((batch_size, num_hidden), device=device),


def gru(inputs, state, params):
    W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    for X in inputs:
        Z = torch.sigmoid(torch.matmul(X, W_xz) + torch.matmul(H, W_hz) + b_z)
        R = torch.sigmoid(torch.matmul(X, W_xr) + torch.matmul(H, W_hr) + b_r)
        H_tilda = torch.tanh(torch.matmul(X, W_xh) + torch.matmul(R * H, W_hh) + b_h)
        H = Z * H + (1 - Z) * H_tilda
        Y = torch.matmul(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H, )


if __name__ == "__main__":
    batch_size, num_steps = 32, 35
    train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

    # Training and prediction
    vocab_size, num_hidden, device = len(vocab), 256, d2l.try_gpu()
    num_epochs, lr = 500, 1  # same hyperparameters as before
    model = d2l.RNNModelScratch(len(vocab), num_hidden, device, get_params, init_gru_state, gru)
    d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
    # visualize the perplexity curve
    d2l.plt.show()

output

time traveller                                                  
perplexity 1.2, 23127.7 tokens/sec on cpu
time traveller but now you begin this wo legh wime yo u gan a ju
travelleryou can show ble i have been at work upon thisgeom

Concise Implementation of the GRU

The high-level APIs of the deep learning framework encapsulate all the details introduced above, so we can instantiate the gated recurrent unit model directly.

code

from torch import nn
from d2l import torch as d2l


if __name__ == "__main__":
    batch_size, num_steps = 32, 35
    train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
    vocab_size, device = len(vocab), d2l.try_gpu()
    num_epochs, lr = 500, 1

    num_inputs, num_hidden = vocab_size, 256
    gru_layer = nn.GRU(num_inputs, num_hidden)  # instantiate the GRU layer
    model = d2l.RNNModel(gru_layer, vocab_size)
    model = model.to(device)
    d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
    d2l.plt.show()

output

perplexity 1.0, 16676.1 tokens/sec on cpu
time traveller for so it will be convenient to speak of himwas e
travelleryou can show black is white by argument said filby

Long Short-Term Memory (LSTM)

For a long time, latent-variable models have faced the twin problems of preserving long-term information and skipping over short-term inputs. One of the earliest approaches to address this was the long short-term memory (LSTM) network.

For details on the input gate, forget gate, output gate, candidate memory cell, memory cell, and hidden state, see the Gated Memory Cell section of the LSTM chapter in Dive into Deep Learning (d2l.ai).
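For quick reference, the computations performed by the scratch implementation below are, in d2l's notation (with \(\odot\) denoting elementwise multiplication):

\[
\begin{aligned}
\mathbf{I}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xi} + \mathbf{H}_{t-1} \mathbf{W}_{hi} + \mathbf{b}_i), \\
\mathbf{F}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xf} + \mathbf{H}_{t-1} \mathbf{W}_{hf} + \mathbf{b}_f), \\
\mathbf{O}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xo} + \mathbf{H}_{t-1} \mathbf{W}_{ho} + \mathbf{b}_o), \\
\tilde{\mathbf{C}}_t &= \tanh(\mathbf{X}_t \mathbf{W}_{xc} + \mathbf{H}_{t-1} \mathbf{W}_{hc} + \mathbf{b}_c), \\
\mathbf{C}_t &= \mathbf{F}_t \odot \mathbf{C}_{t-1} + \mathbf{I}_t \odot \tilde{\mathbf{C}}_t, \\
\mathbf{H}_t &= \mathbf{O}_t \odot \tanh(\mathbf{C}_t).
\end{aligned}
\]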

Implementing an LSTM from Scratch

We again use The Time Machine dataset.

code

import torch
from d2l import torch as d2l


# Initialize model parameters
def get_lstm_params(vocab_size, num_hidden, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    def three():
        return (
            normal((num_inputs, num_hidden)),
            normal((num_hidden, num_hidden)),
            torch.zeros(num_hidden, device=device)
        )

    # input gate parameters
    W_xi, W_hi, b_i = three()
    # forget gate parameters
    W_xf, W_hf, b_f = three()
    # output gate parameters
    W_xo, W_ho, b_o = three()
    # candidate memory cell parameters
    W_xc, W_hc, b_c = three()
    # output layer parameters
    W_hq = normal((num_hidden, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # attach gradients
    params = [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params


# Define the model
def init_lstm_state(batch_size, num_hidden, device):
    """The LSTM state carries an additional memory cell, initialized to zeros with shape (batch_size, num_hidden)."""
    return (torch.zeros((batch_size, num_hidden), device=device),
            torch.zeros((batch_size, num_hidden), device=device))


# The actual model definition follows the same pattern as the GRU
def lstm(inputs, state, params):
    W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q = params
    (H, C) = state
    outputs = []
    for X in inputs:
        I = torch.sigmoid(torch.matmul(X, W_xi) + torch.matmul(H, W_hi) + b_i)
        F = torch.sigmoid(torch.matmul(X, W_xf) + torch.matmul(H, W_hf) + b_f)
        O = torch.sigmoid(torch.matmul(X, W_xo) + torch.matmul(H, W_ho) + b_o)
        C_tilda = torch.tanh(torch.matmul(X, W_xc) + torch.matmul(H, W_hc) + b_c)
        C = F * C + I * C_tilda
        H = O * torch.tanh(C)
        Y = torch.matmul(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H, C)


if __name__ == "__main__":
    batch_size, num_steps = 32, 35
    train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

    # Training and prediction
    num_epochs, lr = 500, 1
    vocab_size, num_hidden, device = len(vocab), 256, d2l.try_gpu()
    model = d2l.RNNModelScratch(vocab_size, num_hidden, device, get_lstm_params, init_lstm_state, lstm)
    d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
    # visualize the perplexity curve
    d2l.plt.show()

output

perplexity 1.1, 10388.7 tokens/sec on cpu
time travellerit wollareftrev ssich aly lemesyou back an whree a
travellerbyuccouco bain the psychologistyes so it seemed to

Concise Implementation of the LSTM

code

from torch import nn
from d2l import torch as d2l


if __name__ == "__main__":
    batch_size, num_steps = 32, 35
    train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
    vocab_size, num_hidden, device = len(vocab), 256, d2l.try_gpu()

    num_inputs = vocab_size
    num_epochs, lr = 500, 1
    lstm_layer = nn.LSTM(num_inputs, num_hidden)
    model = d2l.RNNModel(lstm_layer, vocab_size)
    model = model.to(device)
    d2l.train_ch8(model, train_iter, vocab, lr, num_epochs, device)
    d2l.plt.show()

output

perplexity 1.0, 10847.3 tokens/sec on cpu
time traveller for so it will be convenient to speak of himwas e
traveller with a slight accession ofcheerfulness really thi

Deep Recurrent Neural Networks

So far we have only discussed recurrent neural networks with a single unidirectional hidden layer. In fact, we can stack multiple recurrent layers on top of each other.

For the theory, see 10.3. Deep Recurrent Neural Networks — Dive into Deep Learning 1.0.3 documentation (d2l.ai).
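As a minimal shape-check sketch (the sizes here are illustrative, not taken from the original): in PyTorch, stacking recurrent layers only requires the num_layers argument, and the returned state then carries one slice per layer.

import torch
from torch import nn

num_inputs, num_hidden, num_layers = 28, 256, 2
lstm = nn.LSTM(num_inputs, num_hidden, num_layers)

X = torch.rand(35, 4, num_inputs)    # (num_steps, batch_size, num_inputs)
output, (h_n, c_n) = lstm(X)
print(output.shape)  # torch.Size([35, 4, 256]) -- outputs of the top layer only
print(h_n.shape)     # torch.Size([2, 4, 256])  -- one hidden state per layer
print(c_n.shape)     # torch.Size([2, 4, 256])  -- one memory cell per layer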

Concise Implementation of a Deep RNN

The high-level API already implements all the logic of this model, so for convenience we go straight to the concise version.

code

from torch import nn
from d2l import torch as d2l


if __name__ == "__main__":
    batch_size, num_steps = 32, 35
    train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

    vocab_size, num_hidden, num_layers = len(vocab), 256, 2
    num_inputs = vocab_size
    num_epochs, lr, device = 500, 2, d2l.try_gpu()
    lstm_layer = nn.LSTM(num_inputs, num_hidden, num_layers)
    model = d2l.RNNModel(lstm_layer, vocab_size)
    model = model.to(device)
    d2l.train_ch8(model, train_iter, vocab, lr * 1.0, num_epochs, device)
    d2l.plt.show()

output

perplexity 1.0, 8220.6 tokens/sec on cpu
time travelleryou can show black is white by argument said filby
travelleryou can show black is white by argument said filby

Bidirectional Recurrent Neural Networks

For the basic theory, see 10.4. Bidirectional Recurrent Neural Networks — Dive into Deep Learning 1.0.3 documentation (d2l.ai).
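This part is theory only; as a minimal sketch (toy sizes, not from the original) of what the PyTorch flag does, bidirectional=True runs the sequence both forwards and backwards and concatenates the per-direction features:

import torch
from torch import nn

num_inputs, num_hidden, num_layers = 28, 256, 2
birnn = nn.GRU(num_inputs, num_hidden, num_layers, bidirectional=True)

X = torch.rand(35, 4, num_inputs)    # (num_steps, batch_size, num_inputs)
output, h_n = birnn(X)
print(output.shape)  # torch.Size([35, 4, 512]) -- forward and backward features concatenated
print(h_n.shape)     # torch.Size([4, 4, 256])  -- num_layers * 2 directions

Note that because a bidirectional model conditions on future tokens, it is not suitable for next-token prediction tasks such as the language model trained above.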

Machine Translation and the Dataset

code

import os
import torch
from d2l import torch as d2l
from matplotlib import pyplot as plt


def read_data_nmt():
    """Load the English-French dataset."""
    data_dir = d2l.download_extract('fra-eng')
    with open(os.path.join(data_dir, 'fra.txt'), 'r', encoding='utf-8') as f:
        return f.read()


def preprocess_nmt(text):
    """Preprocess the raw data: replace non-breaking spaces with ordinary spaces, lowercase, and insert spaces between words and punctuation."""
    def no_space(char, prev_char):
        return char in set(',.!?') and prev_char != ' '

    # replace non-breaking spaces with ordinary spaces
    # convert uppercase to lowercase
    text = text.replace('\u202f', ' ').replace('\xa0', ' ').lower()
    # insert a space between words and punctuation marks
    out = [' ' + char if i > 0 and no_space(char, text[i - 1]) else
           char for i, char in enumerate(text)]
    return ''.join(out)


# Tokenization
def tokenize_nmt(text, num_examples=None):
    """Tokenize the English-French dataset."""
    source, target = [], []
    for i, line in enumerate(text.split('\n')):
        if num_examples and i > num_examples:
            break
        parts = line.split('\t')
        if len(parts) == 2:
            source.append(parts[0].split(' '))
            target.append(parts[1].split(' '))
    return source, target


# Plot a histogram of the number of tokens per text sequence
def show_list_len_pair_hist(legend, xlabel, ylabel, xlist, ylist):
    """Plot a histogram of list length pairs."""
    # draw the histogram with plain pyplot
    plt.figure(figsize=(5, 3))
    _, _, patches = plt.hist([[len(l) for l in xlist], [len(l) for l in ylist]])
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    for patch in patches[1].patches:
        patch.set_hatch('/')
    plt.legend(legend)
    plt.show()


# Loading the dataset
def truncate_pad(line, num_steps, padding_token):
    """Truncate or pad a text sequence."""
    if len(line) > num_steps:
        return line[:num_steps]  # truncate
    return line + [padding_token] * (num_steps - len(line))  # pad


def build_array_nmt(lines, vocab, num_steps):
    """Transform machine translation text sequences into minibatches."""
    lines = [vocab[l] for l in lines]
    lines = [l + [vocab['<eos>']] for l in lines]
    array = torch.tensor([truncate_pad(
        l, num_steps, vocab['<pad>']
    ) for l in lines])
    valid_len = (array != vocab['<pad>']).type(torch.int32).sum(1)
    return array, valid_len


def load_data_nmt(batch_size, num_steps, num_examples=600):
    """Return the iterator and the vocabularies of the translation dataset."""
    reserved_tokens = ['<pad>', '<bos>', '<eos>']
    # read the raw data from the file and preprocess it
    text = preprocess_nmt(read_data_nmt())
    # tokenize the preprocessed data, optionally limiting the number of examples
    source, target = tokenize_nmt(text, num_examples)
    # build the source and target vocabularies with a minimum frequency of 2
    src_vocab = d2l.Vocab(source, min_freq=2, reserved_tokens=reserved_tokens)
    target_vocab = d2l.Vocab(target, min_freq=2, reserved_tokens=reserved_tokens)
    # convert the tokenized text sequences into index arrays and valid-length arrays
    src_array, src_valid_len = build_array_nmt(source, src_vocab, num_steps)
    target_array, target_valid_len = build_array_nmt(target, target_vocab, num_steps)
    # gather all data arrays so they can be loaded into an iterator
    data_arrays = [src_array, src_valid_len, target_array, target_valid_len]
    # create and return the data iterator plus the source and target vocabularies
    data_iter = d2l.load_array(data_arrays, batch_size)
    return data_iter, src_vocab, target_vocab


if __name__ == "__main__":
    d2l.DATA_HUB['fra-eng'] = (d2l.DATA_URL + 'fra-eng.zip',
                               '94646ad1522d915e7b0f9296181140edcf86a4f5')
    # load the data
    raw_text = read_data_nmt()
    print("raw data:")
    print(raw_text[:75])

    # preprocess the data
    print("after preprocessing:")
    text = preprocess_nmt(raw_text)
    print(text[:75])

    # tokenize
    source, target = tokenize_nmt(text)
    print("after tokenizing:")
    print(source[:6])
    print(target[:6])

    show_list_len_pair_hist(['source', 'target'], '# tokens per sequence', 'count', source, target)

    # vocabulary
    src_vocab = d2l.Vocab(source, min_freq=2, reserved_tokens=['<pad>', '<bos>', '<eos>'])
    print("vocab size:")
    print(len(src_vocab))

    # truncate or pad a text sequence
    print(truncate_pad(src_vocab[source[0]], 10, src_vocab['<pad>']))

    # load the iterator and the vocabularies
    train_iter, src_vocab, target_vocab = load_data_nmt(batch_size=2, num_steps=8)
    # inspect the first minibatch
    for X, X_valid_len, Y, Y_valid_len in train_iter:
        print('X:', X.type(torch.int32))
        print('valid lengths for X:', X_valid_len)
        print('Y:', Y.type(torch.int32))
        print('valid lengths for Y:', Y_valid_len)
        break

output

raw data:
Go. Va !
Hi. Salut !
Run! Cours !
Run! Courez !
Who? Qui ?
Wow! Ça alors !

after preprocessing:
go . va !
hi . salut !
run ! cours !
run ! courez !
who ? qui ?
wow ! ça al
after tokenizing:
[['go', '.'], ['hi', '.'], ['run', '!'], ['run', '!'], ['who', '?'], ['wow', '!']]
[['va', '!'], ['salut', '!'], ['cours', '!'], ['courez', '!'], ['qui', '?'], ['ça', 'alors', '!']]
vocab size:
10012
[47, 4, 1, 1, 1, 1, 1, 1, 1, 1]
X: tensor([[ 9, 28, 4, 3, 1, 1, 1, 1],
[16, 51, 4, 3, 1, 1, 1, 1]], dtype=torch.int32)
valid lengths for X: tensor([4, 4])
Y: tensor([[73, 0, 4, 3, 1, 1, 1, 1],
[35, 53, 5, 3, 1, 1, 1, 1]], dtype=torch.int32)
valid lengths for Y: tensor([4, 4])

Encoder-Decoder Architecture

Continuing from the above, machine translation is a central sequence transduction problem, and its main difficulty is that both the input and the output sequences have variable lengths. To handle this we design a new architecture, the encoder-decoder: the encoder turns a variable-length input sequence into an encoded state of fixed shape, and the decoder turns that fixed-shape encoded state back into a variable-length output sequence.

Interfaces

We declare a few interfaces.

code

import torch
from torch import nn


class Encoder(nn.Module):
    """The base encoder interface."""
    def __init__(self, **kwargs):
        super(Encoder, self).__init__(**kwargs)

    def forward(self, X, *args):
        raise NotImplementedError


class Decoder(nn.Module):
    """The base decoder interface."""
    def __init__(self, **kwargs):
        super(Decoder, self).__init__(**kwargs)

    def init_state(self, enc_outputs, *args):
        raise NotImplementedError

    def forward(self, X, state):
        raise NotImplementedError


class EncoderDecoder(nn.Module):
    """The encoder-decoder architecture."""
    def __init__(self, encoder, decoder, **kwargs):
        super(EncoderDecoder, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, enc_X, dec_X, *args):
        enc_outputs = self.encoder(enc_X, *args)
        dec_state = self.decoder.init_state(enc_outputs, *args)
        return self.decoder(dec_X, dec_state)

Sequence-to-Sequence Learning (seq2seq)

In this part we use two recurrent neural networks as an encoder and a decoder and apply them to sequence-to-sequence learning tasks.

For the theory, see 10.7. Sequence-to-Sequence Learning for Machine Translation — Dive into Deep Learning 1.0.3 documentation (d2l.ai).

code

import collections
import math
import torch
from torch import nn
from d2l import torch as d2l
from encoder_decoder import Encoder
from encoder_decoder import Decoder
from encoder_decoder import EncoderDecoder
from machine_translation import load_data_nmt
from RNN.rnn_self import grad_clipping


class Seq2SeqEncoder(Encoder):
    """The RNN encoder for sequence-to-sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hidden, num_layers, dropout=0, **kwargs):
        super(Seq2SeqEncoder, self).__init__(**kwargs)
        # embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size, num_hidden, num_layers, dropout=dropout)

    def forward(self, X, *args):
        # after embedding, X has shape (batch_size, num_steps, embed_size)
        X = self.embedding(X)
        # in RNN models the first axis corresponds to the time step
        X = X.permute(1, 0, 2)
        # if no initial state is provided, it defaults to zeros
        output, state = self.rnn(X)
        # output shape: (num_steps, batch_size, num_hidden)
        # state shape: (num_layers, batch_size, num_hidden)
        return output, state


class Seq2SeqDecoder(Decoder):
    """The RNN decoder for sequence-to-sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hidden, num_layer, dropout=0, **kwargs):
        super(Seq2SeqDecoder, self).__init__(**kwargs)
        # embedding layer that maps token IDs to embedding vectors
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # GRU that processes the sequence data:
        # its input size is the embedding size embed_size plus the hidden size num_hidden,
        # because the decoder input is concatenated with the context vector (see forward);
        # num_hidden hidden units capture long-range dependencies,
        # num_layer stacks several GRU layers, and dropout reduces overfitting during training
        self.rnn = nn.GRU(embed_size + num_hidden, num_hidden, num_layer, dropout=dropout)
        # fully connected layer that maps the GRU outputs to vocabulary-sized predictions
        # input dimension: num_hidden, output dimension: vocab_size
        self.dense = nn.Linear(num_hidden, vocab_size)

    def init_state(self, enc_outputs, *args):
        """
        :param enc_outputs: the encoder outputs
        :param args: other arguments
        :return: the final hidden state of the encoder
        """
        return enc_outputs[1]

    def forward(self, X, state):
        # after embedding and permuting, X has shape (num_steps, batch_size, embed_size)
        X = self.embedding(X).permute(1, 0, 2)
        # broadcast the context so it has the same num_steps as X
        context = state[-1].repeat(X.shape[0], 1, 1)
        X_and_context = torch.cat((X, context), 2)
        output, state = self.rnn(X_and_context, state)
        output = self.dense(output).permute(1, 0, 2)
        # output shape: (batch_size, num_steps, vocab_size)
        # state shape: (num_layers, batch_size, num_hidden)
        return output, state


# Optimization is driven by a cross-entropy loss.
# First define a masking function that zeroes out irrelevant predictions.
def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences."""
    # maximum sequence length in the current batch
    max_len = X.size(1)
    # build a (batch_size, max_len) mask that is True within each valid length and False elsewhere
    mask = torch.arange((max_len), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]
    # replace every element outside the mask with the given value
    X[~mask] = value
    # return the masked sequences
    return X
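
# A minimal usage sketch (toy tensors, not from the original): with valid lengths 1 and 2,
# everything past the valid prefix of each row is zeroed out.
# >>> sequence_mask(torch.tensor([[1, 2, 3], [4, 5, 6]]), torch.tensor([1, 2]))
# tensor([[1, 0, 0],
#         [4, 5, 0]])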


# Define the loss function
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """The softmax cross-entropy loss with masking."""
    # pred shape: (batch_size, num_steps, vocab_size)
    # label shape: (batch_size, num_steps)
    # valid_len shape: (batch_size,)

    def forward(self, pred, label, valid_len):
        # start from a weight tensor of ones with the same shape as the labels
        weights = torch.ones_like(label)
        # mask the weights with sequence_mask: positions beyond the valid length get weight 0
        weights = sequence_mask(weights, valid_len)
        # set reduction to 'none' so the loss returns one unreduced value per element
        self.reduction = 'none'
        # call the parent forward to compute the unweighted loss; permute pred to the layout it expects
        unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(pred.permute(0, 2, 1), label)
        # multiply the unweighted loss by the mask weights and average over the sequence dimension
        weighted_loss = (unweighted_loss * weights).mean(dim=1)
        return weighted_loss


# Training the sequence-to-sequence model
def train_seq2seq(net, train_iter, lr, num_epochs, target_vocab, device):
    """Train a sequence-to-sequence model."""
    def xavier_init_weights(m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        if type(m) == nn.GRU:
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = MaskedSoftmaxCELoss()
    net.train()
    animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[10, num_epochs])

    for epoch in range(num_epochs):
        timer = d2l.Timer()
        metric = d2l.Accumulator(2)  # sum of training loss, number of tokens
        for batch in train_iter:
            optimizer.zero_grad()
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            bos = torch.tensor([target_vocab['<bos>']] * Y.shape[0], device=device).reshape(-1, 1)
            dec_input = torch.cat([bos, Y[:, :-1]], 1)  # teacher forcing
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            # without .sum() backward raises RuntimeError: Boolean value of Tensor with more than one value is ambiguous
            l.sum().backward()
            grad_clipping(net, 1)  # clip gradients with threshold 1
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        if (epoch + 1) % 10 == 0:
            animator.add(epoch + 1, (metric[0] / metric[1], ))
    print(f'loss {metric[0] / metric[1]}, {metric[1] / timer.stop()} tokens / sec on {device}')
    d2l.plt.show()  # visualize the loss curve


# Prediction
# To predict the output sequence token by token, the decoder input at each time step is the token predicted at the previous step.
def predict_seq2seq(net, src_sentence, src_vocab, target_vocab, num_steps, device, save_attention_weights=False):
    """Prediction with a sequence-to-sequence model."""
    net.eval()  # evaluation mode
    # convert the source sentence into tokens and append the end-of-sequence token
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]
    # record the valid length, used to handle padding
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    # make sure the sequence length does not exceed num_steps; pad if it is shorter
    src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
    # add the batch axis
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0
    )
    # encode the source sentence
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    # initialize the decoder state
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    # add the batch axis
    # the decoder input starts with the beginning-of-sequence token
    dec_X = torch.unsqueeze(
        torch.tensor([target_vocab['<bos>']], dtype=torch.long, device=device), dim=0
    )
    output_seq, attention_weight_seq = [], []
    for _ in range(num_steps):
        # generate the next token with the decoder
        Y, dec_state = net.decoder(dec_X, dec_state)
        # use the token with the highest predicted probability as the decoder input at the next time step
        dec_X = Y.argmax(dim=2)
        # squeeze the batch axis to obtain the predicted token ID
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        # save the attention weights
        if save_attention_weights:
            attention_weight_seq.append(net.decoder.attention_weights)
        # once the end-of-sequence token is predicted, the output sequence is complete
        if pred == target_vocab['<eos>']:
            break
        # accumulate the predicted token sequence
        output_seq.append(pred)
    # convert the token ID sequence back into the target sentence
    return ' '.join(target_vocab.to_tokens(output_seq)), attention_weight_seq


# Evaluating the predicted sequences
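# The BLEU score computed below follows the definition used in the d2l chapter:
#     BLEU = exp(min(0, 1 - len_label / len_pred)) * prod_{n=1..k} p_n ** (1 / 2**n)
# where p_n is the n-gram precision, i.e. matched n-grams / total predicted n-grams.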
def bleu(pred_seq, label_seq, k):
    """Compute the BLEU score."""
    # split the sequences into tokens
    pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
    # lengths of the predicted and the label sequences
    len_pred, len_label = len(pred_tokens), len(label_tokens)
    # brevity penalty part of the score
    score = math.exp(min(0, 1 - len_label / len_pred))
    # iterate over the n-gram orders
    for n in range(1, k + 1):
        # number of matches and a dictionary counting the label n-grams
        num_matches, label_subs = 0, collections.defaultdict(int)
        # build and count the n-grams of the label sequence
        for i in range(len_label - n + 1):
            label_subs[' '.join(label_tokens[i: i + n])] += 1
        # look for matching n-grams in the predicted sequence
        for i in range(len_pred - n + 1):
            # if a match is found in the label sequence, count it and consume it
            if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
                num_matches += 1
                label_subs[' '.join(pred_tokens[i: i + n])] -= 1
        # update the score with the n-gram precision, weighted by 1 / 2**n
        score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
    # return the final BLEU score
    return score


if __name__ == "__main__":
    # Instantiate a Seq2SeqEncoder to encode sequences.
    # Arguments:
    #   vocab_size: size of the vocabulary, i.e. the number of distinct input tokens
    #   embed_size: dimensionality of the space the tokens are embedded into
    #   num_hidden: number of hidden units, which controls the model capacity
    #   num_layers: number of RNN layers; more layers increase the expressive power
    encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hidden=16, num_layers=2)
    encoder.eval()  # evaluation mode
    X = torch.zeros((4, 7), dtype=torch.long)
    output, state = encoder(X)
    print("encoder:")
    print('output shape:', output.shape, 'state shape:', state.shape)

    # instantiate the decoder with the same hyperparameters as the encoder above
    decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hidden=16, num_layer=2)
    decoder.eval()
    state = decoder.init_state(encoder(X))
    output, state = decoder(X, state)  # get the output and update the state
    print("decoder:")
    print('output shape:', output.shape, 'state shape:', state.shape)

    # loss function demo
    loss = MaskedSoftmaxCELoss()
    print("loss demo:")
    print(loss(torch.ones(3, 4, 10), torch.ones((3, 4), dtype=torch.long), torch.tensor([4, 2, 0])))

    # Now we can create and train an RNN encoder-decoder model for sequence-to-sequence learning on the machine translation dataset
    embed_size, num_hidden, num_layers, dropout = 32, 32, 2, 0.1
    batch_size, num_steps = 64, 10
    lr, num_epochs, device = 0.005, 300, d2l.try_gpu()
    train_iter, src_vocab, target_vocab = load_data_nmt(batch_size, num_steps)  # the d2l version raises an encoding error here
    encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hidden, num_layers, dropout)
    decoder = Seq2SeqDecoder(len(target_vocab), embed_size, num_hidden, num_layers, dropout)
    net = EncoderDecoder(encoder, decoder)
    train_seq2seq(net, train_iter, lr, num_epochs, target_vocab, device)

    # Use the trained encoder-decoder model to translate a few English sentences into French and compute the BLEU scores
    print("start translating:")
    engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
    fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
    for eng, fra in zip(engs, fras):
        translation, attention_weight_seq = predict_seq2seq(net, eng, src_vocab, target_vocab, num_steps, device)
        print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')

output

encoder:
output shape: torch.Size([7, 4, 16]) state shape: torch.Size([2, 4, 16])
decoder:
output shape: torch.Size([4, 7, 10]) state shape: torch.Size([2, 4, 16])
loss demo:
tensor([2.3026, 1.1513, 0.0000])
loss 0.01964012612887416, 5921.959299792854 tokens / sec on cpu
start translating:
go . => va !, bleu 1.000
i lost . => j'ai perdu ., bleu 1.000
he's calm . => il est tom bon ?, bleu 0.447
i'm home . => je suis <unk> ., bleu 0.512

Beam Search

Before formally introducing beam search, we first look at greedy search and discuss its shortcomings.

This section is mainly theoretical.

Greedy Search

For the theory, see 10.8. Greedy Search — Dive into Deep Learning 1.0.3 documentation (d2l.ai).

Exhaustive Search

For the theory, see 10.8. Exhaustive Search — Dive into Deep Learning 1.0.3 documentation (d2l.ai).

Beam Search

For the theory, see 10.8. Beam Search — Dive into Deep Learning 1.0.3 documentation (d2l.ai).
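Since this section ships no code, here is a minimal beam-search sketch under stated assumptions: log_prob_fn(prefix) is a hypothetical callable returning a list of next-token log-probabilities given a prefix, and bos/eos are the begin and end token IDs. With beam_size=1 it degenerates to the greedy decoding used in predict_seq2seq above.

def beam_search(log_prob_fn, bos, eos, beam_size=2, max_steps=10):
    """Keep the beam_size highest-scoring partial sequences at every step."""
    beams = [([bos], 0.0)]  # each entry is (token sequence, accumulated log-probability)
    finished = []
    for _ in range(max_steps):
        candidates = []
        for seq, score in beams:
            for token, log_p in enumerate(log_prob_fn(seq)):
                candidates.append((seq + [token], score + log_p))
        # keep only the beam_size best expansions
        candidates.sort(key=lambda c: c[1], reverse=True)
        beams = []
        for seq, score in candidates[:beam_size]:
            # hypotheses that produced <eos> leave the beam, the rest keep growing
            (finished if seq[-1] == eos else beams).append((seq, score))
        if not beams:
            break
    finished.extend(beams)  # hypotheses that never produced <eos>
    # pick the best hypothesis by the length-normalized score from the d2l chapter (alpha = 0.75)
    alpha = 0.75
    return max(finished, key=lambda c: c[1] / len(c[0]) ** alpha)

The length normalization divides the accumulated log-probability by len^alpha so that longer, otherwise well-scoring hypotheses are not unfairly penalized relative to short ones.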

(•‿•)