```python
# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import random
import json

"""
基于pytorch的网络编写
实现一个RNN网络完成多分类任务
判断字符 'a' 第一次出现在字符串中的位置
"""

class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab, num_classes):
        super(TorchModel, self).__init__()
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)  # embedding层
        self.rnn = nn.GRU(vector_dim, vector_dim, batch_first=True)  # RNN层
        self.classify = nn.Linear(vector_dim, num_classes)  # 线性层
        self.loss = nn.CrossEntropyLoss()  # loss函数采用交叉熵损失

    # 当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, y=None):
        x = self.embedding(x)  # (batch_size, sen_len) -> (batch_size, sen_len, vector_dim)
        _, hidden = self.rnn(x)  # (batch_size, sen_len, vector_dim) -> (1, batch_size, vector_dim)
        hidden = hidden.squeeze(0)  # (1, batch_size, vector_dim) -> (batch_size, vector_dim)
        y_pred = self.classify(hidden)  # (batch_size, vector_dim) -> (batch_size, num_classes)
        if y is not None:
            return self.loss(y_pred, y)  # 预测值和真实值计算损失
        else:
            return torch.softmax(y_pred, dim=1)  # 输出预测概率分布

# 字符集
def build_vocab():
    chars = "abcdefghijklmnopqrstuvwxyz"  # 字符集
    vocab = {"pad": 0}
    for index, char in enumerate(chars):
        vocab[char] = index + 1  # 每个字对应一个序号
    vocab['unk'] = len(vocab)  # 未知字符
    return vocab

# 随机生成一个样本
def build_sample(vocab, sentence_length):
    # 随机从字表选取sentence_length个字,可能重复
    x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
    # 确保每个样本都包含字符 'a'
    a_position = random.randint(0, sentence_length - 1)
    x[a_position] = 'a'
    # 将字转换成序号
    x = [vocab.get(word, vocab['unk']) for word in x]
    return x, a_position

# 建立数据集
def build_dataset(sample_length, vocab, sentence_length):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(vocab, sentence_length)
        dataset_x.append(x)
        dataset_y.append(y)
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)

# 建立模型
def build_model(vocab, char_dim, sentence_length):
    num_classes = sentence_length  # 分类数等于句子长度(每个位置一个类别)
    model = TorchModel(char_dim, sentence_length, vocab, num_classes)
    return model

# 测试代码
def evaluate(model, vocab, sentence_length):
    model.eval()
    x, y = build_dataset(200, vocab, sentence_length)  # 建立200个用于测试的样本
    correct, wrong = 0, 0
    with torch.no_grad():
        y_pred = model(x)  # 模型预测
        y_pred = torch.argmax(y_pred, dim=1)  # 获取预测的类别
        for y_p, y_t in zip(y_pred, y):  # 与真实标签进行对比
            if y_p == y_t:
                correct += 1
            else:
                wrong += 1
    print("正确预测个数:%d, 正确率:%f" % (correct, correct / (correct + wrong)))
    return correct / (correct + wrong)

def main():
    # 配置参数
    epoch_num = 30  # 训练轮数
    batch_size = 20  # 每次训练样本个数
    train_sample = 500  # 每轮训练总共训练的样本总数
    char_dim = 20  # 每个字的维度
    sentence_length = 6  # 样本文本长度
    learning_rate = 0.005  # 学习率
    # 建立字表
    vocab = build_vocab()
    # 建立模型
    model = build_model(vocab, char_dim, sentence_length)
    # 选择优化器
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
    log = []
    # 训练过程
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, vocab, sentence_length)  # 构造一组训练样本
            optim.zero_grad()  # 梯度归零
            loss = model(x, y)  # 计算loss
            loss.backward()  # 计算梯度
            optim.step()  # 更新权重
            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
        acc = evaluate(model, vocab, sentence_length)  # 测试本轮模型结果
        log.append([acc, np.mean(watch_loss)])
    # 保存模型
    torch.save(model.state_dict(), "model.pth")
    # 保存词表
    with open("vocab.json", "w", encoding="utf8") as writer:
        writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
    # 生成200个随机测试样本
    test_strings = []
    for _ in range(200):
        # 随机生成一个包含字符 'a' 的字符串
        chars = list("abcdefghijklmnopqrstuvwxyz")
        s = [random.choice(chars) for _ in range(sentence_length)]
        a_pos = random.randint(0, sentence_length - 1)
        s[a_pos] = 'a'  # 确保字符串中包含 'a'
        test_strings.append(''.join(s))
    # 用模型预测并输出结果
    print("\n随机生成的200个测试样本预测结果:")
    predict("model.pth", "vocab.json", test_strings)

# 使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):
    char_dim = 20  # 每个字的维度
    sentence_length = 6  # 样本文本长度
    # 加载字符表
    vocab = json.load(open(vocab_path, "r", encoding="utf8"))
    # 建立模型
    model = build_model(vocab, char_dim, sentence_length)
    # 加载训练好的权重
    model.load_state_dict(torch.load(model_path))
    x = []
    for input_string in input_strings:
        # 将输入字符串转换为字符序号列表,并进行填充
        seq = [vocab.get(char, vocab['unk']) for char in input_string]
        if len(seq) < sentence_length:
            seq = seq + [vocab['pad']] * (sentence_length - len(seq))
        else:
            seq = seq[:sentence_length]
        x.append(seq)
    model.eval()  # 测试模式
    with torch.no_grad():  # 不计算梯度
        result = model.forward(torch.LongTensor(x))  # 模型预测
    # 只打印前10个结果和最后10个结果,避免输出过多
    if len(input_strings) > 20:
        display_indices = list(range(10)) + list(range(len(input_strings) - 10, len(input_strings)))
        for i in display_indices:
            predicted_position = torch.argmax(result[i]).item()
            print(f"输入:{input_strings[i]}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")
        print(f"... 中间省略 {len(input_strings) - 20} 个结果 ...")
    else:
        for i, input_string in enumerate(input_strings):
            predicted_position = torch.argmax(result[i]).item()
            print(f"输入:{input_string}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")

if __name__ == "__main__":
    main()
```

This model currently reaches roughly 90% prediction accuracy.
Improvements and Optimizations

To raise the model's accuracy, you can work on the data, the model architecture, and the training strategy. The following improvements target the RNN model above, with code for each change, to push accuracy up step by step:

I. Data Augmentation and Preprocessing

1. Increase data diversity (reduce overfitting)
- Problem: the data uses only the 26 lowercase letters, and every sample is forced to contain `a`, which can limit the model's ability to generalize.
- Improvements:
  - Allow samples that do not contain `a` (either add a new class such as `-1` or treat them as invalid samples), or treat "no `a`" as a special class at prediction time; see the sketch after this list.
  - Introduce more characters (e.g. punctuation, uppercase letters) to diversify the input.
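A minimal sketch of the "extra class for strings without `a`" idea, assuming class index `sentence_length` is reserved for that case (the helper name `build_sample_optional_a` and the 80% probability are illustrative, not part of the original code):

```python
import random

def build_sample_optional_a(vocab, sentence_length, p_contain_a=0.8):
    # Sample only from the single-character vocab entries (skip 'pad'/'unk').
    chars = [c for c in vocab if len(c) == 1]
    x = [random.choice(chars) for _ in range(sentence_length)]
    if random.random() < p_contain_a:
        pos = random.randint(0, sentence_length - 1)
        x[pos] = 'a'
    # Label = position of the first 'a', or the extra class sentence_length if absent.
    label = x.index('a') if 'a' in x else sentence_length
    return [vocab.get(c, vocab['unk']) for c in x], label
```

With this labeling, only `num_classes` changes (to `sentence_length + 1`); the `CrossEntropyLoss` setup stays the same.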
2. Longer sequences (handle longer text)

- Problem: the fixed `sentence_length=6` limits the model to short sequences.
- Improvements:
  - Pad sequences dynamically and allow variable-length input (the RNN input then needs to go through `pack_padded_sequence`); see the sketch after the snippet below.
  - Increase the maximum sequence length (e.g. to 10) to generate more varied samples.

Example adjustment:

```python
# Allow a longer sequence (maximum length 10 as an example)
sentence_length = 10
x, y = build_sample(vocab, sentence_length)  # generate samples of length 10
```
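If sequence lengths really do vary within a batch, the padded batch can be packed before the GRU. A minimal, self-contained sketch (not taken from the original code; sizes are illustrative):

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

embedding = nn.Embedding(30, 20, padding_idx=0)
rnn = nn.GRU(20, 20, batch_first=True)

def encode_variable_length(x, lengths):
    """x: (batch, max_len) padded with 0; lengths: true length of each sample."""
    emb = embedding(x)
    packed = pack_padded_sequence(emb, lengths, batch_first=True, enforce_sorted=False)
    packed_out, hidden = rnn(packed)
    out, _ = pad_packed_sequence(packed_out, batch_first=True)  # back to (batch, max_len, hidden)
    return out, hidden

x = torch.LongTensor([[1, 2, 3, 0, 0], [4, 5, 6, 7, 8]])
out, hidden = encode_variable_length(x, torch.tensor([3, 5]))
```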
II. Model Architecture

1. Use a bidirectional RNN (capture context in both directions)

- Problem: a unidirectional GRU only sees the preceding context and cannot use what follows.
- Improvement: make the GRU bidirectional (PyTorch has no `nn.BiGRU` class; use `nn.GRU(..., bidirectional=True)`) to strengthen feature extraction.
Code adjustment:

```python
class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
        # Bidirectional GRU: the output dimension doubles (hidden_size * 2)
        self.rnn = nn.GRU(vector_dim, vector_dim, batch_first=True, bidirectional=True)
        self.classify = nn.Linear(vector_dim * 2, num_classes)  # linear layer input is now 2 * vector_dim
        self.loss = nn.CrossEntropyLoss()
```
2. More layers and a larger hidden size

- Problem: a single GRU layer has limited capacity, and the hidden size (20) is small.
- Improvements:
  - Stack GRU layers (e.g. `num_layers=2`).
  - Increase the hidden size (e.g. `vector_dim=32`).
Code adjustment:

```python
self.rnn = nn.GRU(vector_dim, vector_dim, batch_first=True, bidirectional=True, num_layers=2)
```
III. Training Strategy

1. Tune the learning rate and optimizer

- Problem: the initial learning rate (0.005) may be too high, making convergence unstable.
- Improvements:
  - Lower the learning rate (e.g. `lr=0.001`) and add learning-rate decay (`StepLR`).
  - Switch to a more robust optimizer (e.g. `AdamW`) to reduce overfitting.
Code adjustment:

```python
from torch.optim import lr_scheduler

optim = torch.optim.AdamW(model.parameters(), lr=0.001)
scheduler = lr_scheduler.StepLR(optim, step_size=5, gamma=0.9)  # decay the learning rate every 5 epochs

for epoch in range(epoch_num):
    # training ...
    scheduler.step()  # update the learning rate
```
2. Add regularization (prevent overfitting)

- Problem: the model may overfit the training set, lowering test accuracy.
- Improvements:
  - Add a `Dropout` layer after the RNN.
  - Add weight decay (L2 regularization); in PyTorch this is usually set on the optimizer rather than added to the loss (see the note after the snippet below).
Code adjustment:

```python
class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
        self.rnn = nn.GRU(vector_dim, vector_dim, batch_first=True, bidirectional=True, num_layers=2)
        self.dropout = nn.Dropout(0.2)  # dropout layer
        self.classify = nn.Linear(vector_dim * 2, num_classes)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, x, y=None):
        x = self.embedding(x)
        out, hidden = self.rnn(x)
        # Concatenate the last layer's forward and backward hidden states
        hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        hidden = self.dropout(hidden)  # apply dropout to the features used for classification
        y_pred = self.classify(hidden)
        # ... (loss / softmax branch as before)
```
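Note on the L2 regularization mentioned above: in PyTorch, weight decay is normally passed to the optimizer instead of being added to the loss by hand. A one-line sketch, reusing `model` from the article (the 1e-4 value is only an illustrative choice):

```python
optim = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
```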
IV. Prediction Logic

1. Use all time-step outputs (not just the final hidden state)

- Problem: the original model uses only the final time step's hidden state and ignores information from the intermediate steps.
- Improvement: average (or otherwise pool) the RNN outputs over all time steps to obtain a more complete feature.
Code adjustment:

```python
def forward(self, x, y=None):
    x = self.embedding(x)
    out, _ = self.rnn(x)  # out shape: (batch_size, seq_len, hidden_size*2)
    out = self.dropout(out)
    # Mean pooling: average over all time steps
    out = torch.mean(out, dim=1)
    y_pred = self.classify(out)
    # ... (loss / softmax branch as before)
```
V. Full Optimized Code (key parts)

```python
class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab, num_classes):
        super().__init__()
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
        self.rnn = nn.GRU(
            vector_dim,
            vector_dim,
            batch_first=True,
            bidirectional=True,
            num_layers=2,
            dropout=0.2  # dropout between GRU layers
        )
        self.dropout = nn.Dropout(0.3)
        self.classify = nn.Linear(vector_dim * 2, num_classes)
        self.loss = nn.CrossEntropyLoss()

    def forward(self, x, y=None):
        x = self.embedding(x)   # (batch, seq_len, vec_dim)
        out, _ = self.rnn(x)    # (batch, seq_len, 2*vec_dim)
        out = self.dropout(out)
        # Mean pooling instead of only the last hidden state
        out = torch.mean(out, dim=1)  # (batch, 2*vec_dim)
        y_pred = self.classify(out)
        if y is not None:
            return self.loss(y_pred, y)
        else:
            return torch.softmax(y_pred, dim=1)

# Updated training configuration
epoch_num = 50         # more training epochs
batch_size = 32        # larger batch size
train_sample = 2000    # more training samples per epoch
char_dim = 32          # larger embedding dimension
learning_rate = 0.001  # lower learning rate
```
VI. Expected Results

- Accuracy: with the bidirectional RNN, data augmentation, regularization, and the other changes, test-set accuracy can be expected to climb from about 90% to above 95%.
- Generalization: the model should cope better with varying sequence lengths and shifts in character distribution.

Note: actual gains depend on data size and hardware, so tune accordingly. It is best to validate one optimization at a time (e.g. add the bidirectional RNN first, then the data augmentation) to pinpoint the changes that matter; a small ablation sketch follows.
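A minimal sketch of such a one-change-at-a-time comparison (the config list and loop below are hypothetical; they reuse `build_vocab`, `build_model`, `build_dataset`, and `evaluate` from the code above):

```python
import torch

configs = [
    {"name": "baseline",      "char_dim": 20, "lr": 0.005},
    {"name": "bigger_hidden", "char_dim": 32, "lr": 0.005},
    {"name": "lower_lr",      "char_dim": 32, "lr": 0.001},
]

sentence_length = 6
for cfg in configs:
    vocab = build_vocab()
    model = build_model(vocab, cfg["char_dim"], sentence_length)
    optim = torch.optim.AdamW(model.parameters(), lr=cfg["lr"])
    for epoch in range(10):  # short runs are enough to compare configurations
        x, y = build_dataset(500, vocab, sentence_length)
        optim.zero_grad()
        loss = model(x, y)
        loss.backward()
        optim.step()
    print(cfg["name"], evaluate(model, vocab, sentence_length))
```

The complete optimized script, with the `nn.GRU(..., bidirectional=True)` fix applied, is reproduced below.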
```python
# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import random
import json
from torch.optim import lr_scheduler

"""
基于pytorch的网络编写
实现一个RNN网络完成多分类任务
判断字符 'a' 第一次出现在字符串中的位置
"""

class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab, num_classes):
        super(TorchModel, self).__init__()
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)  # embedding层
        # 修复:使用nn.GRU并设置bidirectional=True实现双向GRU
        self.rnn = nn.GRU(
            vector_dim,
            vector_dim,
            batch_first=True,
            bidirectional=True,
            num_layers=2,  # 2层网络
            dropout=0.2  # 层间dropout
        )
        self.dropout = nn.Dropout(0.3)  # 输出层dropout
        self.classify = nn.Linear(vector_dim * 2, num_classes)  # 线性层,输入维度需乘以2(双向)
        self.loss = nn.CrossEntropyLoss()  # loss函数采用交叉熵损失

    # 当输入真实标签,返回loss值;无真实标签,返回预测值
    def forward(self, x, y=None):
        x = self.embedding(x)  # (batch_size, sen_len) -> (batch_size, sen_len, vector_dim)
        out, _ = self.rnn(x)  # (batch_size, sen_len, vector_dim*2)
        out = self.dropout(out)
        # 使用平均池化替代仅最后时间步的隐藏状态,捕获更全面的序列信息
        out = torch.mean(out, dim=1)  # (batch_size, vector_dim*2)
        y_pred = self.classify(out)  # (batch_size, num_classes)
        if y is not None:
            return self.loss(y_pred, y)  # 预测值和真实值计算损失
        else:
            return torch.softmax(y_pred, dim=1)  # 输出预测概率分布

# 字符集,增加多样性
def build_vocab():
    chars = "abcdefghijklmnopqrstuvwxyz0123456789,.!?-_"  # 增加数字和符号
    vocab = {"pad": 0}
    for index, char in enumerate(chars):
        vocab[char] = index + 1  # 每个字对应一个序号
    vocab['unk'] = len(vocab)  # 未知字符
    return vocab

# 随机生成一个样本,支持更灵活的配置
def build_sample(vocab, sentence_length, force_a=True):
    # 随机从字表选取sentence_length个字,可能重复
    x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
    # 控制字符 'a' 的出现概率,增加数据多样性
    if force_a or random.random() > 0.2:  # 80%的概率包含 'a'
        a_position = random.randint(0, sentence_length - 1)
        x[a_position] = 'a'
        # 将字符转换为对应的索引
        x_indices = [vocab.get(char, vocab['unk']) for char in x]
        return x_indices, a_position
    else:
        # 不包含 'a' 的情况,类别设为 -1(实际训练时会被过滤)
        x_indices = [vocab.get(char, vocab['unk']) for char in x]
        return x_indices, -1

# 建立数据集,支持更灵活的配置
def build_dataset(sample_length, vocab, sentence_length, force_a=True):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(vocab, sentence_length, force_a)
        # 过滤掉不包含 'a' 的样本(y=-1)
        if y != -1:
            dataset_x.append(x)
            dataset_y.append(y)
    # 确保有数据可用
    if len(dataset_x) == 0:
        raise ValueError("生成的样本中没有符合条件的样本(全部不包含 'a')")
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)

# 建立模型
def build_model(vocab, char_dim, sentence_length):
    num_classes = sentence_length  # 分类数等于句子长度(每个位置一个类别)
    model = TorchModel(char_dim, sentence_length, vocab, num_classes)
    return model

# 测试代码
def evaluate(model, vocab, sentence_length):
    model.eval()
    x, y = build_dataset(500, vocab, sentence_length, force_a=True)  # 使用500个样本评估
    correct, wrong = 0, 0
    with torch.no_grad():
        y_pred = model(x)  # 模型预测
        y_pred = torch.argmax(y_pred, dim=1)  # 获取预测的类别
        for y_p, y_t in zip(y_pred, y):  # 与真实标签进行对比
            if y_p == y_t:
                correct += 1
            else:
                wrong += 1
    accuracy = correct / (correct + wrong)
    print(f"正确预测个数:{correct}, 总样本数:{correct + wrong}, 正确率:{accuracy:.4f}")
    return accuracy

def main():
    # 配置参数
    epoch_num = 50  # 增加训练轮数
    batch_size = 32  # 增大batch size
    train_sample = 2000  # 增加训练样本数量
    char_dim = 32  # 增大词向量维度
    sentence_length = 8  # 增加句子长度,提高任务难度
    learning_rate = 0.001  # 降低初始学习率
    # 建立字表
    vocab = build_vocab()
    # 建立模型
    model = build_model(vocab, char_dim, sentence_length)
    # 选择优化器,使用AdamW(带权重衰减的Adam)
    optim = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    # 学习率调度器,每10轮降低学习率
    scheduler = lr_scheduler.StepLR(optim, step_size=10, gamma=0.5)
    log = []
    # 训练过程
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, vocab, sentence_length)  # 构造一组训练样本
            optim.zero_grad()  # 梯度归零
            loss = model(x, y)  # 计算loss
            loss.backward()  # 计算梯度
            optim.step()  # 更新权重
            watch_loss.append(loss.item())
        # 更新学习率
        scheduler.step()
        # 打印当前学习率
        current_lr = scheduler.get_last_lr()[0]
        print(f"=========\n第{epoch + 1}/{epoch_num}轮,平均loss:{np.mean(watch_loss):.4f},学习率:{current_lr:.6f}")
        acc = evaluate(model, vocab, sentence_length)  # 测试本轮模型结果
        log.append([acc, np.mean(watch_loss)])
    # 保存模型
    torch.save(model.state_dict(), "model_optimized.pth")
    # 保存词表
    with open("vocab_optimized.json", "w", encoding="utf8") as writer:
        writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
    # 生成200个随机测试样本
    test_strings = []
    for _ in range(200):
        # 随机生成一个包含字符 'a' 的字符串
        chars = list("abcdefghijklmnopqrstuvwxyz0123456789,.!?-_")
        s = [random.choice(chars) for _ in range(sentence_length)]
        a_pos = random.randint(0, sentence_length - 1)
        s[a_pos] = 'a'  # 确保字符串中包含 'a'
        test_strings.append(''.join(s))
    # 用模型预测并输出结果
    print("\n随机生成的200个测试样本预测结果(展示前10个和后10个):")
    predict("model_optimized.pth", "vocab_optimized.json", test_strings)

# 使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):
    char_dim = 32  # 与训练时保持一致
    sentence_length = 8  # 与训练时保持一致
    # 加载字符表
    vocab = json.load(open(vocab_path, "r", encoding="utf8"))
    # 建立模型
    model = build_model(vocab, char_dim, sentence_length)
    # 加载训练好的权重
    model.load_state_dict(torch.load(model_path))
    x = []
    for input_string in input_strings:
        # 将输入字符串转换为字符序号列表,并进行填充
        seq = [vocab.get(char, vocab['unk']) for char in input_string]
        if len(seq) < sentence_length:
            seq = seq + [vocab['pad']] * (sentence_length - len(seq))
        else:
            seq = seq[:sentence_length]
        x.append(seq)
    model.eval()  # 测试模式
    with torch.no_grad():  # 不计算梯度
        result = model.forward(torch.LongTensor(x))  # 模型预测
    # 只打印前10个结果和最后10个结果,避免输出过多
    if len(input_strings) > 20:
        display_indices = list(range(10)) + list(range(len(input_strings) - 10, len(input_strings)))
        for i in display_indices:
            predicted_position = torch.argmax(result[i]).item()
            print(f"输入:{input_strings[i]}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")
        print(f"... 中间省略 {len(input_strings) - 20} 个结果 ...")
    else:
        for i, input_string in enumerate(input_strings):
            predicted_position = torch.argmax(result[i]).item()
            print(f"输入:{input_strings[i]}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")

if __name__ == "__main__":
    main()
```