import collections import math import torch from torch import nn from d2l import torch as d2l from encoder_decoder import Encoder from encoder_decoder import Decoder from encoder_decoder import EncoderDecoder from machine_translation import load_data_nmt from RNN.rnn_self import grad_clipping
class Seq2SeqEncoder(Encoder):
    """GRU encoder for sequence-to-sequence learning.

    Token indices are embedded and then passed through a GRU with
    ``num_layers`` stacked layers.

    Args:
        vocab_size: Number of entries in the embedding table.
        embed_size: Width of each embedding vector.
        num_hidden: Hidden size of the GRU.
        num_layers: Number of stacked GRU layers.
        dropout: Dropout applied between GRU layers.
        **kwargs: Forwarded to the ``Encoder`` base class.
    """

    def __init__(self, vocab_size, embed_size, num_hidden, num_layers,
                 dropout=0, **kwargs):
        super().__init__(**kwargs)
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size, num_hidden, num_layers,
                          dropout=dropout)

    def forward(self, X, *args):
        """Encode a batch of token-index sequences.

        Args:
            X: Integer tensor of token indices, batch-major.
            *args: Ignored.

        Returns:
            ``(output, state)`` exactly as produced by the GRU.
        """
        # The GRU is built with the default batch_first=False, so the first
        # two axes are swapped to put the time-step axis first.
        time_major = self.embedding(X).permute(1, 0, 2)
        # No initial hidden state is supplied; the GRU uses its default.
        outputs, hidden = self.rnn(time_major)
        return outputs, hidden
class Seq2SeqDecoder(Decoder):
    """GRU decoder for sequence-to-sequence learning.

    At every time step the embedded input token is concatenated with a
    context vector (the last layer of the incoming hidden state), which is
    why the GRU input width is ``embed_size + num_hidden``.

    Args:
        vocab_size: Size of the embedding table and of the output layer.
        embed_size: Width of each embedding vector.
        num_hidden: Hidden size of the GRU.
        num_layer: Number of stacked GRU layers.
        dropout: Dropout applied between GRU layers.
        **kwargs: Forwarded to the ``Decoder`` base class.
    """

    def __init__(self, vocab_size, embed_size, num_hidden, num_layer,
                 dropout=0, **kwargs):
        super().__init__(**kwargs)
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size + num_hidden, num_hidden, num_layer,
                          dropout=dropout)
        self.dense = nn.Linear(num_hidden, vocab_size)

    def init_state(self, enc_outputs, *args):
        """Return the decoder's initial hidden state.

        Args:
            enc_outputs: The encoder's return value; element 1 is used as
                the state.
            *args: Ignored.

        Returns:
            ``enc_outputs[1]``.
        """
        return enc_outputs[1]

    def forward(self, X, state):
        """Decode a batch of target-side token indices.

        Args:
            X: Integer tensor of token indices, batch-major.
            state: GRU hidden state; its last layer doubles as the context.

        Returns:
            ``(output, state)`` where ``output`` is the dense-layer result
            permuted back to batch-major and ``state`` is the updated GRU
            hidden state.
        """
        # Swap to time-major for the GRU (default batch_first=False).
        embedded = self.embedding(X).permute(1, 0, 2)
        num_steps = embedded.shape[0]
        # Repeat the last layer's hidden state once per time step.
        context = state[-1].repeat(num_steps, 1, 1)
        rnn_input = torch.cat((embedded, context), 2)
        rnn_output, state = self.rnn(rnn_input, state)
        # Project to vocabulary scores and return to batch-major layout.
        scores = self.dense(rnn_output).permute(1, 0, 2)
        return scores, state
def sequence_mask(X, valid_len, value=0):
    """Overwrite entries past each row's valid length with ``value``.

    ``X`` is modified in place and also returned.

    Args:
        X: Tensor whose axis 1 is the position axis.
        valid_len: Tensor with one valid length per row of ``X``.
        value: Fill value for positions at or beyond the valid length.

    Returns:
        The same tensor object ``X`` after masking.
    """
    positions = torch.arange(X.size(1), dtype=torch.float32, device=X.device)
    # Broadcast (1, max_len) against (batch, 1): True where a position is valid.
    keep = positions.unsqueeze(0) < valid_len.unsqueeze(1)
    X[~keep] = value
    return X
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """Softmax cross-entropy loss that ignores masked positions."""

    def forward(self, pred, label, valid_len):
        """Compute the per-sequence masked cross-entropy.

        Args:
            pred: Scores with the class axis last; it is moved to axis 1
                before being handed to ``nn.CrossEntropyLoss``.
            label: Integer class indices, one per position.
            valid_len: Valid length of each sequence in the batch.

        Returns:
            One loss value per sequence: the mean over axis 1 of the
            per-position loss, with positions past ``valid_len`` zeroed.
        """
        # 1 for positions inside the valid length, 0 for the rest.
        mask = sequence_mask(torch.ones_like(label), valid_len)
        # Keep per-position losses so the mask can be applied before reducing.
        self.reduction = 'none'
        per_position = super().forward(pred.permute(0, 2, 1), label)
        return (per_position * mask).mean(dim=1)
def train_seq2seq(net, train_iter, lr, num_epochs, target_vocab, device):
    """Train a sequence-to-sequence model with teacher forcing.

    Args:
        net: Encoder-decoder model, called as
            ``net(X, dec_input, X_valid_len)``.
        train_iter: Iterable of batches ``(X, X_valid_len, Y, Y_valid_len)``.
        lr: Learning rate for the Adam optimizer.
        num_epochs: Number of passes over ``train_iter``.
        target_vocab: Target vocabulary; ``target_vocab['<bos>']`` gives the
            begin-of-sequence index.
        device: Device the model and every batch are moved to.

    Side effects:
        Re-initializes the Linear and GRU weights of ``net``, plots the
        per-token loss every 10 epochs, prints a summary line and shows the
        plot when training ends.
    """

    def xavier_init_weights(m):
        # Xavier-initialize Linear weights and every GRU weight matrix.
        if isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
        if isinstance(m, nn.GRU):
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    # Bind the criterion locally so the function does not depend on a
    # module-level ``loss`` happening to exist.
    loss = MaskedSoftmaxCELoss()
    net.train()
    animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                            xlim=[10, num_epochs])

    for epoch in range(num_epochs):
        timer = d2l.Timer()
        metric = d2l.Accumulator(2)  # (summed loss, number of tokens)
        for batch in train_iter:
            optimizer.zero_grad()
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            # Teacher forcing: the decoder sees <bos> followed by the target
            # shifted right by one position.
            bos = torch.tensor([target_vocab['<bos>']] * Y.shape[0],
                               device=device).reshape(-1, 1)
            dec_input = torch.cat([bos, Y[:, :-1]], 1)
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            batch_loss = l.sum()
            batch_loss.backward()
            # Clip with a fixed threshold rather than the loss value.
            # NOTE(review): assumes grad_clipping(net, theta) as in d2l —
            # confirm against RNN.rnn_self.
            grad_clipping(net, 1)
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(batch_loss, num_tokens)
        if (epoch + 1) % 10 == 0:
            animator.add(epoch + 1, (metric[0] / metric[1], ))
    print(f'loss {metric[0] / metric[1]}, {metric[1] / timer.stop()} tokens / sec on {device}')
    d2l.plt.show()
def predict_seq2seq(net, src_sentence, src_vocab, target_vocab, num_steps,
                    device, save_attention_weights=False):
    """Translate one sentence with greedy decoding.

    Args:
        net: Model exposing ``encoder`` and ``decoder`` attributes.
        src_sentence: Source sentence; it is lower-cased and split on single
            spaces.
        src_vocab: Source vocabulary (token -> index).
        target_vocab: Target vocabulary; must also provide ``to_tokens``.
        num_steps: Padded source length and maximum number of decode steps.
        device: Device on which the input tensors are created.
        save_attention_weights: If true, collect
            ``net.decoder.attention_weights`` after every decode step.

    Returns:
        ``(translation, attention_weight_seq)``: the predicted tokens joined
        by spaces, and the list of collected attention weights (empty unless
        ``save_attention_weights`` is set).
    """
    net.eval()
    words = src_sentence.lower().split(' ')
    src_tokens = src_vocab[words] + [src_vocab['<eos>']]
    # The valid length is taken before the sequence is truncated or padded.
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])

    # Add a leading batch axis of size 1.
    enc_X = torch.tensor(src_tokens, dtype=torch.long,
                         device=device).unsqueeze(0)
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    dec_X = torch.tensor([target_vocab['<bos>']], dtype=torch.long,
                         device=device).unsqueeze(0)

    eos_index = target_vocab['<eos>']
    output_seq = []
    attention_weight_seq = []
    for _ in range(num_steps):
        Y, dec_state = net.decoder(dec_X, dec_state)
        # Greedy choice: the highest-scoring token becomes the next input.
        dec_X = Y.argmax(dim=2)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        if save_attention_weights:
            attention_weight_seq.append(net.decoder.attention_weights)
        if pred == eos_index:
            # End-of-sequence is not included in the output.
            break
        output_seq.append(pred)
    translation = ' '.join(target_vocab.to_tokens(output_seq))
    return translation, attention_weight_seq
def bleu(pred_seq, label_seq, k):
    """Compute the BLEU score of a prediction against a single reference.

    The score is a brevity penalty ``exp(min(0, 1 - len_label / len_pred))``
    multiplied, for each n-gram order ``n`` in ``1..k``, by the clipped
    n-gram precision raised to the power ``0.5 ** n``.

    Args:
        pred_seq: Predicted sentence, tokens separated by single spaces.
        label_seq: Reference sentence, tokens separated by single spaces.
        k: Largest n-gram order to include.

    Returns:
        The BLEU score as a float. If the prediction is too short to contain
        any n-gram of some order ``n <= k``, that precision is 0 and so is
        the score.
    """
    pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
    len_pred, len_label = len(pred_tokens), len(label_tokens)
    score = math.exp(min(0, 1 - len_label / len_pred))
    for n in range(1, k + 1):
        # How many times each n-gram is still available in the reference.
        label_subs = collections.Counter(
            ' '.join(label_tokens[i: i + n])
            for i in range(len_label - n + 1)
        )
        num_matches = 0
        for i in range(len_pred - n + 1):
            ngram = ' '.join(pred_tokens[i: i + n])
            if label_subs[ngram] > 0:
                num_matches += 1
                # Consume the match so a repeated n-gram in the prediction
                # cannot be credited more often than it occurs in the label.
                label_subs[ngram] -= 1
        # max(1, ...) avoids dividing by zero when the prediction has fewer
        # than n tokens; num_matches is 0 there, so the precision is 0.
        num_pred_ngrams = max(1, len_pred - n + 1)
        score *= math.pow(num_matches / num_pred_ngrams, math.pow(0.5, n))
    return score
if __name__ == "__main__":
    # Shape check: run the encoder on a dummy batch of 4 sequences, 7 steps.
    encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hidden=16,
                             num_layers=2)
    encoder.eval()
    X = torch.zeros((4, 7), dtype=torch.long)
    output, state = encoder(X)
    print("encoder:")
    print('output shape:', output.shape, 'state shape:', state.shape)

    # Shape check: feed the encoder result into the decoder.
    decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hidden=16,
                             num_layer=2)
    decoder.eval()
    state = decoder.init_state(encoder(X))
    output, state = decoder(X, state)
    print("decoder:")
    print('output shape:', output.shape, 'state shape:', state.shape)

    # Masked loss on three identical sequences with valid lengths 4, 2, 0.
    loss = MaskedSoftmaxCELoss()
    print("loss demo:")
    demo_pred = torch.ones(3, 4, 10)
    demo_label = torch.ones((3, 4), dtype=torch.long)
    demo_valid_len = torch.tensor([4, 2, 0])
    print(loss(demo_pred, demo_label, demo_valid_len))

    # Hyperparameters for the translation model.
    embed_size, num_hidden, num_layers, dropout = 32, 32, 2, 0.1
    batch_size, num_steps = 64, 10
    lr, num_epochs, device = 0.005, 300, d2l.try_gpu()

    train_iter, src_vocab, target_vocab = load_data_nmt(batch_size, num_steps)
    encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hidden,
                             num_layers, dropout)
    decoder = Seq2SeqDecoder(len(target_vocab), embed_size, num_hidden,
                             num_layers, dropout)
    net = EncoderDecoder(encoder, decoder)
    train_seq2seq(net, train_iter, lr, num_epochs, target_vocab, device)

    # Translate a few English sentences and score each against its reference.
    print("start translating:")
    sentence_pairs = [
        ("go .", "va !"),
        ("i lost .", "j'ai perdu ."),
        ("he's calm .", "il est calme ."),
        ("i'm home .", "je suis chez moi ."),
    ]
    for eng, fra in sentence_pairs:
        translation, attention_weight_seq = predict_seq2seq(
            net, eng, src_vocab, target_vocab, num_steps, device)
        print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')
|