三、BP算法
3、梯度下降
w = w - lr * grad: w 表示权重,lr表示学习率,grad表示梯度
传统下降方式分三类:(BGD)批量梯度下降、(MBGD)小批量梯度下降、(SGD)随机梯度下降。机器学习4 线性回归、逻辑回归-CSDN博客 第九点 线性回归的第5点
优化梯度下降:
梯度为 0时,传统三类方法无法优化参数
3.1、指数加权平均 EMA
加权平均 指给每个数赋予不同的权重求得平均数。
移动平均数 指的是计算最近邻的 N 个数来获得平均数。
指数移动加权平均 (EMA ) 指各数值的权重都不同,距离越远的数字对平均数计算的贡献就越小(权重较小),距离越近则对平均数的计算贡献就越大(权重越大)。
假设:与之明天天气,那么今天和昨天的气温就比较有参考性,而上个月的今天可能就失去了关系。
那么通过今天的气温来预料明天的气温可以表示为:
   是平滑系数,取值范围为 
。
 越接近 1,表示对历史数据依赖性越高;越接近 0 则越依赖当前数据。该值越大平均数越平缓。
St 表示指数加权平均值(EMA)
Yt 表示 t 时刻的值
优点:解决了鞍点;使用历史数据可以保持整个结构处于紧密联系
缺点:依赖系数,系数的大小直接导致数据的联系;并没有对学习率优化
代码:
import torch
import matplotlib.pyplot as plt
ELEMENT_NUMBER = 30
# 1. 实际平均温度
def test01():
    # 固定随机数种子
    torch.manual_seed(0)
    # 产生30天的随机温度
    temperature = torch.randn(size=[ELEMENT_NUMBER,]) * 10
    # 绘制随机温度
    days = torch.arange(1, ELEMENT_NUMBER + 1, 1)
    plt.plot(days, temperature, color='r')
    plt.scatter(days, temperature)
    plt.show()
# 2. 指数加权平均温度
def test02(beta=0.9):  # beta 为权重系数
    # 固定随机数种子
    torch.manual_seed(0)
    # 产生30天的随机温度
    temperature = torch.randn(size=[ELEMENT_NUMBER,]) * 10
    # 存放处理后的每天的温度值
    exp_weight_avg = []
    # 遍历温度信息
    for idx, temp in enumerate(temperature):
        # 第一个元素的的 EWA 值等于自身
        if idx == 0:
            exp_weight_avg.append(temp)
            continue
        # 第二个元素的 EWA 值等于上一个 EWA 乘以 β + 当前气温乘以 (1-β)
        new_temp = exp_weight_avg[-1] * beta + (1 - beta) * temp
        exp_weight_avg.append(new_temp)
    days = torch.arange(1, ELEMENT_NUMBER + 1, 1)
    plt.plot(days, exp_weight_avg, color='r')
    plt.scatter(days, temperature)
    plt.show()
if __name__ == '__main__':
    test01()
    test02(0.2)
    test02(0.5)
    test02(0.8)
3.2、Momentum 动量
更好地应对梯度变化和梯度消失问题,从而提高训练模型的效率和稳定性。
惯性效应: 加入前面梯度的累积,使得即便遇到鞍点0,算法依旧沿着当前的方向继续更新。
        公式: = β * 
 + (1- β) * 
  :历史梯度移动加权平均值
  :当前时刻的梯度值
β : 为权重系数
优点:使用惯性平滑的解决了鞍点
缺点:并没有对学习率优化
代码:在优化器中添加参数 momentum= 内容
optimizer = optim.SGD(model.parameters(), lr=0.6, momentum=0.5)  
# momentum 参数指定了动量系数,默认为0。动量系数通常设置为 0 到0.5 之间的一个值3.3、AdaGrad
每个参数获取独立的学习率,根据历史梯度平方和调整学习率,使得参数具有较大的历史梯度的学习率减小,而参数具有较小的历史梯度的学习率保持较大,从而实现更有效的学习。
AdaGrad 避免了统一学习率的不足,更多用于处理稀疏数据和梯度变化较大的问题。
学习率公式: 参数公式:
   参数公式:
初始化学习率 α、初始化参数 θ、小常数 σ = 1e-6、初始化梯度累积变量 s = 0
每次计算梯度g、累计梯度 s = s + g ⊙ g(⊙ 表示各个分量相乘 )
优点:优化了学习率
缺点: 可能存在学习率过度衰减(累积的时间步梯度平方值越来越大,导致学习率逐渐接近零);不适合非稀疏矩阵;没有优化梯度梯度本身
代码:创建优化器对象,参数一为模型对象,参数 lr 设置初始化学习率
optimizer = optim.Adagrad(model.parameters(), lr=0.6)  3.4、RMSProp
用于解决AdaGrad在训练过程中学习率过度衰减的问题。
       修改AdaGrad 累计梯度方法( s = s + g ⊙ g) 为 指数移动平均累积历史梯度 
优点:在AdaGrad基础上优化学习率的衰减 和 鞍点问题
缺点: 对于参数过度依赖
代码:创建优化器对象,参数一为模型对象,参数 lr 设置初始化学习率,参数 momentum 设置初始动量。
optimizer = optim.RMSprop(model.parameters(), lr=0.6, momentum=0.5)  3.5、Adam
将 动量法(优化) 和 RMSProp 的优点结合在一起
优点:一阶动量(即梯度的指数加权平均)来加速收敛;二阶动量(即梯度平方的指数加权平均)来调整学习率;几乎可以在不调整超参数的情况下应用于各种深度学习模型
缺点:超参数敏感:对初始超参数较为敏感;初始阶段快速收敛,可能导致模型陷入局部最优甚至过拟合。
代码:创建优化器对象,参数一为模型对象,参数 lr 设置初始化学习率,参数 momentum 设置初始动量。
optimizer = optim.Adam(model.parameters(), lr=0.6)  四、过拟合与欠拟合
判断过拟合还是欠拟合的方式:判断训练误差和测试误差,两者都高为欠拟合,训练误差效果可以,但是测试误差高则为过拟合
1、欠拟合
1.1、概念
训练误差和测试误差都高。模型在训练数据和测试数据上的表现都不好,说明模型可能太简单,无法捕捉到数据中的复杂模式。
1.2、解决方式
增加模型复杂度:引入更多的参数、增加神经网络的层数或节点数量,使模型能够捕捉到数据中的复杂模式。
增加特征:通过特征工程添加更多有意义的特征,使模型能够更好地理解数据。
减少正则化强度:适当减小 L1、L2 正则化强度,允许模型有更多自由度来拟合数据。
训练更长时间:如果是因为训练不足导致的欠拟合,可以增加训练的轮数或时间
2、过拟合
2.1、概念
对训练数据拟合能力很强并表现很好,但在测试数据上表现较差。
可能因为数据少、模型复杂、正则化强度不足导致过度学习

2.2、解决方式
加强正则化力度、降低模型复杂度
3.2.1、L1正则化
        添加权重参数的绝对值之和:
优点:将一些权重变为零,实现特征选择,降低模型复杂度
3.2.2、L2正则化
        添加权重参数的平方和:
优点:不会将权重直接变为 0,而是缩小,这样模型就更加平滑的拟合数据,同时保留足够的表达能力。
3.2.3、Dropout
随机丢弃部分神经元,类似PCA的保留信息。
import torch
x = torch.tensor([1,2,3,1,2,3,1,2,3,1,2,3],dtype=torch.float32)
drop = torch.nn.Dropout(p=0.6)
print(drop(x))
print(f"{sum(drop(x)!=0)}, {x.shape[0]}, {sum(drop(x)!=0)/x.shape[0]}")
五、综合运用
手机价格分类_数据集-阿里云天池 阿里云数据集,手机价格预测
代码思路:
1、加载数据,划分数据集和目标、主成分分析、进行训练和测试划分
2、批量标准化数据,并使用数据加载器每次返回小批量数据
3、创建模型对象
4、训练数据,迭代加载训练数据集
5、测试数据,根据训练数据标准化对象直接对测试数据进行标准化(transform,不需要fit)
6、预测数据,同测试数据方法
结果展示:

数据加载文件 dataloader_t.py
加载数据、标准化、主成分分析、生成数据加载器对象
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
#  基础数据集
class Data_Basics(Dataset):
    def __init__(self, path):
        super(Data_Basics, self).__init__()
        # 基础数据加载
        data = pd.read_csv(path)
        print("基础数据已获取")
        self.x = data.iloc[:, :-1]
        self.y = data.iloc[:, -1]
        print(f"已获取特征:{self.x.shape},目标:{self.x.shape}")
        # 主成分分析
        self.x = Data_Pca(self.x)
        print(f"特征主成分分析已完成")
        # 将数据转化为 tensor
        self.x = torch.tensor(self.x, dtype=torch.float32)
        self.y = torch.tensor(self.y.values, dtype=torch.int64)
    def __len__(self):
        return len(self.x)
    def __getitem__(self, index):
        return self.x[index], self.y[index]
#  预测数据集
class Data_Preddataset(Dataset):
    def __init__(self, path):
        super(Data_Preddataset, self).__init__()
        # 基础数据加载
        data = pd.read_csv(path)
        self.x = data
        # 主成分分析
        self.x = Data_Pca(self.x)
        # 将数据转化为 tensor
        self.x = torch.tensor(self.x, dtype=torch.float32)
    def __len__(self):
        return len(self.x)
    def __getitem__(self, index):
        return self.x[index]
# 主成分分析
def Data_Pca(data):
    transfer1 = PCA(n_components=0.95)
    data = transfer1.fit_transform(data)
    return data
# 数据标准化
# 训练数据创建对象,测试数据直接使用训练数据标准的内容
def Data_Standard(data, type="train"):
    if type == "train":
        global transfer
        transfer = StandardScaler()
        train_data = transfer.fit_transform(data, type)
        return torch.tensor(train_data, dtype=torch.float32)
    else:
        test_data = transfer.transform(data)
        return torch.tensor(test_data, dtype=torch.float32)
# 数据加载器
def Data_Loader(data_path,pred_path = None):
    # 加载基础数据
    dataset = Data_Basics(data_path)
    # 获取数据集的类别特征
    y_unique = dataset.y.unique()
     # 划分训练集和测试集
    train_data, test_data, train_labels, test_labels = train_test_split(
        dataset.x, dataset.y, test_size=0.2, random_state=222, shuffle=True, stratify=dataset.y
    )
    
    # 数据标准化
    train_data = Data_Standard(train_data)
    # 根据训练好的数据标准化模型 标准化数据
    test_data = Data_Standard(test_data,type="test")
    print("训练和测试数据标准化完成")
     # 将划分结果转化为 tensor
    train_dataset = torch.utils.data.TensorDataset(train_data, train_labels)
    test_dataset = torch.utils.data.TensorDataset(test_data, test_labels)
    # 创建数据加载器
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)
    print("训练和测试数据加载器创建完成")
    # 判断是否有给出预测地址
    if pred_path != None:
        preddataset = Data_Preddataset(data_path)
        pred_data = preddataset.x
        pred_data = Data_Standard(pred_data,type="test")
        print("预测数据标准化完成")
        pred_dataset = torch.utils.data.TensorDataset(pred_data)
        pred_loader = torch.utils.data.DataLoader(pred_dataset, batch_size=32, shuffle=False)
        print("预测数据加载器创建完成")
        
    return train_loader, test_loader,y_unique,pred_loader
    # 使用字典
    return {'train_loader':train_loader, 'test_loader':test_loader, 'y_unique':y_unique, 'pred_loader':pred_loader}网络模型文件 datanet_t.py
五层网络模型:前四层使用 LeakyReLU 函数激活;最后一次因为需要使用交叉熵,所以无需使用激活函数。
import torch
import torch.nn as nn
class Net(nn.Module):
    def __init__(self,input_size, out_size):
        super(Net,self).__init__()
        self.hide1 = nn.Sequential(
                                nn.Linear(input_size, 128),
                                nn.LeakyReLU(0.05)
                                )
        self.hide2 = nn.Sequential(
                                nn.Linear(128, 256),
                                nn.LeakyReLU(0.05)
                                )
        self.hide3 = nn.Sequential(
                                  nn.Linear(256, 512),
                                  nn.LeakyReLU(0.05),
                                )
        self.hide4 = nn.Sequential(
                                nn.Linear(512, 128),
                                nn.LeakyReLU(0.05)
                                )
        self.out = nn.Linear(128, out_size)
        print("网络对象创建完成")
    def forward(self, inpu_data):
        temp_data = self.hide1(inpu_data)
        temp_data = self.hide2(temp_data)
        temp_data = self.hide3(temp_data)
        temp_data = self.hide4(temp_data)
        out_data = self.out(temp_data)
        return out_data
    
    def weight_start(self):
        torch.nn.init.kaiming_normal(self.hide1[0].weight, nonlinearity='leaky_relu')
        torch.nn.init.kaiming_normal(self.hide2[0].weight, nonlinearity='leaky_relu')
        torch.nn.init.kaiming_normal(self.hide3[0].weight, nonlinearity='leaky_relu')
        torch.nn.init.kaiming_normal(self.hide4[0].weight, nonlinearity='leaky_relu')
        print("参数初始化完成")
    
数据训练文件 datatrain_t.py
创建交叉熵损失函数对象,小批量训练。
from dataloader_t import Data_Loader,Data_Basics
import torch
from sklearn.model_selection import train_test_split
from datanet_t import Net
def train(train_loader,y_unique,epochs = 200):
    # 获取 数据加载器
    for x,y in train_loader:
        x_shape, y_shape = x.shape, y.shape
        break
    # 创建模型 引入每次训练集合的列数
    # 调用模型初始化
    model = Net(x_shape[1], len(y_unique))
    model.weight_start()
    # 创建损失函数
    loss_fn = torch.nn.CrossEntropyLoss()
    print("损失函数对象完成")
    # 创建优化器
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    print("优化器对象完成,开始训练")
    # 根据训练代数循环
    for epoch in range(epochs):
        print(f"训练代数:{epoch}")
        for x_train, y_true in train_loader:
            # 预测类别
            y_pred = model(x_train)
            # 计算损失函数
            loss = loss_fn(y_pred, y_true)
            # 梯度清零
            optimizer.zero_grad()
            # 反向传播
            loss.backward()
            # 梯度更新
            optimizer.step()
    print("训练完成")
    # 保存模型
    torch.save(model.state_dict(), "../model/model.pth")
    print("训模型保存成功")
数据测试文件 datatest_t.py
import torch
from datanet_t import Net
def Data_test(train_loader,test_loader,y_unique):
    # 获取创建模型的输入和输出特征
    for x,y in train_loader:
        x_shape, y_shape = x.shape, y.shape
        break
    
    # 创建模型
    model = Net(x_shape[1], len(y_unique))
    model.load_state_dict(torch.load(f"../model/model.pth"))
    model.eval()
    print("模型加载完成,开始测试")
    with torch.no_grad():  # 关闭梯度计算
        true_num = 0
        sum_num = 0
        for x_test,y_true in test_loader:
            predictions = model(x_test)
            predicted_classes = torch.argmax(predictions,dim=1)  # 获取预测类别
            true_num += (predicted_classes == y_true).sum().item()
            sum_num += len(y_true)
        print(f"测试结束,分数为:{true_num/sum_num}")
        
        
数据预测文件 datapred_t.py
import torch
import torch.nn as nn
from datanet_t import Net
def Data_pred(train_loader,pred_data,y_unique):
    # 获取创建模型的输入和输出特征
    for x,y in train_loader:
        x_shape = x.shape
        break
    
    # 创建模型
    model = Net(x_shape[1], len(y_unique))
    model.load_state_dict(torch.load(f"../model/model.pth"))
    model.eval()
    print("模型加载完成,开始预测")
    with torch.no_grad():  # 关闭梯度计算
        predicted_classes_list =[]
        for x_pred, in pred_data:
            y_pred = model(x_pred)
            predicted_classes = torch.argmax(y_pred,dim=1).tolist()
            predicted_classes_list.extend(predicted_classes)
        print(f"预测结果为:\n{predicted_classes_list}")
        #     predicted_classes.extend = torch.argmax(y_pred,dim=1).tolist()  # 获取预测类别
        # print(f"预测结果为:{predicted_classes}")
主运行文件 mian.py
from dataloader_t import Data_Loader,Data_Basics
from datatrain_t import train
from datatest_t import Data_test
from datapred_t import Data_pred
class Main:
    def __init__(self):
        self.x = 1
    def mian_data(self):
    # 加载数据
        self.train_loader,self.test_loader,self.y_unique,self.pred_loader =  Data_Loader("../data/手机价格预测.csv","../data/手机价格预测test.csv")
    def mian_train(self):
        # 训练
        train(self.train_loader,self.y_unique,epochs=200)
    def main_test(self):
        # 测试
        Data_test(self.train_loader,self.test_loader,self.y_unique)
        
    def main_pred(self):
        # 预测
        Data_pred(self.train_loader,self.pred_loader,self.y_unique)
def mian_data():
    global train_loader,test_loader,y_unique,pred_loader
# 加载数据
    train_loader,test_loader,y_unique,pred_loader =  Data_Loader("../data/手机价格预测.csv","../data/手机价格预测test.csv")
def mian_train():
    global train_loader,test_loader,y_unique,pred_loader
    # 训练
    train(train_loader,y_unique,epochs=200)
def main_test():
    
    global train_loader,test_loader,y_unique,pred_loader
    # 测试
    Data_test(train_loader,test_loader,y_unique)
    
def main_pred():
    global train_loader,test_loader,y_unique,pred_loader
    # 预测
    Data_pred(train_loader,pred_loader,y_unique)
if __name__ == "__main__":
    mian_data()
    mian_train()
    main_test()
    main_pred()
# if __name__ == "__main__":
#     main = Main()
#     main.mian_data()
#     main.mian_train()
#     main.main_test()
#     main.main_pred()



















