# NLP Machine Translation: From RNN to Transformer
## 1. Introduction to Machine Translation

Machine translation (MT) is an important task in natural language processing (NLP) that aims to automatically translate text from one language into another. From early rule-based systems to modern deep learning approaches, MT technology has advanced significantly.

Core challenges:

- Language differences: grammar, vocabulary, and idiomatic usage differ significantly across languages.
- Context understanding: translation must take the surrounding context into account rather than proceeding word by word.
- Ambiguity: the same word can have different meanings in different contexts.
- Fluency vs. accuracy: a translation must balance faithfulness to the source with fluency in the target language.

## 2. RNN-Based Machine Translation

Recurrent neural networks (RNNs) and their variants (LSTM, GRU) were the mainstream approach in early deep-learning machine translation.

### 2.1 Encoder-Decoder Architecture

RNN-based machine translation typically uses an encoder-decoder architecture:

- Encoder: encodes the source-language sequence into a fixed-length context vector.
- Decoder: generates the target-language sequence from that context vector.

### 2.2 Implementation Example

```python
import torch
import torch.nn as nn
import torch.optim as optim

# RNN encoder
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        # x: (batch_size, seq_len)
        embedded = self.embedding(x)
        # embedded: (batch_size, seq_len, hidden_size)
        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs: (batch_size, seq_len, hidden_size)
        # hidden:  (num_layers, batch_size, hidden_size)
        # cell:    (num_layers, batch_size, hidden_size)
        return hidden, cell

# RNN decoder
class Decoder(nn.Module):
    def __init__(self, output_size, hidden_size, num_layers=1):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.rnn = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        # x: (batch_size,) -- one token per example per step
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        # embedded: (batch_size, 1, hidden_size)
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output: (batch_size, 1, hidden_size)
        prediction = self.fc(output.squeeze(1))
        # prediction: (batch_size, output_size)
        return prediction, hidden, cell

# Seq2Seq model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = source.shape[0]
        target_len = target.shape[1]
        target_vocab_size = self.decoder.fc.out_features

        # Store the decoder outputs
        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(source.device)

        # The encoder's final hidden state initializes the decoder
        hidden, cell = self.encoder(source)

        # The first decoder input is the start token
        x = target[:, 0]

        for t in range(1, target_len):
            # Decode one step
            output, hidden, cell = self.decoder(x, hidden, cell)
            # Store the output
            outputs[:, t, :] = output
            # Decide whether to use teacher forcing
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            # The next input is either the ground-truth token or the prediction
            x = target[:, t] if teacher_force else output.argmax(1)

        return outputs

# Example usage
# Assume the following vocabulary sizes
source_vocab_size = 10000
target_vocab_size = 10000
hidden_size = 256
num_layers = 2

# Build the model
encoder = Encoder(source_vocab_size, hidden_size, num_layers)
decoder = Decoder(target_vocab_size, hidden_size, num_layers)
model = Seq2Seq(encoder, decoder)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Example inputs
# Assume source and target have already been converted to index sequences
source = torch.randint(0, source_vocab_size, (32, 10))  # (batch_size, seq_len)
target = torch.randint(0, target_vocab_size, (32, 15))  # (batch_size, seq_len)

# Forward pass
outputs = model(source, target)

# Compute the loss
# Only score outputs from step 1 onward, since step 0 is the start token
loss = criterion(outputs[:, 1:].reshape(-1, target_vocab_size),
                 target[:, 1:].reshape(-1))

# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()

print(f"Loss: {loss.item():.4f}")
```
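The snippet above only shows a training step. At inference time there is no target sequence to feed, so a common approach is greedy decoding: feed the model's own prediction back in as the next input. Below is a minimal sketch using the `Encoder`/`Decoder` defined above; the `sos_id`, `eos_id`, and `max_len` values are hypothetical and depend on your vocabulary.

```python
# Greedy decoding sketch (illustrative, not part of the original training code).
def greedy_decode(model, source, max_len=20, sos_id=1, eos_id=2):
    model.eval()
    with torch.no_grad():
        hidden, cell = model.encoder(source)
        # Start every sequence with the assumed start-of-sentence id
        x = torch.full((source.size(0),), sos_id, dtype=torch.long, device=source.device)
        generated = []
        for _ in range(max_len):
            output, hidden, cell = model.decoder(x, hidden, cell)
            x = output.argmax(1)          # pick the most likely next token
            generated.append(x)
        return torch.stack(generated, dim=1)  # (batch_size, max_len)

# Example usage; a real implementation would also stop once eos_id is produced
# and map the token ids back to words.
# translated_ids = greedy_decode(model, source)
```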
### 2.3 Attention Mechanism

To address the difficulty of translating long sequences, the attention mechanism was introduced:

```python
# Decoder with an attention mechanism
class AttentionDecoder(nn.Module):
    def __init__(self, output_size, hidden_size, num_layers=1):
        super(AttentionDecoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.rnn = nn.LSTM(hidden_size * 2, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.attention = nn.Linear(hidden_size * 2, 1)

    def forward(self, x, hidden, cell, encoder_outputs):
        # x: (batch_size,)
        # encoder_outputs: (batch_size, seq_len, hidden_size)
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        # embedded: (batch_size, 1, hidden_size)

        # Compute the attention weights
        seq_len = encoder_outputs.shape[1]
        hidden_repeated = hidden[-1].unsqueeze(1).repeat(1, seq_len, 1)
        # hidden_repeated: (batch_size, seq_len, hidden_size)
        attention_input = torch.cat((hidden_repeated, encoder_outputs), dim=2)
        # attention_input: (batch_size, seq_len, hidden_size * 2)
        attention_weights = torch.softmax(self.attention(attention_input), dim=1)
        # attention_weights: (batch_size, seq_len, 1)

        # Compute the context vector
        context_vector = torch.bmm(attention_weights.permute(0, 2, 1), encoder_outputs)
        # context_vector: (batch_size, 1, hidden_size)

        # Concatenate the embedded input and the context vector
        rnn_input = torch.cat((embedded, context_vector), dim=2)
        # rnn_input: (batch_size, 1, hidden_size * 2)

        output, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        # output: (batch_size, 1, hidden_size)
        prediction = self.fc(output.squeeze(1))
        # prediction: (batch_size, output_size)
        return prediction, hidden, cell, attention_weights

# Seq2Seq model with attention
class AttentionSeq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(AttentionSeq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = source.shape[0]
        target_len = target.shape[1]
        target_vocab_size = self.decoder.fc.out_features

        # Store the decoder outputs
        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(source.device)
        attention_weights_list = []

        # Encoder outputs and final hidden state
        encoder_outputs, (hidden, cell) = self.encoder(source)

        # The first decoder input is the start token
        x = target[:, 0]

        for t in range(1, target_len):
            # Decode one step
            output, hidden, cell, attention_weights = self.decoder(x, hidden, cell, encoder_outputs)
            # Store the output
            outputs[:, t, :] = output
            attention_weights_list.append(attention_weights)
            # Decide whether to use teacher forcing
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            # The next input is either the ground-truth token or the prediction
            x = target[:, t] if teacher_force else output.argmax(1)

        return outputs, attention_weights_list

# Modified encoder that also returns all of its per-step outputs
class EncoderWithOutputs(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(EncoderWithOutputs, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.rnn(embedded)
        return outputs, (hidden, cell)

# Build the model with attention
encoder = EncoderWithOutputs(source_vocab_size, hidden_size, num_layers)
decoder = AttentionDecoder(target_vocab_size, hidden_size, num_layers)
attention_model = AttentionSeq2Seq(encoder, decoder)

# Forward pass
outputs, attention_weights = attention_model(source, target)

# Compute the loss
loss = criterion(outputs[:, 1:].reshape(-1, target_vocab_size),
                 target[:, 1:].reshape(-1))

print(f"Attention model loss: {loss.item():.4f}")
```
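The per-step attention weights returned by `AttentionSeq2Seq` can also be inspected to see which source positions the decoder focused on at each target step. Here is a small visualization sketch, assuming matplotlib is available; it plots the first example in the batch:

```python
import matplotlib.pyplot as plt

# Stack the per-step weights into a (target_len - 1, src_seq_len) matrix
# for the first sequence in the batch.
attn = torch.stack([w[0, :, 0] for w in attention_weights], dim=0).detach().numpy()

plt.imshow(attn, aspect="auto", cmap="viridis")
plt.xlabel("source position")
plt.ylabel("target step")
plt.colorbar()
plt.title("Decoder attention over source positions")
plt.show()
```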
## 3. Transformer-Based Machine Translation

The Transformer fundamentally changed machine translation: its self-attention mechanism removes the sequential dependency that limits RNNs.

### 3.1 Transformer Architecture

The Transformer consists of the following main components:

- Self-attention layers: capture dependencies between arbitrary positions in a sequence.
- Positional encoding: injects position information into the model.
- Feed-forward networks: apply a non-linear transformation to the attention output.
- Encoder-decoder structure: the encoder processes the source language, the decoder generates the target language.

### 3.2 Implementation Example

```python
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_seq_len, d_model)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_seq_len, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: (batch_size, seq_len, d_model)
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len, :]
        return x

# Multi-head attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)

        # Linear projections, then split into heads
        q = self.W_q(q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.W_k(k).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.W_v(v).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # Scaled dot-product attention scores
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Attention weights
        attn_weights = F.softmax(scores, dim=-1)

        # Weighted sum of the values
        output = torch.matmul(attn_weights, v)

        # Merge the heads back together
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.W_o(output)
        return output, attn_weights

# Feed-forward network
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

# Encoder layer
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Feed-forward network
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

# Decoder layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        # Masked self-attention
        attn_output, _ = self.self_attn(x, x, x, look_ahead_mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Cross-attention over the encoder output
        attn_output, _ = self.cross_attn(x, enc_output, enc_output, padding_mask)
        x = self.norm2(x + self.dropout(attn_output))
        # Feed-forward network
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x
```
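As a quick check that these building blocks fit together, an encoder layer can be run on a dummy batch and its output shape inspected. This is an illustrative sketch only; the tensor sizes are arbitrary.

```python
# Shape sanity check for the components above (illustrative only).
d_model, num_heads, d_ff = 512, 8, 2048
layer = EncoderLayer(d_model, num_heads, d_ff)

x = torch.randn(2, 10, d_model)   # (batch_size, seq_len, d_model)
mask = torch.ones(2, 1, 1, 10)    # all ones: every position may be attended to

out = layer(x, mask)
print(out.shape)  # torch.Size([2, 10, 512])
```

With these components in place, they can be assembled into the full encoder-decoder Transformer: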
```python
# Transformer model
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_layers=6,
                 num_heads=8, d_ff=2048, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, tgt, src_mask, tgt_mask):
        # Encoder
        enc_emb = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        enc_output = enc_emb
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)

        # Decoder
        dec_emb = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
        dec_output = dec_emb
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, tgt_mask, src_mask)

        # Output projection
        output = self.fc(dec_output)
        return output

# Example usage

# Hyperparameters
src_vocab_size = 10000
tgt_vocab_size = 10000
d_model = 512
num_layers = 6
num_heads = 8
d_ff = 2048
dropout = 0.1

# Build the model
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_layers, num_heads, d_ff, dropout)

# Example inputs
src = torch.randint(0, src_vocab_size, (32, 10))  # (batch_size, src_seq_len)
tgt = torch.randint(0, tgt_vocab_size, (32, 15))  # (batch_size, tgt_seq_len)

# Build the masks
def create_mask(src, tgt):
    src_mask = (src != 0).unsqueeze(1).unsqueeze(2)  # (batch_size, 1, 1, src_seq_len)
    tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)  # (batch_size, 1, tgt_seq_len, 1)
    seq_len = tgt.size(1)
    look_ahead_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()
    tgt_mask = tgt_mask & ~look_ahead_mask  # (batch_size, 1, tgt_seq_len, tgt_seq_len)
    return src_mask, tgt_mask

src_mask, tgt_mask = create_mask(src, tgt)

# Forward pass
outputs = model(src, tgt, src_mask, tgt_mask)

# Compute the loss (index 0 is treated as padding)
criterion = nn.CrossEntropyLoss(ignore_index=0)
loss = criterion(outputs[:, :-1].reshape(-1, tgt_vocab_size),
                 tgt[:, 1:].reshape(-1))

print(f"Transformer loss: {loss.item():.4f}")
```

## 4. Performance Comparison

### 4.1 Model Comparison

| Model | Strengths | Weaknesses | BLEU score | Inference speed |
| --- | --- | --- | --- | --- |
| RNN | Simple to implement; works for short sequences | Degrades on long sequences; low parallelism | 25-30 | Slow |
| RNN + Attention | Better quality on long sequences | Low parallelism; slow to train | 30-35 | Medium |
| Transformer | Highly parallel; strong on long sequences | High computational complexity; large memory footprint | 35-45 | Fast |

### 4.2 Training Time Comparison

```python
import time

# Rough training-time measurement
def train_time_test(model, data_loader, epochs=5):
    start_time = time.time()
    for epoch in range(epochs):
        for batch in data_loader:
            # Simulated training step
            src, tgt = batch
            outputs = model(src, tgt)
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), tgt.reshape(-1))
            loss.backward()
    end_time = time.time()
    return end_time - start_time

# Assuming a data loader is available
# data_loader = DataLoader(...)

# Measure the training time of each model
# rnn_time = train_time_test(rnn_model, data_loader)
# attention_time = train_time_test(attention_model, data_loader)
# transformer_time = train_time_test(transformer_model, data_loader)

# print(f"RNN training time: {rnn_time:.2f}s")
# print(f"RNN + Attention training time: {attention_time:.2f}s")
# print(f"Transformer training time: {transformer_time:.2f}s")
```
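The BLEU ranges in the table above are only indicative. In practice, BLEU is computed by comparing model outputs against reference translations; below is a minimal sketch using the sacrebleu package (assumed to be installed; the sentences are made-up examples):

```python
import sacrebleu

# Hypotheses produced by a model and their reference translations (toy examples)
hypotheses = ["the cat sat on the mat", "he likes machine translation"]
references = [["the cat is sitting on the mat", "he loves machine translation"]]

bleu = sacrebleu.corpus_bleu(hypotheses, references)
print(f"BLEU: {bleu.score:.2f}")
```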
## 5. Best Practices

### 5.1 Data Preprocessing

- Tokenization: use subword tokenization (e.g. BPE, WordPiece) to handle out-of-vocabulary words.
- Normalization: unify casing and punctuation handling.
- Length control: set a reasonable maximum sequence length and truncate sequences that exceed it.
- Batching: pad sequences so that all examples in a batch have the same length.

### 5.2 Model Training

- Learning rate schedule: use warmup followed by decay.
- Batch size: as large as GPU memory allows; larger batches generally help.
- Mixed precision: use FP16 to speed up training.
- Label smoothing: reduce the model's overconfidence in its predictions.
- Early stopping: monitor validation performance to prevent overfitting.

### 5.3 Model Optimization

```python
# Mixed-precision training
from torch.cuda.amp import autocast, GradScaler

transformer = Transformer(src_vocab_size, tgt_vocab_size)
transformer.cuda()
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
scaler = GradScaler()

for epoch in range(epochs):
    for batch in data_loader:
        src, tgt = batch
        src, tgt = src.cuda(), tgt.cuda()
        optimizer.zero_grad()
        with autocast():
            # src_mask and tgt_mask are assumed to be built per batch, e.g. with create_mask
            outputs = transformer(src, tgt[:, :-1], src_mask, tgt_mask[:, :, :-1, :-1])
            loss = criterion(outputs.reshape(-1, tgt_vocab_size), tgt[:, 1:].reshape(-1))
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
```

### 5.4 Inference Optimization

- Beam search: balances translation quality and diversity.
- Length penalty: avoids translations that are too short or too long.
- Batching: process multiple inputs together to improve throughput.
- Quantization: use INT8 quantization to shrink the model and speed up inference.

## 6. Practical Applications

### 6.1 Using Hugging Face Transformers

```python
from transformers import MarianMTModel, MarianTokenizer

# Load a pretrained model and tokenizer
model_name = "Helsinki-NLP/opus-mt-en-zh"
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# Single-sentence translation
text = "Hello, how are you?"
translated = model.generate(**tokenizer(text, return_tensors="pt", padding=True))
result = tokenizer.decode(translated[0], skip_special_tokens=True)
print(f"Source: {text}")
print(f"Translation: {result}")

# Batch translation
texts = ["I love machine learning.", "Transformers are powerful.", "NLP is fascinating."]
translated = model.generate(**tokenizer(texts, return_tensors="pt", padding=True))
results = [tokenizer.decode(t, skip_special_tokens=True) for t in translated]
for i, (src, tgt) in enumerate(zip(texts, results)):
    print(f"{i + 1}. Source: {src}")
    print(f"   Translation: {tgt}")
```

### 6.2 Training a Custom Model

```python
# Suppose we have a parallel corpus
train_data = [
    ("I am happy", "我很高兴"),
    ("She loves cats", "她喜欢猫"),
    # more data ...
]

# Build vocabularies
src_vocab = set()
tgt_vocab = set()
for src, tgt in train_data:
    src_vocab.update(src.split())
    tgt_vocab.update(tgt.split())

src_vocab = {word: i + 1 for i, word in enumerate(src_vocab)}  # 0 is reserved for padding
tgt_vocab = {word: i + 1 for i, word in enumerate(tgt_vocab)}
tgt_vocab["<sos>"] = len(tgt_vocab) + 1
tgt_vocab["<eos>"] = len(tgt_vocab) + 1

# Convert the data to index sequences
def prepare_data(data, src_vocab, tgt_vocab):
    src_ids = []
    tgt_ids = []
    for src, tgt in data:
        src_id = [src_vocab.get(word, 0) for word in src.split()]
        tgt_id = [tgt_vocab["<sos>"]] + [tgt_vocab.get(word, 0) for word in tgt.split()] + [tgt_vocab["<eos>"]]
        src_ids.append(src_id)
        tgt_ids.append(tgt_id)
    return src_ids, tgt_ids

src_ids, tgt_ids = prepare_data(train_data, src_vocab, tgt_vocab)

# Padding and batching
# A data loader would be implemented here ...

# Train the model
# ... using the Transformer model defined earlier
```

## 7. Conclusion

From RNNs to Transformers, machine translation technology has advanced substantially:

- RNNs laid the foundation for sequence-to-sequence learning, but struggle with long sequences.
- RNN + Attention improved long-sequence translation quality through the attention mechanism.
- Transformers use self-attention to enable parallel computation, significantly improving both translation quality and speed.

Future directions:

- Multilingual translation: supporting more language pairs and zero-shot translation.
- Domain adaptation: optimizing translation for specific domains such as medicine and law.
- Interpretability: making translation models easier to explain.
- Low-resource languages: improving translation quality when training data is scarce.
- Real-time translation: further improving inference speed for real-time applications.

Through continued innovation, machine translation is becoming ever more accurate, fluent, and efficient, providing a powerful tool for cross-lingual communication.