写在前面

参考书籍

Aston Zhang, Zachary C. Lipton, Mu Li, Alexander J. Smola. Dive into Deep Learning. 2020.

简介 - Dive-into-DL-PyTorch (tangshusen.me)

自然语言处理

source code: NJU-ymhui/DeepLearning: Deep Learning with pytorch (github.com)

use git to clone: https://github.com/NJU-ymhui/DeepLearning.git

/NLP

word2vec_dataset.py word2vec_pretraining.py fasttext.py similarity_compare.py BERT.py BERT_pretraining_dataset.py

词嵌入

在自然语言系统中,词是意义的基本单位。下面介绍一个新的概念:词向量,即用于表示单词意义的向量。接下来还需要把每个词映射到对应的词向量,这项技术就是词嵌入。

下面将介绍一些将词映射到向量的相关技术。
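在代码层面,词嵌入通常实现为一个可学习的查找表:每个词先映射到词表中的索引,再用索引取出对应的向量。下面是一个极简的示意(词表与维度均为随意假设,与后文的正式实现无关):

import torch
from torch import nn

# 假设的迷你词表: 词 -> 索引
vocab = {'the': 0, 'cat': 1, 'sat': 2, 'on': 3, 'mat': 4}
# 嵌入层就是一个 len(vocab) x 3 的可学习矩阵, 每行是一个词向量
embedding = nn.Embedding(num_embeddings=len(vocab), embedding_dim=3)

# "词嵌入"即按索引查表: 词 -> 索引 -> 向量
idx = torch.tensor([vocab['cat']])
print(embedding(idx))  # 形状为(1, 3), 训练前是随机初始化的向量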

为什么不再使用独热向量

在之前做机器翻译时曾尝试使用独热编码的向量,但现在来看,这并不是一个好的选择。一个重要原因是独热向量不能表达不同词之间的相似度。以经常使用的余弦相似度为例,对两个向量x、y,它用二者夹角的余弦来衡量相似度:cos(x, y) = x·y / (‖x‖ ‖y‖),取值范围为[-1, 1]。

然而根据独热向量的定义可以得知,任意两个不同词的独热向量的余弦为0,即独热向量不能编码词之间的相似性。
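用一小段代码即可验证这一结论(词表大小与两个词的索引均为假设):

import torch

def cosine(x, y):
    # 余弦相似度: x·y / (||x|| * ||y||)
    return torch.dot(x, y) / (torch.norm(x) * torch.norm(y))

# 假设词表大小为5, 两个不同词的索引分别为1和3
x = torch.zeros(5)
x[1] = 1.0
y = torch.zeros(5)
y[3] = 1.0
print(cosine(x, y))  # tensor(0.): 任意两个不同词的独热向量余弦相似度恒为0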

自监督的word2vec

  • 跳元模型
  • 连续词袋模型

跳元模型Skip-Gram

详见15.1. Skip-Gram — Dive into Deep Learning 1.0.3 documentation (d2l.ai)

连续词袋CBOW

详见15.1. CBOW — Dive into Deep Learning 1.0.3 documentation (d2l.ai)

近似训练

负采样

详见15.2. Negative-Sampling — Dive into Deep Learning 1.0.3 documentation (d2l.ai)
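简单来说,负采样把softmax的多分类换成若干个二分类:对一个"中心词-上下文词"对最大化 log σ(u_o·v_c),同时对K个噪声词最大化 log σ(-u_k·v_c)。下面是按该目标对单个样本计算损失的示意(各向量均为随机假设值,正式的小批量实现见后文的SigmoidBCELoss):

import torch

def neg_sampling_loss(v_c, u_o, u_noise):
    """单个样本的负采样损失: -log σ(u_o·v_c) - Σ_k log σ(-u_k·v_c)"""
    pos = torch.log(torch.sigmoid(torch.dot(u_o, v_c)))
    neg = torch.log(torch.sigmoid(-u_noise @ v_c)).sum()
    return -(pos + neg)

embed_dim, K = 8, 5
v_c = torch.randn(embed_dim)         # 中心词向量
u_o = torch.randn(embed_dim)         # 上下文词向量
u_noise = torch.randn(K, embed_dim)  # K个噪声词向量
print(neg_sampling_loss(v_c, u_o, u_noise))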

层序Softmax

层序softmax是另一种近似训练的方法,使用二叉树,其中树的每个叶节点表示词表V中的一个词。

其原理,详见15.2. Softmax — Dive into Deep Learning 1.0.3 documentation (d2l.ai)
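其核心思想是:把"在|V|个词中选一个"变成"沿二叉树从根走到该词叶节点"的一串二分类,目标词的条件概率等于路径上各内部节点sigmoid的连乘,计算代价从O(|V|)降到O(log|V|)。下面是计算一条路径概率的示意(路径长度、方向和节点向量均为假设):

import torch

def hierarchical_softmax_prob(v_c, path_node_vecs, directions):
    """P(w_o | w_c) = ∏_j σ(d_j * u_{n_j}·v_c), d_j=+1表示走左子树, d_j=-1表示走右子树"""
    prob = torch.tensor(1.0)
    for u_n, d in zip(path_node_vecs, directions):
        prob = prob * torch.sigmoid(d * torch.dot(u_n, v_c))
    return prob

embed_dim = 8
v_c = torch.randn(embed_dim)                                 # 中心词向量
path_node_vecs = [torch.randn(embed_dim) for _ in range(3)]  # 根到叶节点路径上的内部节点向量
directions = [1.0, -1.0, 1.0]                                # 假设的路径方向
print(hierarchical_softmax_prob(v_c, path_node_vecs, directions))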

预训练词嵌入的数据集

直接上代码。

code

import os
import math
import random
import torch
from d2l import torch as d2l


def read_ptb():
"""将ptb数据集加载到文本行的列表中"""
data_dir = d2l.download_extract('ptb')
# 读取训练集
with open(os.path.join(data_dir, 'ptb.train.txt')) as f:
raw_text = f.read()
return [line.split() for line in raw_text.split('\n')]


def subsample(sentences, vocab):
"""下采样高频词"""
# 排除未知词元<unk>
sentences = [[token for token in line if vocab[token] != vocab.unk] for line in sentences]
counter = d2l.count_corpus(sentences)
num_tokens = sum(counter.values())

# 如果在下采样期间保留词元,返回True
def keep(token):
return random.uniform(0, 1) < math.sqrt(1e-4 / counter[token] * num_tokens)

return [[token for token in line if keep(token)] for line in sentences], counter


def compare_counts(token, sentences, subsampled):
"""比较下采样前后某个token的频次"""
return (f'"{token}" count before subsample: {sum(l.count(token) for l in sentences)}\n'
f'after subsample: {sum(l.count(token) for l in subsampled)}')


# 中心词和上下文词的提取
def get_centers_and_contexts(corpus, max_window_size):
"""返回跳元模型中的中心词和上下文词"""
centers, contexts = [], []
for line in corpus:
# 要形成"中心词-上下文词"对,每个句子至少需要有两个词
if len(line) < 2:
continue
centers += line
for i in range(len(line)):
window_size = random.randint(1, max_window_size)
indices = list(range(max(0, i - window_size), min(len(line), i + 1 + window_size)))
# 从上下文词中排除中心词
indices.remove(i)
contexts.append([line[idx] for idx in indices])
return centers, contexts


# 为了根据预定义的分布对噪声词进行采样,我们定义如下RandomGenerator类
# 其中采样分布通过变量sampling_weights传递
class RandomGenerator:
"""根据n个采样权重在{1, ..., n}中随机抽取"""
def __init__(self, sampling_weights):
# 候选索引为1, 2, ..., n, 与n个采样权重一一对应
self.population = list(range(1, len(sampling_weights) + 1))
self.sampling_weights = sampling_weights
self.candidates = []
self.i = 0

def draw(self):
if self.i == len(self.candidates):
# 缓存k个随机采样结果
self.candidates = random.choices(self.population, self.sampling_weights, k=10000)
self.i = 0
self.i += 1
return self.candidates[self.i - 1]


# 对于一对中心词和上下文词,我们随机抽取了K个(实验中为5个)噪声词。
# 根据word2vec论文中的建议,将噪声词w的采样概率P(w)设置为其在字典中的相对频率,其幂为0.75
def get_negatives(all_contexts, vocab, counter, K):
"""返回负采样中的噪声词"""
# 索引为1、2、... (索引0是词表中排除的未知标记)
sampling_weights = [counter[vocab.to_tokens(i)] ** 0.75 for i in range(1, len(vocab))]
all_negatives, generator = [], RandomGenerator(sampling_weights)
for contexts in all_contexts:
negatives = []
while len(negatives) < len(contexts) * K:
neg = generator.draw()
# 噪声词不能是上下文词
if neg not in contexts: # 通过这个条件保证
negatives.append(neg)
all_negatives.append(negatives)
return all_negatives


def batchify(data):
"""返回带有负采样的跳元模型的小批量样本"""
max_len = max(len(c) + len(n) for _, c, n in data)
centres, contexts_negatives, masks, labels = [], [], [], []
for center, context, negative in data:
cur_len = len(context) + len(negative)
centres += [center]
contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
masks += [[1] * cur_len + [0] * (max_len - cur_len)]
labels += [[1] * len(context) + [0] * (max_len - len(context))]
return (torch.tensor(centres).reshape((-1, 1)), torch.tensor(contexts_negatives),
torch.tensor(masks), torch.tensor(labels))


# 整合一下代码
def load_data_ptb(batch_size, max_window_size, num_noise_words):
"""下载PTB数据集,然后将其加载到内存中"""
sentences = read_ptb()
vocab = d2l.Vocab(sentences, min_freq=10)
subsampled, counter = subsample(sentences, vocab)
corpus = [vocab[line] for line in subsampled]
all_centers, all_contexts = get_centers_and_contexts(corpus, max_window_size)
all_negatives = get_negatives(all_contexts, vocab, counter, num_noise_words)

class PTBDataset(torch.utils.data.Dataset):
def __init__(self, centers, contexts, negatives):
assert len(centers) == len(contexts) == len(negatives)
self.centers = centers
self.contexts = contexts
self.negatives = negatives

def __getitem__(self, index):
return self.centers[index], self.contexts[index], self.negatives[index]

def __len__(self):
return len(self.centers)

dataset = PTBDataset(all_centers, all_contexts, all_negatives)
data_iter = torch.utils.data.DataLoader(dataset, batch_size,
shuffle=True, collate_fn=batchify, num_workers=0)
return data_iter, vocab


if __name__ == "__main__":
# 读取数据集
d2l.DATA_HUB['ptb'] = (d2l.DATA_URL + 'ptb.zip', '319d85e578af0cdc590547f26231e4e31cdf1e42')
sentences = read_ptb()
print(f'length of sentence: {len(sentences)}')

# 构建词表
vocab = d2l.Vocab(sentences, min_freq=10)
print(f'vocab size: {len(vocab)}')

# 下采样
subsampled, counter = subsample(sentences, vocab)

# 可视化下采样前后每句话词元数量的直方图
d2l.show_list_len_pair_hist(['origin', 'subsampled'], '# tokens per sentence', 'count', sentences, subsampled)
d2l.plt.show()
# 查看一下高频词 the 在下采样前后的频次
print(compare_counts('the', sentences, subsampled)) # 可以看出高频词the的大部分出现都被去除了
# 查看一下低频词 join 在下采样前后的频次
print(compare_counts('join', sentences, subsampled)) # 可以看出低频词join被完全保留了

# 下采样之后,将词元映射到它们在语料库中的索引
corpus = [vocab[line] for line in subsampled]
print(corpus[:3])

# 验证一下提取中心词和上下文词的函数
tiny_set = [list(range(7)), list(range(7, 10))]
print("raw data: ", tiny_set)
for center, context in zip(*get_centers_and_contexts(tiny_set, 2)):
print('center word ', center, ', its', 'context word:', context)

# 在PTB数据集上进行训练时,将最大上下文窗口大小设为5
all_centers, all_contexts = get_centers_and_contexts(corpus, 5)

# 使用负采样进行近似训练
generator = RandomGenerator([2, 3, 4])
print([generator.draw() for _ in range(10)])

# 负采样
all_negatives = get_negatives(all_contexts, vocab, counter, 5)

# 小批量加载训练实例
# 先测试一下batchify函数
x_1 = (1, [2, 2], [3, 3, 3, 3])
x_2 = (1, [2, 2, 2], [3, 3])
batch = batchify((x_1, x_2))
names = ['centers', 'contexts_negatives', 'masks', 'labels']
for name, data in zip(names, batch):
print(name, '=', data)

# 检验load_data_ptb返回的数据迭代器
data_iter, vocab = load_data_ptb(512, 5, 5)
for batch in data_iter:
for name, data in zip(names, batch):
print(name, 'shape:', data.shape)
break # 看一下第一个就够了

output

length of sentence: 42069
vocab size: 6719
"the" count before subsample: 50770
after subsample: 2043
"join" count before subsample: 45
after subsample: 45
[[], [2115, 406], [22, 5277, 3054, 1580]]
raw data: [[0, 1, 2, 3, 4, 5, 6], [7, 8, 9]]
center word 0 , its context word: [1, 2]
center word 1 , its context word: [0, 2]
center word 2 , its context word: [0, 1, 3, 4]
center word 3 , its context word: [2, 4]
center word 4 , its context word: [3, 5]
center word 5 , its context word: [4, 6]
center word 6 , its context word: [4, 5]
center word 7 , its context word: [8, 9]
center word 8 , its context word: [7, 9]
center word 9 , its context word: [7, 8]
[2, 2, 2, 3, 2, 1, 2, 1, 3, 2]
centers = tensor([[1],
[1]])
contexts_negatives = tensor([[2, 2, 3, 3, 3, 3],
[2, 2, 2, 3, 3, 0]])
masks = tensor([[1, 1, 1, 1, 1, 1],
[1, 1, 1, 1, 1, 0]])
labels = tensor([[1, 1, 0, 0, 0, 0],
[1, 1, 1, 0, 0, 0]])
centers shape: torch.Size([512, 1])
contexts_negatives shape: torch.Size([512, 60])
masks shape: torch.Size([512, 60])
labels shape: torch.Size([512, 60])

预训练word2vec

继续实现跳元语法模型,然后在PTB数据集上使用负采样预训练word2vec。

code

import math
import torch
from torch import nn
from d2l import torch as d2l
from word2vec_dataset import load_data_ptb


# 定义前向传播
def skip_gram(center, contexts_and_negatives, embed_v, embed_u):
v = embed_v(center)
u = embed_u(contexts_and_negatives)
pred = torch.bmm(v, u.permute(0, 2, 1))
return pred


# 在训练负采样的跳元模型之前,先定义损失函数
# 二元交叉熵损失
class SigmoidBCELoss(nn.Module):
# 带掩码的二元交叉熵损失
def __init__(self):
super().__init__()

def forward(self, inputs, target, mask=None):
out = nn.functional.binary_cross_entropy_with_logits(inputs, target, weight=mask, reduction="none")
return out.mean(dim=1)


# 辅助函数: 返回-log(sigmoid(x)), 可用于手动验证上面带掩码的二元交叉熵损失的计算结果
def sigmd(x):
return -math.log(1 / (1 + math.exp(-x)))


# 定义训练阶段代码, 注意有填充的存在
def train(net, data_iter, lr, num_epochs, device=d2l.try_gpu()):
def init_weights(m):
if type(m) == nn.Embedding:
nn.init.xavier_uniform_(m.weight)
net.apply(init_weights)
net = net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, num_epochs])
# 规范化的损失之和,规范化的损失数
metric = d2l.Accumulator(2)
for epoch in range(num_epochs):
timer, num_batches = d2l.Timer(), len(data_iter)
for i, batch in enumerate(data_iter):
optimizer.zero_grad()
center, context_negative, mask, label = [data.to(device) for data in batch]
pred = skip_gram(center, context_negative, net[0], net[1])
l = (loss(pred.reshape(label.shape).float(), label.float(), mask) / mask.sum(axis=1) * mask.shape[1])
l.sum().backward()
optimizer.step()
metric.add(l.sum(), l.numel())
if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
animator.add(epoch + (i + 1) / num_batches, (metric[0] / metric[1],))
print(f'loss: {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} tokens / sec on {device}')


# 应用词嵌入
# 在训练好word2vec模型之后,我们可以使用训练好的模型中词向量的余弦相似度来从词表中找到与输入单词语义最相似的单词
def get_similar_tokens(query_token, k, embed):
W = embed.weight.data
x = W[vocab[query_token]]
# 计算余弦相似性,增加1e-9以获得数值稳定性
cos = torch.mv(W, x) / torch.sqrt(torch.sum(W * W, dim=1) * torch.sum(x * x) + 1e-9)
topk = torch.topk(cos, k=k + 1)[1].cpu().numpy().astype('int32')
for i in topk[1:]:
# 删除输入词
print(f'cosine sim = {float(cos[i]):.3f}: {vocab.to_tokens(i)}')


if __name__ == "__main__":
batch_size, max_window_size, num_noise_words = 512, 5, 5
# 获得数据迭代器和词表
data_iter, vocab = load_data_ptb(batch_size, max_window_size, num_noise_words)

# 构造跳元模型
# 嵌入层
embed = nn.Embedding(num_embeddings=20, embedding_dim=4)
print(f'Parameter embedding_weight: {embed.weight.shape}, dtype = {embed.weight.dtype}')
# 嵌入层的输入是词的索引,对于任何词元索引i,其向量表示可以从嵌入层中的权重矩阵的第i行获得
# 由于embed的向量维度设为4,因此当小批量词元索引的形状为(2,3)时,嵌入层返回具有形状(2,3,4)的向量
x = torch.tensor([[1, 2, 3], [4, 5, 6]])
print("embed layer:")
print(embed(x))
# 查看skip_gram函数的输出形状
print("shape of skip_gram output:")
print(skip_gram(torch.ones((2, 1), dtype=torch.long), torch.ones((2, 4), dtype=torch.long)
, embed, embed).shape)

# 训练
loss = SigmoidBCELoss()
pred = torch.tensor([[1.1, -2.2, 3.3, -4.4]] * 2)
label = torch.tensor([[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0]])
mask = torch.tensor([[1, 1, 1, 1], [1, 1, 0, 0]])
print("loss:")
print(loss(pred, label, mask) * mask.shape[1] / mask.sum(axis=1))
# 初始化模型参数
embed_size, vocab_size = 100, len(vocab)
net = nn.Sequential(
nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size),
nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
)
# 使用负采样来训练跳元模型
lr, num_epochs = 0.002, 5
train(net, data_iter, lr, num_epochs, d2l.try_gpu())
d2l.plt.show() # 可视化

# 应用词嵌入
get_similar_tokens('chip', 3, net[0])

output

Parameter embedding_weight: torch.Size([20, 4]), dtype = torch.float32
embed layer:
tensor([[[ 0.9359, 1.1477, -0.9412, 1.2183],
[-0.5608, 0.2200, -0.1888, 0.5098],
[ 0.1096, 0.2246, -1.7004, -0.9497]],

[[-0.1224, 0.2635, 0.1246, -0.4876],
[ 0.6482, 0.5379, 2.3570, 1.1432],
[ 0.0884, -0.8212, -1.0040, -0.7317]]], grad_fn=<EmbeddingBackward0>)
shape of skip_gram output:
torch.Size([2, 1, 4])
loss:
tensor([0.9352, 1.8462])
loss: 0.410, 123025.4 tokens / sec on cpu
cosine sim = 0.722: microprocessor
cosine sim = 0.703: laptop
cosine sim = 0.691: intel

全局向量的词嵌入GloVe

上下文窗口内的词共现可以携带丰富的语义信息。比如“冰”更可能与“固体”一同出现,而不是与“气体”;反观“蒸汽”,则更可能和“气体”而不是“固体”一起出现。此外,这类共现的全局语料库统计数据可以预先计算,从而提高训练效率。为了利用整个语料库中的统计信息进行词嵌入,GloVe使用了这样的全局语料库统计量。

具体原理见15.5. Word Embedding with Global Vectors (GloVe) — Dive into Deep Learning 1.0.3 documentation (d2l.ai)

模型见15.5. GloVe Model— Dive into Deep Learning 1.0.3 documentation (d2l.ai)
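GloVe拟合的正是这种全局共现统计量x_ij(词j出现在词i上下文窗口中的次数)。下面用一个假设的小语料演示如何预先统计整个语料库的共现次数:

import collections

def cooccurrence_counts(corpus, window_size=2):
    """统计整个语料库中(中心词, 上下文词)的全局共现次数 x_ij"""
    counts = collections.defaultdict(int)
    for line in corpus:
        for i, center in enumerate(line):
            lo, hi = max(0, i - window_size), min(len(line), i + window_size + 1)
            for j in range(lo, hi):
                if j != i:
                    counts[center, line[j]] += 1
    return counts

# 假设的小语料, 仅用于说明"全局统计量"的含义
corpus = [['ice', 'is', 'solid'], ['steam', 'is', 'gas']]
print(dict(cooccurrence_counts(corpus)))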

子词嵌入

有些单词可以被视作其他单词的变种,比如dog和dogs,help和helps、helping;又比如boy与boyfriend的关系和girl与girlfriend的关系一样。这种词与词之间潜在的联系有时会传达相当有用的信息,并在预测时提供关键的上下文。遗憾的是,word2vec和GloVe都没有利用词的内部结构。

fastText模型

原理见15.6. fastText — Dive into Deep Learning 1.0.3 documentation (d2l.ai)
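fastText的基本做法是:给每个词加上词首、词尾标记'<'和'>',取其长度为3到6的字符n-gram(外加整个词本身)作为子词,词向量即这些子词向量之和。下面是提取子词的示意(长度范围按上述假设):

def get_subwords(word, min_n=3, max_n=6):
    """返回fastText风格的字符n-gram子词(含边界符号'<'和'>'), 以及整个词这一特殊子词"""
    token = '<' + word + '>'
    subwords = set()
    for n in range(min_n, max_n + 1):
        for i in range(len(token) - n + 1):
            subwords.add(token[i:i + n])
    subwords.add(token)
    return subwords

print(sorted(get_subwords('where')))
# 包含 '<wh', 'whe', 'her', 'ere', 're>', '<whe', ... 以及 '<where>'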

字节对编码

在fastText中,所有提取的子词都必须是指定的长度,例如3到6,因此词表大小不能预定义。为了在固定大小的词表中允许可变长度的子词,我们可以应用一种称为字节对编码(Byte Pair Encoding,BPE)的压缩算法来提取子词。

字节对编码对训练数据集进行统计分析,以发现词内的公共符号(例如任意长度的连续字符)。从长度为1的符号开始,字节对编码迭代地合并最频繁的连续符号对,以产生新的、更长的符号。

code

import collections


# 返回词内最频繁的连续符号对
def get_max_freq_pair(token_freq):
pairs = collections.defaultdict(int)
for token, freq in token_freq.items():
symbols = token.split()
for i in range(len(symbols) - 1):
# "pairs"的键是两个连续符号的元组
pairs[symbols[i], symbols[i + 1]] += freq
return max(pairs, key=pairs.get) # 具有最大值的"pairs"键


# 合并最频繁的连续符号对以产生新符号
def merge_symbols(max_freq_pair, token_freq, symbols):
# 将最频繁的符号对合并为一个新的符号,并添加到symbols列表中
symbols.append(''.join(max_freq_pair))
# 初始化新的词频字典,用于存储更新后的词频
new_token_freq = dict()
# 遍历原始的词频字典
for token, freq in token_freq.items():
# 将token中的max_freq_pair替换为新的符号,并保持其他部分不变
new_token = token.replace(' '.join(max_freq_pair), ''.join(max_freq_pair))
# 更新新的词频字典,记录合并后的词频
new_token_freq[new_token] = token_freq[token]
return new_token_freq


# 尝试将单词从输入参数symbols分成可能最长的子词
def segment_BPE(tokens, symbols):
outputs = []
for token in tokens:
start, end = 0, len(token)
cur_output = []
# 具有符号中可能最长子字的词元段
while start < len(token) and start < end:
if token[start:end] in symbols:
cur_output.append(token[start:end])
start = end
end = len(token)
else:
end -= 1
if start < len(token):
cur_output.append('[UNK]')
outputs.append(' '.join(cur_output))
return outputs


if __name__ == "__main__":
# 初始化符号词表,内容为所有英文小写字符、特殊的词尾符号'_'和未知符号'[UNK]'
symbols = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n',
'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '_', '[UNK]']
# 定义一个原始token频率字典,用于记录不同token出现的频率
raw_token_freq = {'fast_': 4, 'faster_': 3, 'tall_': 5, 'taller_': 4}
# 初始化一个空字典,用于存储处理后的token频率
token_freq = {}
# 遍历原始token频率字典,对每个token进行处理
for token, freq in raw_token_freq.items():
# 将token按字符拆分,并用空格连接,然后将其作为新的键存储在token_freq中
token_freq[' '.join(list(token))] = raw_token_freq[token]
print("token_freq:")
print(token_freq)

# 对词典token_freq的键迭代地执行字节对编码算法
num_merges = 10
for i in range(num_merges):
max_freq_pair = get_max_freq_pair(token_freq)
token_freq = merge_symbols(max_freq_pair, token_freq, symbols)
print(f'merge #{i + 1}:', max_freq_pair)
print("symbols:")
print(symbols)

# 使用列表symbols中的子词(从前面提到的数据集学习)来表示另一个数据集的tokens
tokens = ['tallest_', 'fatter_']
print("use token in symbols to represent another dataset tokens:")
print(segment_BPE(tokens, symbols))

output

token_freq:
{'f a s t _': 4, 'f a s t e r _': 3, 't a l l _': 5, 't a l l e r _': 4}
merge #1: ('t', 'a')
merge #2: ('ta', 'l')
merge #3: ('tal', 'l')
merge #4: ('f', 'a')
merge #5: ('fa', 's')
merge #6: ('fas', 't')
merge #7: ('e', 'r')
merge #8: ('er', '_')
merge #9: ('tall', '_')
merge #10: ('fast', '_')
symbols:
['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '_', '[UNK]', 'ta', 'tal', 'tall', 'fa', 'fas', 'fast', 'er', 'er_', 'tall_', 'fast_']
use token in symbols to represent another dataset tokens:
['tall e s t _', 'fa t t er_']

词的相似性和类比任务

在大型语料库上预训练的词向量可以应用于下游的自然语言处理任务。下面通过把预训练词向量应用到词的相似性和类比任务中,来展示它们所携带的语义。

code

import os
import torch
from torch import nn
from d2l import torch as d2l


# 为了加载预训练的GloVe和fastText嵌入
class TokenEmbedding:
"""Glove嵌入"""
def __init__(self, embedding_name):
self.idx_to_token, self.idx_to_vec = self._load_embedding(embedding_name)
self.unknown_idx = 0
self.token_to_idx = {
token: idx for idx, token in enumerate(self.idx_to_token)
}

def _load_embedding(self, embedding_name):
idx_to_token, idx_to_vec = ['<unk>'], []
data_dir = d2l.download_extract(embedding_name)
with open(os.path.join(data_dir, 'vec.txt'), 'r', encoding='utf-8') as f:
for line in f:
elems = line.rstrip().split(' ')
token, elems = elems[0], [float(elem) for elem in elems[1:]]
# 跳过标题信息,比如fasttext首行
if len(elems) > 1:
idx_to_token.append(token)
idx_to_vec.append(elems)
idx_to_vec = [[0] * len(idx_to_vec[0])] + idx_to_vec
return idx_to_token, torch.tensor(idx_to_vec)

def __getitem__(self, tokens):
indices = [self.token_to_idx.get(token, self.unknown_idx) for token in tokens]
vecs = self.idx_to_vec[torch.tensor(indices)]
return vecs

def __len__(self):
return len(self.idx_to_token)


# 使用knn为词分类,以便根据词向量之间的余弦相似度为输入词查找语义相似的词
def knn(W, x, k):
# 增加1e-9以获得数值稳定性
cos = torch.mv(W, x.reshape(-1, )) / (torch.sqrt(torch.sum(W * W, axis=1) + 1e-9) * torch.sqrt((x * x).sum()))
_, topk = torch.topk(cos, k=k)
return topk, [cos[int(i)] for i in topk]


# 使用TokenEmbedding的实例embed中预训练好的词向量来搜索相近的词
def get_similar_tokens(query_token, k, embed):
topks, cos = knn(embed.idx_to_vec, embed[[query_token]], k + 1)
for i, c in zip(topks[1:], cos[1:]):
# 排除输入词
print(f'{embed.idx_to_token[int(i)]}: cosine similarity = {float(c):.3f}')


# 在词类比中,找到一个词,其向量值与vec(c) + vec(b) - vec(a)最接近
def get_analogy(token_a, token_b, token_c, embed):
vecs = embed[[token_a, token_b, token_c]]
x = vecs[1] - vecs[0] + vecs[2] # b - a + c
topks, cos = knn(embed.idx_to_vec, x, 1)
return embed.idx_to_token[int(topks[0])] # 删除未知词


if __name__ == "__main__":
# 加载预训练向量
d2l.DATA_HUB['glove.6b.50d'] = (d2l.DATA_URL + 'glove.6B.50d.zip',
'0b8703943ccdb6eb788e6f091b8946e82231bc4d')
d2l.DATA_HUB['glove.6b.100d'] = (d2l.DATA_URL + 'glove.6B.100d.zip',
'cd43bfb07e44e6f27cbcc7bc9ae3d80284fdaf5a')
d2l.DATA_HUB['glove.42b.300d'] = (d2l.DATA_URL + 'glove.42B.300d.zip',
'b5116e234e9eb9076672cfeabf5469f3eec904fa')
d2l.DATA_HUB['wiki.en'] = (d2l.DATA_URL + 'wiki.en.zip',
'c1816da3821ae9f43899be655002f6c723e91b88')

# 加载50维GloVe嵌入,以创建TokenEmbedding实例
glove_6b50d = TokenEmbedding('glove.6b.50d')
# 看一下词表大小
print("size of vocab:")
print(len(glove_6b50d))

# 应用预训练词向量

# 词相似度
print("token similarity:")
# glove_6b50d中预训练词向量的词表包含400000个词和一个特殊的未知词元。
# 排除输入词和未知词元后,在词表中找到与“chip”一词语义最相似的三个词
print(get_similar_tokens('chip', 3, glove_6b50d))
# 再看一下girl, beautiful
print(get_similar_tokens('girl', 3, glove_6b50d))
print(get_similar_tokens('beautiful', 3, glove_6b50d))

# 词类比
print("token compare:")
# 除了找到相似的词,还可以将词向量应用到词类比任务中,比如“man”:“woman”::“son”:“daughter”就是一个词的类比
# “man”是对“woman”的类比,“son”是对“daughter”的类比
# 对于单词类比a : b :: c : d,给出前三个词a、b和c,找到d。
# 用vec(w)表示词w的向量,为了完成这个类比,我们要找到一个词,其向量与vec(c) + vec(b) − vec(a)的结果最相似。
print("man:woman::son:?")
print(get_analogy('man', 'woman', 'son', glove_6b50d))
print("beijing:china::tokyo:?")
print(get_analogy('beijing', 'china', 'tokyo', glove_6b50d))
print("bad:worst::big:?")
print(get_analogy('bad', 'worst', 'big', glove_6b50d))

output

size of vocab:
400001
token similarity:
chips: cosine similarity = 0.856
intel: cosine similarity = 0.749
electronics: cosine similarity = 0.749
None
boy: cosine similarity = 0.933
woman: cosine similarity = 0.907
mother: cosine similarity = 0.835
None
lovely: cosine similarity = 0.921
gorgeous: cosine similarity = 0.893
wonderful: cosine similarity = 0.830
None
token compare:
man:woman::son:?
daughter
beijing:china::tokyo:?
japan
bad:worst::big:?
biggest

Transformers的双向编码器表示(BERT)

到目前为止,上文提到的所有词嵌入模型都是上下文无关的;下面引入上下文敏感模型。

上下文无关/敏感模型

考虑之前使用的那些词嵌入模型word2vec和GloVe,它们都将相同的预训练向量分配给同一个词,而不考虑词的上下文(如果有的话)。形式上,任何词元x的上下文无关表示是函数f(x),其仅将x作为其输入。考虑到自然语言中丰富的多义现象和复杂的语义,上下文无关表示具有明显的局限性。因为同一个词在不同的上下文中可能表达截然不同的意思。

这推动了上下文敏感模型的出现,其中词的表征取决于上下文:词元x的上下文敏感表示是函数f(x, c(x)),它同时取决于x及其上下文c(x)。

特定任务/不可知任务

现有的各种自然语言处理解决方案大多依赖于特定任务的架构,然而为每一个任务都设计一个专门的架构是一件很困难的事情。GPT模型为上下文敏感表示设计了通用的、任务无关的模型:GPT建立在Transformer解码器的基础上,预训练了一个用于表示文本序列的语言模型。当将GPT应用于下游任务时,语言模型的输出将被送到一个附加的线性输出层,以预测任务的标签。

然而,由于语言模型的自回归特性,GPT只能向前看(从左到右)。在“i went to the bank to deposit cash”(我去银行存现金)和“i went to the bank to sit down”(我去河岸边坐下)中,“bank”的表示只依赖其左侧上下文,而两句话中“bank”左侧的上下文完全相同,因此GPT会为“bank”返回相同的表示,尽管它在两句话中的含义不同。

BERT

BERT将二者的优点(上下文敏感的表示与任务无关的架构)结合起来。

原理见15.8. Bidirectional Encoder Representations from Transformers (BERT) — Dive into Deep Learning 1.0.3 documentation (d2l.ai)

code

import torch
from torch import nn
from d2l import torch as d2l


# get_tokens_and_segments将一个句子或两个句子作为输入,然后返回BERT输入序列的标记及其相应的片段索引。
def get_tokens_and_segments(tokens_a, tokens_b=None):
"""获取输入序列的词元及其片段索引"""
tokens = ['<cls>'] + tokens_a + ['<sep>']
# 0和1分别标记A和B
segments = [0] * (len(tokens_a) + 2)
if tokens_b is not None:
tokens += tokens_b + ['<sep>']
segments += [1] * (len(tokens_b) + 1)
return tokens, segments


# BERTEncoder使用片段嵌入和可学习的位置嵌入
class BERTEncoder(nn.Module):
"""BERT编码器"""
def __init__(self, vocab_size, num_hidden, norm_shape, ffn_num_input, ffn_num_hidden, num_heads,
num_layers, dropout, max_len=1000, key_size=768, query_size=768, value_size=768, **kwargs):
super(BERTEncoder, self).__init__(**kwargs)
self.token_embedding = nn.Embedding(vocab_size, num_hidden)
self.segment_embedding = nn.Embedding(2, num_hidden)
self.blks = nn.Sequential()
for i in range(num_layers):
self.blks.add_module(f"{i}", d2l.EncoderBlock(
key_size, query_size, value_size, num_hidden, norm_shape, ffn_num_input, ffn_num_hidden, num_heads, dropout, True
))
# 在BERT中,位置嵌入是可学习的,因此创建一个足够长的位置嵌入参数
self.pos_embedding = nn.Parameter(torch.randn(1, max_len, num_hidden))

def forward(self, tokens, segments, valid_lens):
# X的形状保持不变: (批量大小,最大序列长度,num_hidden)
X = self.token_embedding(tokens) + self.segment_embedding(segments)
X = X + self.pos_embedding.data[:, :X.shape[1], :]
for blk in self.blks:
X = blk(X, valid_lens)
return X


# 实现下面的MaskLM类来预测BERT预训练的掩蔽语言模型任务中的掩蔽标记
# 预测使用单隐藏层的多层感知机(self.mlp)。在前向推断中,它需要两个输入:BERTEncoder的编码结果和用于预测的词元位置
# 输出这些位置的预测结果
class MaskLM(nn.Module):
"""BERT的掩蔽语言模型任务"""
def __init__(self, vocab_size, num_hidden, num_inputs=768, **kwargs):
super(MaskLM, self).__init__(**kwargs)
self.mlp = nn.Sequential(
nn.Linear(num_inputs, num_hidden),
nn.ReLU(),
nn.LayerNorm(num_hidden),
nn.Linear(num_hidden, vocab_size)
)

def forward(self, X, pred_positions):
num_pred_positions = pred_positions.shape[1]
pred_positions = pred_positions.reshape(-1)
batch_size = X.shape[0]
batch_idx = torch.arange(0, batch_size)
# 假设batch_size=2,num_pred_positions=3
# 那么batch_idx是[0, 0, 0, 1, 1, 1]
batch_idx = torch.repeat_interleave(batch_idx, num_pred_positions)
masked_X = X[batch_idx, pred_positions]
masked_X = masked_X.reshape((batch_size, num_pred_positions, -1))
mlm_Y_hat = self.mlp(masked_X)
return mlm_Y_hat


# 下一句预测模型
class NextSentencePred(nn.Module):
"""BERT的下一句预测任务"""
def __init__(self, num_inputs, **kwargs):
super(NextSentencePred, self).__init__(**kwargs)
self.output = nn.Linear(num_inputs, 2)

def forward(self, X):
# X的形状(batch_size, num_hidden)
return self.output(X)


# 整合代码实现完整的BERT
# 在预训练BERT时,最终的损失函数是掩蔽语言模型损失函数和下一句预测损失函数的线性组合
# 现在通过实例化三个类:BERTEncoder、MaskLM和NextSentencePred来定义BERT模型
# 前向推断返回编码后的BERT表示encoded_X、掩蔽语言模型预测mlm_Y_hat和下一句预测nsp_Y_hat
class BERTModel(nn.Module):
"""BERT模型"""
def __init__(self, vocab_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
dropout, max_len=1000, key_size=768, query_size=768, value_size=768,
hid_in_features=768, mlm_in_features=768, nsp_in_features=768):
super(BERTModel, self).__init__()
self.encoder = BERTEncoder(
vocab_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers,
dropout, max_len=max_len, key_size=key_size, query_size=query_size, value_size=value_size
)
self.hidden = nn.Sequential(
nn.Linear(hid_in_features, num_hiddens),
nn.Tanh()
)
self.mlm = MaskLM(vocab_size, num_hiddens, mlm_in_features)
self.nsp = NextSentencePred(nsp_in_features)

def forward(self, tokens, segments, valid_lens=None, pred_positions=None):
encoded_X = self.encoder(tokens, segments, valid_lens)
if pred_positions is not None:
mlm_Y_hat = self.mlm(encoded_X, pred_positions)
else:
mlm_Y_hat = None
# 用于下一句预测的多层感知机分类器的隐藏层,0是"<cls>"标记的索引
nsp_Y_hat = self.nsp(self.hidden(encoded_X[:, 0, :]))
return encoded_X, mlm_Y_hat, nsp_Y_hat


if __name__ == "__main__":
# 假设词表大小为10000,为了演示BERTEncoder的前向推断,让我们创建一个实例并初始化它的参数
vocab_size, num_hidden, ffn_num_hidden, num_heads = 10000, 768, 1024, 4
norm_shape, ffn_num_input, num_layers, dropout = [768], 768, 2, 0.2
encoder = BERTEncoder(vocab_size, num_hidden, norm_shape, ffn_num_input, ffn_num_hidden, num_heads, num_layers, dropout)
tokens = torch.randint(0, vocab_size, (2, 8))
segments = torch.tensor([[0, 0, 0, 0, 1, 1, 1, 1], [0, 0, 0, 1, 1, 1, 1, 1]])
encoder_X = encoder(tokens, segments, None)
print("shape of encoded_X:")
print(encoder_X.shape)

# 预训练任务
# 预训练任务的原理见 https://d2l.ai/chapter_natural-language-processing-pretraining/bert.html#pretraining-tasks
# 演示MaskLM的前向推断
mlm = MaskLM(vocab_size, num_hidden)
mlm_positions = torch.tensor([[1, 5, 2], [6, 1, 5]])
mlm_Y_hat = mlm(encoder_X, mlm_positions)
print("shape of mlm_Y_hat:")
print(mlm_Y_hat.shape)
# 利用掩蔽位置的预测mlm_Y_hat及其真实标签mlm_Y,我们可以计算BERT预训练中遮蔽语言模型任务的交叉熵损失
mlm_Y = torch.tensor([[7, 8, 9], [10, 20, 30]])
loss = nn.CrossEntropyLoss(reduction='none')
mlm_l = loss(mlm_Y_hat.reshape((-1, vocab_size)), mlm_Y.reshape(-1))
print("shape of mlm_l:")
print(mlm_l.shape)

# 下一句预测
encoder_X = torch.flatten(encoder_X, start_dim=1)
# NSP的输入形状为(batch_size, num_hidden)
nsp = NextSentencePred(encoder_X.shape[-1])
nsp_Y_hat = nsp(encoder_X)
print("shape of nsp_Y_hat:")
print(nsp_Y_hat.shape)
# 还可以计算两个二元分类的交叉熵损失
nsp_y = torch.tensor([0, 1])
nsp_l = loss(nsp_Y_hat, nsp_y)
print("shape of nsp_l:")
print(nsp_l.shape)

output

shape of encoded_X:
torch.Size([2, 8, 768])
shape of mlm_Y_hat:
torch.Size([2, 3, 10000])
shape of mlm_l:
torch.Size([6])
shape of nsp_Y_hat:
torch.Size([2, 2])
shape of nsp_l:
torch.Size([2])

用于预训练BERT的数据集

为了预训练BERT模型,我们需要以理想的格式生成数据集,以便用于两个预训练任务:遮蔽语言模型和下一句预测。根据经验,在定制的数据集上对BERT进行预训练往往更有效;为了方便演示,这里使用较小的语料库WikiText-2。

与PTB数据集相比,WikiText-2

  • 保留了原来的标点符号,适合于下一句预测;
  • 保留了原来的大小写和数字;
  • 大了一倍以上。

code

import os
import random
import torch
from d2l import torch as d2l


# 读取数据集
def _read_wiki(data_dir):
file_name = os.path.join(data_dir, 'wiki.train.tokens')
with open(file_name, 'r') as f:
lines = f.readlines()
# 大写转小写
paragraphs = [line.strip().lower().split(' . ') for line in lines if len(line.split(' . ')) >= 2]
random.shuffle(paragraphs)
return paragraphs


# 为预训练定义辅助函数
# 生成下一句预测任务的数据
def _get_next_sentence(sentence, next_sentence, paragraphs):
if random.random() < 0.5:
is_next = True
else:
# paragraphs 是三重列表的嵌套
next_sentence = random.choice(random.choice(paragraphs))
is_next = False
return sentence, next_sentence, is_next


# 从输入paragraph生成用于下一句预测的训练样本,paragraph是句子列表,每个句子是词元列表
# 自变量max_len指定预训练期间的BERT输入序列的最大长度
def _get_nsp_data_from_paragraph(paragraph, paragraphs, vocab, max_len):
nsp_data_from_paragraph = []
for i in range(len(paragraph) - 1):
# 从相邻的两个句子中获取NSP数据的样本
tokens_a, tokens_b, is_next = _get_next_sentence(paragraph[i], paragraph[i + 1], paragraphs)
# 考虑1个'<cls>'词元和2个'<sep>'词元
if len(tokens_a) + len(tokens_b) + 3 > max_len:
# +3表示<cls>和两个<sep>的长度
# 如果加上特殊词元后的序列长度超过最大长度,则跳过该样本
continue
# 获取BERT输入所需的词元序列和段落标记序列
tokens, segments = d2l.get_tokens_and_segments(tokens_a, tokens_b)
# 将准备好的样本添加到NSP数据列表中
nsp_data_from_paragraph.append((tokens, segments, is_next))
return nsp_data_from_paragraph


# 生成遮蔽语言模型任务的数据
# 下面实现的函数返回可能替换后的输入词元、发生预测的词元索引和这些预测的标签
def _replace_mlm_tokens(tokens, candidate_pred_positions, num_mlm_pred, vocab):
# 为遮蔽语言模型的输入创建新的词元副本,其中输入可能包含替换的"<mask>"或随机词元
mlm_input_tokens = [token for token in tokens]
pred_positions_and_labels = []
# 打乱后用于在遮蔽语言模型任务中获取15%的随机词元进行预测
random.shuffle(candidate_pred_positions)
for mlm_pred_position in candidate_pred_positions:
if len(pred_positions_and_labels) >= num_mlm_pred:
break
masked_token = None
# 80%的概率将词替换为“<mask>”词元
if random.random() < 0.8:
masked_token = '<mask>'
else:
# 10%保持不变
if random.random() < 0.5:
masked_token = tokens[mlm_pred_position]
# 10%的概率替换为随机词元
else:
masked_token = random.choice(vocab.idx_to_token)
mlm_input_tokens[mlm_pred_position] = masked_token
pred_positions_and_labels.append((mlm_pred_position, tokens[mlm_pred_position]))
return mlm_input_tokens, pred_positions_and_labels


# 以下函数将BERT输入序列(tokens)作为输入并返回输出词元的索引、发生预测的词元索引以及这些预测的标签索引
def _get_mlm_data_from_tokens(tokens, vocab):
candidate_pred_positions = []
# tokens是一个字符串列表
for i, token in enumerate(tokens):
# 在遮蔽语言模型任务中不会预测特殊词元
if token in ['<cls>', '<sep>']:
continue
candidate_pred_positions.append(i)
# 遮蔽语言模型任务中只预测15%的词元
num_mlm_pred = max(1, round(len(tokens) * 0.15))
mlm_input_tokens, pred_positions_and_labels = _replace_mlm_tokens(tokens, candidate_pred_positions, num_mlm_pred, vocab)
pred_positions_and_labels = sorted(pred_positions_and_labels, key=lambda x: x[0])
pred_positions = [v[0] for v in pred_positions_and_labels]
mlm_pred_labels = [v[1] for v in pred_positions_and_labels]
return vocab[mlm_input_tokens], pred_positions, vocab[mlm_pred_labels]


# 将文本转换为预训练数据集, 将特殊的<pad>词元附加到输入, 把每个样本填充到统一长度
# 参数examples来自两个预训练任务辅助函数_get_nsp_data_from_paragraph和_get_mlm_data_from_tokens的输出
def _pad_bert_inputs(examples, max_len, vocab):
max_num_mlm_pred = round(max_len * 0.15)
all_token_ids, all_segments, valid_lens = [], [], []
all_pred_positions, all_mlm_weights, all_mlm_labels = [], [], []
nsp_labels = []
for (token_ids, pred_positions, mlm_pred_label_ids, segments, is_next) in examples:
all_token_ids.append(torch.tensor(token_ids + [vocab['<pad>']] * (max_len - len(token_ids)), dtype=torch.long))
all_segments.append(torch.tensor(segments + [0] * (max_len - len(segments)), dtype=torch.long))
# valid_lens不包括'<pad>'的计数
valid_lens.append(torch.tensor(len(token_ids), dtype=torch.float32))
all_pred_positions.append(torch.tensor(pred_positions + [0] * (max_num_mlm_pred - len(pred_positions)),
dtype=torch.long))
# 填充词元的预测将通过乘以 0权重 在损失中过滤掉
all_mlm_weights.append(
torch.tensor([1.0] * len(mlm_pred_label_ids) + [0.0] * (max_num_mlm_pred - len(pred_positions)),
dtype=torch.float32)
)
all_mlm_labels.append(
torch.tensor(mlm_pred_label_ids + [0] * (max_num_mlm_pred - len(mlm_pred_label_ids)),
dtype=torch.long)
)
nsp_labels.append(torch.tensor(is_next, dtype=torch.long))
return (all_token_ids, all_segments, valid_lens, all_pred_positions, all_mlm_weights, all_mlm_labels,
nsp_labels)


# WikiText数据集,词元化时出现次数少于5次的不频繁词元将被过滤,使用d2l.tokenize进行词元化
class _WikiTextDataset(torch.utils.data.Dataset):
def __init__(self, paragraphs, max_len):
# 输入paragraphs[i]是代表段落的句子字符串列表
# 而输出paragraphs[i]是代表段落的句子列表,其中每句都是词元列表
paragraphs = [d2l.tokenize(paragraph, token='word') for paragraph in paragraphs]
sentences = [sentence for paragraph in paragraphs for sentence in paragraph]
self.vocab = d2l.Vocab(sentences, min_freq=5, reserved_tokens=['<pad>', '<mask>', '<cls>', '<sep>'])
# 获取下一个句子预测任务的数据
examples = []
for paragraph in paragraphs:
examples.extend(_get_nsp_data_from_paragraph(paragraph, paragraphs, self.vocab, max_len))
# 获取遮蔽语言模型任务的数据
examples = [(_get_mlm_data_from_tokens(tokens, self.vocab) + (segments, is_next))
for tokens, segments, is_next in examples]
# 填充输入
(self.all_token_ids, self.all_segments, self.valid_lens, self.all_pred_positions, self.all_mlm_weights,
self.all_mlm_labels, self.nsp_labels) = _pad_bert_inputs(examples, max_len, self.vocab)

def __getitem__(self, idx):
return (self.all_token_ids[idx], self.all_segments[idx], self.valid_lens[idx], self.all_pred_positions[idx],
self.all_mlm_weights[idx], self.all_mlm_labels[idx], self.nsp_labels[idx])

def __len__(self):
return len(self.all_token_ids)


# 加载数据集并生成预训练样本
def load_data_wiki(batch_size, max_len):
"""加载WikiText-2数据集"""
data_dir = d2l.download_extract('wikitext-2', 'wikitext-2')
paragraphs = _read_wiki(data_dir)
train_set = _WikiTextDataset(paragraphs, max_len)
train_iter = torch.utils.data.DataLoader(train_set, batch_size, shuffle=True, num_workers=1)
return train_iter, train_set.vocab


if __name__ == "__main__":
# 在WikiText-2 数据集中,每行代表一个段落,其中在任意标点符号及其前面的词元之间插入空格
# 保留至少有两句话的段落
# 使用 ' . '(句点)作为分隔符来拆分句子
d2l.DATA_HUB['wikitext-2'] = (
'https://s3.amazonaws.com/research.metamind.io/wikitext/'
'wikitext-2-v1.zip', '3c914d17d80b1459be871a5039ac23e752a53cbe')

# 查看一下小批量BERT预训练样本的形状
# 在每个BERT输入序列中,为遮蔽语言模型任务预测10(64 * 0.15)个位置
batch_size, max_len = 512, 64
train_iter, vocab = load_data_wiki(batch_size, max_len)
for (token_X, segments_X, valid_lens_X, pred_positions_X, mlm_weights_X, mlm_Y, nsp_y) in train_iter:
print("shape of token_X, segments_X, valid_lens_X, pred_positions_X, mlm_weights_X, mlm_Y, nsp_y:")
print(token_X.shape, segments_X.shape, valid_lens_X.shape, pred_positions_X.shape, mlm_weights_X.shape,
mlm_Y.shape, nsp_y.shape)
break # 看一个就行
# 看一下词表大小
print("vocab size:")
print(len(vocab))

output

TBD
似乎有点bug,zip文件没法从官网下载下来

预训练BERT

TBD
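本节尚未完成。按照前文BERT.py中BERTModel的前向输出,预训练的最终损失是遮蔽语言模型损失与下一句预测损失之和;下面给出计算一个小批量联合损失的示意(net、vocab_size以及各批量张量沿用上文两节的定义,loss为nn.CrossEntropyLoss(reduction='none'),函数名为假设),后续补全训练循环时可在每个小批量上调用它并对总损失反向传播:

def get_batch_loss_bert(net, loss, vocab_size, tokens_X, segments_X, valid_lens_X,
                        pred_positions_X, mlm_weights_X, mlm_Y, nsp_y):
    """一个小批量上的BERT预训练损失 = 遮蔽语言模型损失 + 下一句预测损失"""
    # 前向传播, 得到编码表示、遮蔽位置的词元预测和下一句预测
    _, mlm_Y_hat, nsp_Y_hat = net(tokens_X, segments_X, valid_lens_X.reshape(-1), pred_positions_X)
    # 遮蔽语言模型损失: 用mlm_weights_X把填充出来的预测位置在损失中过滤掉
    mlm_l = loss(mlm_Y_hat.reshape(-1, vocab_size), mlm_Y.reshape(-1))
    mlm_l = (mlm_l * mlm_weights_X.reshape(-1)).sum() / (mlm_weights_X.sum() + 1e-8)
    # 下一句预测损失
    nsp_l = loss(nsp_Y_hat, nsp_y).mean()
    return mlm_l + nsp_l, mlm_l, nsp_l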