# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import random
import json
"""
基于pytorch的网络编写
实现一个RNN网络完成多分类任务
判断字符 'a' 第一次出现在字符串中的位置
"""
class TorchModel(nn.Module):
def __init__(self, vector_dim, sentence_length, vocab, num_classes):
super(TorchModel, self).__init__()
self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0) # embedding层
self.rnn = nn.GRU(vector_dim, vector_dim, batch_first=True) # RNN层
self.classify = nn.Linear(vector_dim, num_classes) # 线性层
self.loss = nn.CrossEntropyLoss() # loss函数采用交叉熵损失
# 当输入真实标签,返回loss值;无真实标签,返回预测值
def forward(self, x, y=None):
x = self.embedding(x) # (batch_size, sen_len) -> (batch_size, sen_len, vector_dim)
_, hidden = self.rnn(x) # (batch_size, sen_len, vector_dim) -> (1, batch_size, vector_dim)
hidden = hidden.squeeze(0) # (1, batch_size, vector_dim) -> (batch_size, vector_dim)
y_pred = self.classify(hidden) # (batch_size, vector_dim) -> (batch_size, num_classes)
if y is not None:
return self.loss(y_pred, y) # 预测值和真实值计算损失
else:
return torch.softmax(y_pred, dim=1) # 输出预测概率分布
# 字符集
def build_vocab():
chars = "abcdefghijklmnopqrstuvwxyz" # 字符集
vocab = {"pad": 0}
for index, char in enumerate(chars):
vocab[char] = index + 1 # 每个字对应一个序号
vocab['unk'] = len(vocab) # 未知字符
return vocab
# 随机生成一个样本
def build_sample(vocab, sentence_length):
# 随机从字表选取sentence_length个字,可能重复
x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
# 确保每个样本都包含字符 'a'
a_position = random.randint(0, sentence_length - 1)
x[a_position] = 'a'
# 将字转换成序号
x = [vocab.get(word, vocab['unk']) for word in x]
return x, a_position
# 建立数据集
def build_dataset(sample_length, vocab, sentence_length):
dataset_x = []
dataset_y = []
for i in range(sample_length):
x, y = build_sample(vocab, sentence_length)
dataset_x.append(x)
dataset_y.append(y)
return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)
# 建立模型
def build_model(vocab, char_dim, sentence_length):
num_classes = sentence_length # 分类数等于句子长度(每个位置一个类别)
model = TorchModel(char_dim, sentence_length, vocab, num_classes)
return model
# 测试代码
def evaluate(model, vocab, sentence_length):
model.eval()
x, y = build_dataset(200, vocab, sentence_length) # 建立200个用于测试的样本
correct, wrong = 0, 0
with torch.no_grad():
y_pred = model(x) # 模型预测
y_pred = torch.argmax(y_pred, dim=1) # 获取预测的类别
for y_p, y_t in zip(y_pred, y): # 与真实标签进行对比
if y_p == y_t:
correct += 1
else:
wrong += 1
print("正确预测个数:%d, 正确率:%f" % (correct, correct / (correct + wrong)))
return correct / (correct + wrong)
def main():
# 配置参数
epoch_num = 30 # 训练轮数
batch_size = 20 # 每次训练样本个数
train_sample = 500 # 每轮训练总共训练的样本总数
char_dim = 20 # 每个字的维度
sentence_length = 6 # 样本文本长度
learning_rate = 0.005 # 学习率
# 建立字表
vocab = build_vocab()
# 建立模型
model = build_model(vocab, char_dim, sentence_length)
# 选择优化器
optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
log = []
# 训练过程
for epoch in range(epoch_num):
model.train()
watch_loss = []
for batch in range(int(train_sample / batch_size)):
x, y = build_dataset(batch_size, vocab, sentence_length) # 构造一组训练样本
optim.zero_grad() # 梯度归零
loss = model(x, y) # 计算loss
loss.backward() # 计算梯度
optim.step() # 更新权重
watch_loss.append(loss.item())
print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
acc = evaluate(model, vocab, sentence_length) # 测试本轮模型结果
log.append([acc, np.mean(watch_loss)])
# 保存模型
torch.save(model.state_dict(), "model.pth")
# 保存词表
with open("vocab.json", "w", encoding="utf8") as writer:
writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
# 生成200个随机测试样本
test_strings = []
for _ in range(200):
# 随机生成一个包含字符 'a' 的字符串
chars = list("abcdefghijklmnopqrstuvwxyz")
s = [random.choice(chars) for _ in range(sentence_length)]
a_pos = random.randint(0, sentence_length - 1)
s[a_pos] = 'a' # 确保字符串中包含 'a'
test_strings.append(''.join(s))
# 用模型预测并输出结果
print("\n随机生成的200个测试样本预测结果:")
predict("model.pth", "vocab.json", test_strings)
# 使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):
char_dim = 20 # 每个字的维度
sentence_length = 6 # 样本文本长度
# 加载字符表
vocab = json.load(open(vocab_path, "r", encoding="utf8"))
# 建立模型
model = build_model(vocab, char_dim, sentence_length)
# 加载训练好的权重
model.load_state_dict(torch.load(model_path))
x = []
for input_string in input_strings:
# 将输入字符串转换为字符序号列表,并进行填充
seq = [vocab.get(char, vocab['unk']) for char in input_string]
if len(seq) < sentence_length:
seq = seq + [vocab['pad']] * (sentence_length - len(seq))
else:
seq = seq[:sentence_length]
x.append(seq)
model.eval() # 测试模式
with torch.no_grad(): # 不计算梯度
result = model.forward(torch.LongTensor(x)) # 模型预测
# 只打印前10个结果和最后10个结果,避免输出过多
if len(input_strings) > 20:
display_indices = list(range(10)) + list(range(len(input_strings) - 10, len(input_strings)))
for i in display_indices:
predicted_position = torch.argmax(result[i]).item()
print(f"输入:{input_strings[i]}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")
print(f"... 中间省略 {len(input_strings) - 20} 个结果 ...")
else:
for i, input_string in enumerate(input_strings):
predicted_position = torch.argmax(result[i]).item()
print(f"输入:{input_string}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")
if __name__ == "__main__":
main()
预测率百分之90
改进优化
要提升模型准确率,可以从数据、模型架构、训练策略等多方面优化。以下是针对你提供的 RNN 模型的具体改进方案,结合代码实现,逐步提升准确率:
一、数据增强与预处理优化
1. 增加数据多样性(解决过拟合)
- 问题:原数据仅包含 26 个字母,且强制每个样本包含
a,可能导致模型泛化能力不足。 - 改进:
- 允许样本不包含
a(需新增类别-1或视为无效样本),或在预测时将 “无 a” 作为特殊类别。 - 引入更多字符(如标点、大写字母),提升输入多样性。
- 允许样本不包含
2. 延长序列长度(适配更长文本)
- 问题:固定
sentence_length=6限制了模型处理更长序列的能力。 - 改进:
- 动态填充序列,允许输入长度可变(需修改 RNN 层为
pack_padded_sequence)。 - 增加最大序列长度(如设为 10),生成更多样化的样本。
- 动态填充序列,允许输入长度可变(需修改 RNN 层为
代码调整示例:
python
运行
# 允许序列长度可变(以最大长度10为例)
sentence_length = 10
x, y = build_sample(vocab, sentence_length) # 生成长度为10的样本
二、模型架构优化
1. 改用双向 RNN(捕捉上下文信息)
- 问题:单向 GRU 仅能捕捉上文信息,无法利用下文语境。
- 改进:使用
nn.BiGRU替换单向 GRU,增强特征提取能力。
代码调整:
python
运行
class TorchModel(nn.Module):
def __init__(self, vector_dim, sentence_length, vocab, num_classes):
super().__init__()
self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
# 双向GRU,输出维度翻倍(hidden_size * 2)
self.rnn = nn.BiGRU(vector_dim, vector_dim, batch_first=True, bidirectional=True)
self.classify = nn.Linear(vector_dim * 2, num_classes) # 线性层输入维度改为2*vector_dim
self.loss = nn.CrossEntropyLoss()
2. 增加网络层数与隐藏层维度
- 问题:单层 GRU 表达能力有限,隐藏层维度(20)较小。
- 改进:
- 增加 GRU 层数(如
num_layers=2)。 - 增大隐藏层维度(如
vector_dim=32)。
- 增加 GRU 层数(如
代码调整:
python
运行
self.rnn = nn.BiGRU(vector_dim, vector_dim, batch_first=True, bidirectional=True, num_layers=2)
三、训练策略优化
1. 调整学习率与优化器
- 问题:初始学习率(0.005)可能过高,导致收敛不稳定。
- 改进:
- 降低学习率(如
lr=0.001),并使用学习率衰减(StepLR)。 - 改用更鲁棒的优化器(如
AdamW),减少过拟合。
- 降低学习率(如
代码调整:
python
运行
from torch.optim import lr_scheduler
optim = torch.optim.AdamW(model.parameters(), lr=0.001)
scheduler = lr_scheduler.StepLR(optim, step_size=5, gamma=0.9) # 每5轮衰减学习率
for epoch in range(epoch_num):
# 训练...
scheduler.step() # 更新学习率
2. 增加正则化(防止过拟合)
- 问题:模型可能在训练集上过拟合,导致测试集准确率下降。
- 改进:
- 在 RNN 层后添加
Dropout层。 - 在损失函数中添加权重衰减(L2 正则)。
- 在 RNN 层后添加
代码调整:
python
运行
class TorchModel(nn.Module):
def __init__(self, vector_dim, sentence_length, vocab, num_classes):
super().__init__()
self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
self.rnn = nn.BiGRU(vector_dim, vector_dim, batch_first=True, bidirectional=True, num_layers=2)
self.dropout = nn.Dropout(0.2) # 添加Dropout层
self.classify = nn.Linear(vector_dim * 2, num_classes)
self.loss = nn.CrossEntropyLoss()
def forward(self, x, y=None):
x = self.embedding(x)
out, hidden = self.rnn(x)
out = self.dropout(out) # 在RNN输出后应用Dropout
# 取最后一个时间步的隐藏状态(双向合并)
hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1) # 双向GRU的隐藏状态拼接
y_pred = self.classify(hidden)
# ...
四、预测逻辑优化
1. 利用序列所有时间步输出(而非仅最后隐藏状态)
- 问题:原模型仅使用最后一个时间步的隐藏状态,忽略中间时间步的信息。
- 改进:对 RNN 的所有时间步输出取平均或池化,获取更全面的特征。
代码调整:
python
运行
def forward(self, x, y=None):
x = self.embedding(x)
out, _ = self.rnn(x) # out shape: (batch_size, seq_len, hidden_size*2)
out = self.dropout(out)
# 平均池化:对所有时间步取平均
out = torch.mean(out, dim=1)
y_pred = self.classify(out)
# ...
五、完整优化后代码(关键部分)
python
运行
class TorchModel(nn.Module):
def __init__(self, vector_dim, sentence_length, vocab, num_classes):
super().__init__()
self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
self.rnn = nn.BiGRU(
vector_dim,
vector_dim,
batch_first=True,
bidirectional=True,
num_layers=2,
dropout=0.2 # 层间Dropout
)
self.dropout = nn.Dropout(0.3)
self.classify = nn.Linear(vector_dim * 2, num_classes)
self.loss = nn.CrossEntropyLoss()
def forward(self, x, y=None):
x = self.embedding(x) # (batch, seq_len, vec_dim)
out, _ = self.rnn(x) # (batch, seq_len, 2*vec_dim)
out = self.dropout(out)
# 平均池化替代仅取最后隐藏状态
out = torch.mean(out, dim=1) # (batch, 2*vec_dim)
y_pred = self.classify(out)
if y is not None:
return self.loss(y_pred, y)
else:
return torch.softmax(y_pred, dim=1)
# 训练配置优化
epoch_num = 50 # 增加训练轮数
batch_size = 32 # 增大批量大小
train_sample = 2000 # 增加训练样本数量
char_dim = 32 # 增大词向量维度
learning_rate = 0.001 # 降低学习率
六、预期效果
- 准确率提升:通过双向 RNN、数据增强、正则化等优化,测试集准确率可从 90% 提升至95% 以上。
- 泛化能力增强:模型能更好处理不同长度序列、字符分布变化的情况。
注意:实际效果需根据数据规模和硬件条件调整,建议逐步验证单一优化项的影响(如先加双向 RNN,再加数据增强),定位关键改进点。
# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import random
import json
from torch.optim import lr_scheduler
"""
基于pytorch的网络编写
实现一个RNN网络完成多分类任务
判断字符 'a' 第一次出现在字符串中的位置
"""
class TorchModel(nn.Module):
def __init__(self, vector_dim, sentence_length, vocab, num_classes):
super(TorchModel, self).__init__()
self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0) # embedding层
# 修复:使用nn.GRU并设置bidirectional=True实现双向GRU
self.rnn = nn.GRU(
vector_dim,
vector_dim,
batch_first=True,
bidirectional=True,
num_layers=2, # 2层网络
dropout=0.2 # 层间dropout
)
self.dropout = nn.Dropout(0.3) # 输出层dropout
self.classify = nn.Linear(vector_dim * 2, num_classes) # 线性层,输入维度需乘以2(双向)
self.loss = nn.CrossEntropyLoss() # loss函数采用交叉熵损失
# 当输入真实标签,返回loss值;无真实标签,返回预测值
def forward(self, x, y=None):
x = self.embedding(x) # (batch_size, sen_len) -> (batch_size, sen_len, vector_dim)
out, _ = self.rnn(x) # (batch_size, sen_len, vector_dim*2)
out = self.dropout(out)
# 使用平均池化替代仅最后时间步的隐藏状态,捕获更全面的序列信息
out = torch.mean(out, dim=1) # (batch_size, vector_dim*2)
y_pred = self.classify(out) # (batch_size, num_classes)
if y is not None:
return self.loss(y_pred, y) # 预测值和真实值计算损失
else:
return torch.softmax(y_pred, dim=1) # 输出预测概率分布
# 字符集,增加多样性
def build_vocab():
chars = "abcdefghijklmnopqrstuvwxyz0123456789,.!?-_" # 增加数字和符号
vocab = {"pad": 0}
for index, char in enumerate(chars):
vocab[char] = index + 1 # 每个字对应一个序号
vocab['unk'] = len(vocab) # 未知字符
return vocab
# 随机生成一个样本,支持更灵活的配置
def build_sample(vocab, sentence_length, force_a=True):
# 随机从字表选取sentence_length个字,可能重复
x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
# 控制字符 'a' 的出现概率,增加数据多样性
if force_a or random.random() > 0.2: # 80%的概率包含 'a'
a_position = random.randint(0, sentence_length - 1)
x[a_position] = 'a'
# 将字符转换为对应的索引
x_indices = [vocab.get(char, vocab['unk']) for char in x]
return x_indices, a_position
else:
# 不包含 'a' 的情况,类别设为 -1(实际训练时会被过滤)
x_indices = [vocab.get(char, vocab['unk']) for char in x]
return x_indices, -1
# 建立数据集,支持更灵活的配置
def build_dataset(sample_length, vocab, sentence_length, force_a=True):
dataset_x = []
dataset_y = []
for i in range(sample_length):
x, y = build_sample(vocab, sentence_length, force_a)
# 过滤掉不包含 'a' 的样本(y=-1)
if y != -1:
dataset_x.append(x)
dataset_y.append(y)
# 确保有数据可用
if len(dataset_x) == 0:
raise ValueError("生成的样本中没有符合条件的样本(全部不包含 'a')")
return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)
# 建立模型
def build_model(vocab, char_dim, sentence_length):
num_classes = sentence_length # 分类数等于句子长度(每个位置一个类别)
model = TorchModel(char_dim, sentence_length, vocab, num_classes)
return model
# 测试代码
def evaluate(model, vocab, sentence_length):
model.eval()
x, y = build_dataset(500, vocab, sentence_length, force_a=True) # 使用500个样本评估
correct, wrong = 0, 0
with torch.no_grad():
y_pred = model(x) # 模型预测
y_pred = torch.argmax(y_pred, dim=1) # 获取预测的类别
for y_p, y_t in zip(y_pred, y): # 与真实标签进行对比
if y_p == y_t:
correct += 1
else:
wrong += 1
accuracy = correct / (correct + wrong)
print(f"正确预测个数:{correct}, 总样本数:{correct + wrong}, 正确率:{accuracy:.4f}")
return accuracy
def main():
# 配置参数
epoch_num = 50 # 增加训练轮数
batch_size = 32 # 增大batch size
train_sample = 2000 # 增加训练样本数量
char_dim = 32 # 增大词向量维度
sentence_length = 8 # 增加句子长度,提高任务难度
learning_rate = 0.001 # 降低初始学习率
# 建立字表
vocab = build_vocab()
# 建立模型
model = build_model(vocab, char_dim, sentence_length)
# 选择优化器,使用AdamW(带权重衰减的Adam)
optim = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-4)
# 学习率调度器,每10轮降低学习率
scheduler = lr_scheduler.StepLR(optim, step_size=10, gamma=0.5)
log = []
# 训练过程
for epoch in range(epoch_num):
model.train()
watch_loss = []
for batch in range(int(train_sample / batch_size)):
x, y = build_dataset(batch_size, vocab, sentence_length) # 构造一组训练样本
optim.zero_grad() # 梯度归零
loss = model(x, y) # 计算loss
loss.backward() # 计算梯度
optim.step() # 更新权重
watch_loss.append(loss.item())
# 更新学习率
scheduler.step()
# 打印当前学习率
current_lr = scheduler.get_last_lr()[0]
print(f"=========\n第{epoch + 1}/{epoch_num}轮,平均loss:{np.mean(watch_loss):.4f},学习率:{current_lr:.6f}")
acc = evaluate(model, vocab, sentence_length) # 测试本轮模型结果
log.append([acc, np.mean(watch_loss)])
# 保存模型
torch.save(model.state_dict(), "model_optimized.pth")
# 保存词表
with open("vocab_optimized.json", "w", encoding="utf8") as writer:
writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
# 生成200个随机测试样本
test_strings = []
for _ in range(200):
# 随机生成一个包含字符 'a' 的字符串
chars = list("abcdefghijklmnopqrstuvwxyz0123456789,.!?-_")
s = [random.choice(chars) for _ in range(sentence_length)]
a_pos = random.randint(0, sentence_length - 1)
s[a_pos] = 'a' # 确保字符串中包含 'a'
test_strings.append(''.join(s))
# 用模型预测并输出结果
print("\n随机生成的200个测试样本预测结果(展示前10个和后10个):")
predict("model_optimized.pth", "vocab_optimized.json", test_strings)
# 使用训练好的模型做预测
def predict(model_path, vocab_path, input_strings):
char_dim = 32 # 与训练时保持一致
sentence_length = 8 # 与训练时保持一致
# 加载字符表
vocab = json.load(open(vocab_path, "r", encoding="utf8"))
# 建立模型
model = build_model(vocab, char_dim, sentence_length)
# 加载训练好的权重
model.load_state_dict(torch.load(model_path))
x = []
for input_string in input_strings:
# 将输入字符串转换为字符序号列表,并进行填充
seq = [vocab.get(char, vocab['unk']) for char in input_string]
if len(seq) < sentence_length:
seq = seq + [vocab['pad']] * (sentence_length - len(seq))
else:
seq = seq[:sentence_length]
x.append(seq)
model.eval() # 测试模式
with torch.no_grad(): # 不计算梯度
result = model.forward(torch.LongTensor(x)) # 模型预测
# 只打印前10个结果和最后10个结果,避免输出过多
if len(input_strings) > 20:
display_indices = list(range(10)) + list(range(len(input_strings) - 10, len(input_strings)))
for i in display_indices:
predicted_position = torch.argmax(result[i]).item()
print(f"输入:{input_strings[i]}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")
print(f"... 中间省略 {len(input_strings) - 20} 个结果 ...")
else:
for i, input_string in enumerate(input_strings):
predicted_position = torch.argmax(result[i]).item()
print(f"输入:{input_strings[i]}, 预测类别:{predicted_position}, 概率分布:{result[i].numpy()}")
if __name__ == "__main__":
main()


















