简介:
归纳了一套基本框架,以帮助使用者快速创建新的模型,同时有paddlepaddle版本和pytorch版本的,它们虽有差别,但是对于初级使用者,只是两种不同但是很相近的语法而已。都采用paddle平台作为载体来存项目。
模型框架paddle
这里用paddlepaddle的图像分类项目为例,这是项目链接:
花卉分类:https://aistudio.baidu.com/projectdetail/9165955?sUid=15411139&shared=1&ts=1747486226503
1.导入库与参数配置
import os
import zipfile
import random
import json
import paddle
import sys
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from paddle.io import Dataset
train_parameters = {
"input_size": [3, 224, 224], # 输入图片的shape
"class_dim": -1, # 分类数
"src_path": "/home/aistudio/data/data6504/flower7595.zip", # 原始数据集路径
"target_path": "/home/aistudio/data/", # 要解压的路径
"train_list_path": "/home/aistudio/data/train.txt", # train.txt路径
"eval_list_path": "/home/aistudio/data/eval.txt", # eval.txt路径
"readme_path": "/home/aistudio/data/readme.json", # readme.json路径
"label_dict": {}, # 标签字典
"num_epochs": 20, # 训练轮数
"train_bath_size": 8, # 训练时每个批次的大小
"skip_steps": 10,
"save_steps": 300,
"learning_strategy": { # 优化函数相关的配置
"lr": 0.0001 # 超参数学习率
},
"checkpoints": "/home/aistudio/work/checkpoints" # 保存的路径
}
2.数据准备
解压等预处理
生成数据列表(路径-标签)get_data_list得到eval.txt和train.txt
建立数据读取器Reader:
__init__使用txt得到列表(path--labels)
__getitem用序号依次读取列表,使用path时直接加载图片
# # 一、数据准备
# (1)解压原始数据集
#
# (2)按照比例划分训练集与验证集
#
# (3)乱序,生成数据列表
#
# (4)定义数据读取器
# In[35]:
def unzip_data(src_path, target_path):
# 解压原始数据集,将src_path路径下的zip压缩包解压至target_path目录下
if (not os.path.isdir(target_path + "Chinese Medicine")):
z = zipfile.ZipFile(src_path, "r")
z.extractall(path=target_path)
z.close()
# In[36]:
#就是建立2个txt,按照样本--标签的形式
# 函数 生成数据列表
def get_data_list(target_path, train_list_path, eval_list_path):
# 存放所有类别的信息
class_detail = []
# 获取所有类别保存的文件夹名称
data_list_path = target_path + "flowers/"
class_dirs = os.listdir(data_list_path)
# 总的图像数量
all_class_images = 0
# 存放类别标签
class_label = 0
# 存放类别数目
class_dim = 0
# 存储要写进eval.txt和train.txt中的内容
trainer_list = []
eval_list = []
# 读取每个类别
for class_dir in class_dirs:
if class_dir != ".DS_Store":
class_dim += 1
# 每个类别的信息
class_detail_list = {}
eval_sum = 0
trainer_sum = 0
# 统计每个类别有多少张图片
class_sum = 0
# 获取类别路径
path = data_list_path + class_dir
# 获取所有图片
img_paths = os.listdir(path)
for img_path in img_paths: # 遍历文件夹下的每个图片
if img_path.split(".")[-1] == "jpg":
name_path = path + '/' + img_path # 每张图片的路径
if class_sum % 8 == 0: # 每8张图片取一个做验证数据
eval_sum += 1 # test_sum为测试数据的数目
eval_list.append(name_path + "\t%d" % class_label + "\n")
else:
trainer_sum += 1
trainer_list.append(name_path + "\t%d" % class_label + "\n")
class_sum += 1 # 每类图片的数目
all_class_images += 1 # 所有类图片的数目
else:
continue
# 说明的json文件的class_detail数据
class_detail_list['class_name'] = class_dir # 类别名称
class_detail_list['class_label'] = class_label # 类别标签
class_detail_list['class_eval_images'] = eval_sum # 该类数据的测试集数目
class_detail_list['class_trainer_images'] = trainer_sum # 该类数据的训练集数目
class_detail.append(class_detail_list)
# 初始化标签列表
train_parameters['label_dict'][str(class_label)] = class_dir
class_label += 1
# 初始化分类树
train_parameters['class_dim'] = class_dim
# 乱序
random.shuffle(eval_list)
with open(eval_list_path, 'a') as f:
for eval_image in eval_list:
f.write(eval_image)
random.shuffle(trainer_list)
with open(train_list_path, 'a') as f2:
for train_image in trainer_list:
f2.write(train_image)
# 说明的json文件信息
readjson = {}
readjson['all_class_name'] = data_list_path # 文件父目录
readjson['all_class_images'] = all_class_images
readjson['class_detail'] = class_detail
jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
with open(train_parameters['readme_path'], 'w') as f:
f.write(jsons)
print('生成数据列表完成!')
# In[37]:
# 参数初始化
src_path = train_parameters['src_path']
target_path = train_parameters['target_path']
train_list_path = train_parameters['train_list_path']
eval_list_path = train_parameters['eval_list_path']
# 解压原始数据到指定路径
unzip_data(src_path, target_path)
# 划分训练集与验证集,乱序,生成数据列表
# 每次生成数据列表前,首先清空 train.txt 和 eval.txt
with open(train_list_path, 'w') as f:
f.seek(0)
f.truncate()
with open(eval_list_path, 'w') as f:
f.seek(0)
f.truncate()
# 生成数据列表
get_data_list(target_path, train_list_path, eval_list_path)
# In[38]:s
class Reader(Dataset):
def __init__(self, data_path, mode='train'):
'''
数据读取器
:param data_path:数据集所在路径
:param mode: train or eval
'''
super().__init__()
self.data_path = data_path
self.img_paths = []
self.labels = []
if mode == 'train':
with open(os.path.join(self.data_path, "train.txt"), "r", encoding="utf-8") as f:
self.info = f.readlines()
for img_info in self.info:
img_path, label = img_info.strip().split('\t')
self.img_paths.append(img_path)
self.labels.append(int(label))
else:
with open(os.path.join(self.data_path, "eval.txt"), "r", encoding="utf-8") as f:
self.info = f.readlines()
for img_info in self.info:
img_path, label = img_info.strip().split('\t')
self.img_paths.append(img_path)
self.labels.append(int(label))
def __getitem__(self, index):
# 获取一组数据 :param index: 文件索引号
# 第一步打开图像文件并获取label值
img_path = self.img_paths[index]
img = Image.open(img_path)
if img.mode != 'RGB':
img = img.convert('RGB')
img = img.resize((224, 224), Image.BILINEAR)
img = np.array(img).astype('float32')
img = img.transpose((2, 0, 1)) / 255
label = self.labels[index]
label = np.array([label], dtype="int64")
return img, label
def print_sample(self, index: int = 0):
print("文件名", self.img_paths[index], "\t标签值", self.labels[index])
def __len__(self):
return len(self.img_paths)
# In[39]:
# 训练数据加载
train_dataset = Reader('/home/aistudio/data', mode='train')
train_loader = paddle.io.DataLoader(train_dataset, batch_size=16, shuffle=True)
# 测试数据加载
eval_dataset = Reader('/home/aistudio/data', mode='eval')
eval_loader = paddle.io.DataLoader(eval_dataset, batch_size=8, shuffle=False)
# In[40]:
train_dataset.print_sample(200)
print(train_dataset.__len__())
eval_dataset.print_sample(0)
print(eval_dataset.__len__())
print(eval_dataset.__getitem__(10)[0].shape)
print(eval_dataset.__getitem__(10)[1].shape)
3.模型构建
根据数据集类型和格式建立模型(我这个初学者喜欢用ai生成然后分析一下就结束)
class ConvPool(paddle.nn.Layer):
# 卷积+池化
def __init__(self, num_channels, num_filters, filter_size, pool_size, pool_stride, groups, conv_stride=1,
conv_padding=1, ):
super(ConvPool, self).__init__()
for i in range(groups):
self.add_sublayer( # 添加子层实例
'bb_%d' % i,
paddle.nn.Conv2D( # layer
in_channels=num_channels, # 通道数
out_channels=num_filters, # 卷积核个数
kernel_size=filter_size, # 卷积核大小
stride=conv_stride, # 步长
padding=conv_padding # padding
)
)
self.add_sublayer(
'relu%d' % i,
paddle.nn.ReLU()
)
num_channels = num_filters
self.add_sublayer(
'Maxpool',
paddle.nn.MaxPool2D(
kernel_size=pool_size, # 池化核大小
stride=pool_stride # 池化步长
)
)
def forward(self, inputs):
x = inputs
for prefix, sub_layer in self.named_children():
# print(prefix,sub_layer)
x = sub_layer(x)
return x
# In[42]:
class VGGNet(paddle.nn.Layer):
# 卷积+池化
def __init__(self, num_classes=1000):
super(VGGNet, self).__init__()
# 第一组:64个卷积核,2个卷积层
self.convpool01 = ConvPool(
num_channels=3, # 输入通道数(RGB图像)
num_filters=64,
filter_size=3,
pool_size=2,
pool_stride=2,
groups=2 # 两个连续卷积层
)
# 第二组:128个卷积核,2个卷积层
self.convpool02 = ConvPool(
num_channels=64,
num_filters=128,
filter_size=3,
pool_size=2,
pool_stride=2,
groups=2
)
# 第三组:256个卷积核,3个卷积层
self.convpool03 = ConvPool(
num_channels=128,
num_filters=256,
filter_size=3,
pool_size=2,
pool_stride=2,
groups=3
)
# 第四组:512个卷积核,3个卷积层
self.convpool04 = ConvPool(
num_channels=256,
num_filters=512,
filter_size=3,
pool_size=2,
pool_stride=2,
groups=3
)
# 第五组:512个卷积核,3个卷积层
self.convpool05 = ConvPool(
num_channels=512,
num_filters=512,
filter_size=3,
pool_size=2,
pool_stride=2,
groups=3
)
# 全连接层
self.fc1 = paddle.nn.Linear(512 * 7 * 7, 4096) # 根据输入尺寸计算
self.fc2 = paddle.nn.Linear(4096, 4096)
self.fc3 = paddle.nn.Linear(4096, num_classes)
def forward(self, x, label=None):
"""前向计算"""
# 卷积层部分
out = self.convpool01(x)
out = self.convpool02(out)
out = self.convpool03(out)
out = self.convpool04(out)
out = self.convpool05(out)
# 展平操作
out = paddle.flatten(out, start_axis=1, stop_axis=-1)
# 全连接层
out = paddle.nn.functional.relu(self.fc1(out))
out = paddle.nn.functional.relu(self.fc2(out))
out = self.fc3(out)
# 计算准确率(如果提供了label)
if label is not None:
acc = paddle.metric.accuracy(input=out, label=label)
return out, acc
else:
return out
4.模型训练与预估
# In[43]:
def draw_process(title, color, iters, data, label):
plt.title(title, fontsize=24)
plt.xlabel("iter", fontsize=20)
plt.ylabel(label, fontsize=20)
plt.plot(iters, data, color=color, label=label)
plt.legend()
plt.grid()
plt.show()
# In[44]:
print(train_parameters['class_dim'])
print(train_parameters['label_dict'])
# In[45]:
from paddle.optimizer.lr import CosineAnnealingDecay
model = VGGNet()
model.train()
cross_entropy = paddle.nn.CrossEntropyLoss()
optimizer = paddle.optimizer.Adam(learning_rate=train_parameters['learning_strategy']['lr'],
parameters=model.parameters())
steps = 0
Iters, total_loss, total_acc = [], [], []
for epo in range(train_parameters['num_epochs']):
for _, data in enumerate(train_loader()):
steps += 1
x_data = data[0]
y_data = data[1]
predicts, acc = model(x_data, y_data)
loss = cross_entropy(predicts, y_data)
loss.backward()
optimizer.step()
optimizer.clear_grad()
if steps % train_parameters["skip_steps"] == 0:
Iters.append(steps)
total_loss.append(loss.numpy()[0])
total_acc.append(acc.numpy()[0])
# 打印中间过程
print('epo: {}, step: {}, loss is: {}, acc is: {}'.format(epo, steps, loss.numpy(), acc.numpy()))
# 保存模型参数
if steps % train_parameters["save_steps"] == 0:
save_path = train_parameters["checkpoints"] + "/" + "save_dir_" + str(steps) + '.pdparams'
print('save model to: ' + save_path)
paddle.save(model.state_dict(), save_path)
paddle.save(model.state_dict(), train_parameters["checkpoints"] + "/" + "save_dir_final.pdparams")
draw_process("trainning loss", "red", Iters, total_loss, "trainning loss")
draw_process("trainning acc", "green", Iters, total_acc, "trainning acc")
# # 四、模型评估
# In[46]:
'''
模型评估
'''
from paddle.optimizer.lr import CosineAnnealingDecay
model__state_dict = paddle.load('work/checkpoints/save_dir_final.pdparams')
model_eval = VGGNet()
model_eval.set_state_dict(model__state_dict)
model_eval.eval()
accs = []
for _, data in enumerate(eval_loader()):
x_data = data[0]
y_data = data[1]
predicts = model_eval(x_data)
acc = paddle.metric.accuracy(predicts, y_data)
accs.append(acc.numpy()[0])
print('模型在验证集上的准确率为:', np.mean(accs))
# # 五、模型预测
# In[47]:
def load_image(img_path):
'''
预测图片预处理
'''
img = Image.open(img_path)
if img.mode != 'RGB':
img = img.convert('RGB')
img = img.resize((224, 224), Image.BILINEAR)
img = np.array(img).astype('float32')
img = img.transpose((2, 0, 1)) / 255 # HWC to CHW 及归一化
return img
infer_dst_path = 'data/flowers/rose/'
label_dic = train_parameters['label_dict']
# In[48]:
model__state_dict = paddle.load('work/checkpoints/save_dir_final.pdparams')
model_predict = VGGNet()
model_predict.set_state_dict(model__state_dict)
model_predict.eval()
infer_imgs_path = os.listdir(infer_dst_path)
# print(infer_imgs_path)
for infer_img_path in infer_imgs_path[:10]:
infer_img = load_image(infer_dst_path + infer_img_path)
infer_img = infer_img[np.newaxis, :, :, :] # reshape(-1,3,224,224)
infer_img = paddle.to_tensor(infer_img)
result = model_predict(infer_img)
lab = np.argmax(result.numpy())
print("rose样本: {},被预测为:{}".format(infer_img_path, label_dic[str(lab)]))
模型框架pytorch
这里以pytorch的声音情绪识别分类为例,以下是项目链接:
语音情绪识别:https://aistudio.baidu.com/projectdetail/9166158?sUid=15411139&shared=1&ts=1747488195503
这个pytorch仅作为框架示例,改改模型就可以直接用