文章目录
- 前言
 - 完整代码
 - 代码解析
 - 1. 导入必要的库
 - 2. 设备配置
 - 3. 超参数设置
 - 4. 加载MNIST数据集
 - 5. 创建数据加载器
 - 6. 定义卷积神经网络模型
 - 7. 实例化模型并移动到设备
 - 8. 定义损失函数和优化器
 - 9. 训练模型
 - 10. 测试模型
 - 11. 保存模型
 
- 常用函数解析
 - 小改进
 - 数据集部分可视化
 - 训练过程可视化
 
前言
今天要介绍的这段代码是一个使用PyTorch框架实现的卷积神经网络(CNN)模型,用于对MNIST数据集进行分类的示例。MNIST数据集是手写数字识别领域的一个标准数据集,包含0到9的灰度图像。
代码的主要组成部分如下:
-  
导入必要的库:导入PyTorch、PyTorch神经网络模块、torchvision(用于处理图像数据集)和transforms(用于图像预处理)。
 -  
设备配置:设置模型运行的设备,优先使用GPU(如果可用),否则使用CPU。
 -  
超参数设置:定义了训练迭代的轮数(
num_epochs)、类别数(num_classes)、批次大小(batch_size)和学习率(learning_rate)。 -  
加载MNIST数据集:使用
torchvision.datasets.MNIST加载MNIST训练集和测试集,并应用transforms.ToTensor将图像转换为张量。 -  
创建数据加载器:使用
torch.utils.data.DataLoader创建训练和测试数据的加载器,以便在训练和测试过程中批量加载数据。 -  
定义卷积神经网络模型:定义了一个名为
ConvNet的类,继承自nn.Module。模型包含两个卷积层(每层后接批量归一化和ReLU激活函数),以及一个全连接层。 -  
实例化模型并移动到设备:创建
ConvNet模型的实例,并将其移动到之前设置的设备上。 -  
定义损失函数和优化器:使用
nn.CrossEntropyLoss作为损失函数,torch.optim.Adam作为优化器。 -  
训练模型:进行多个epoch的训练,每个epoch中对数据集进行遍历,执行前向传播、损失计算、反向传播和参数更新。
 -  
测试模型:在测试阶段,将模型设置为评估模式,并禁用梯度计算以提高效率,然后计算模型在测试集上的准确率。
 -  
保存模型:使用
torch.save保存训练后的模型参数到文件,以便将来可以重新加载和使用模型。 
完整代码
import torch 
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
# Device configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Hyper parameters
num_epochs = 5
num_classes = 10
batch_size = 100
learning_rate = 0.001
# MNIST dataset
train_dataset = torchvision.datasets.MNIST(root='../../data/',
                                           train=True, 
                                           transform=transforms.ToTensor(),
                                           download=True)
test_dataset = torchvision.datasets.MNIST(root='../../data/',
                                          train=False, 
                                          transform=transforms.ToTensor())
# Data loader
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size, 
                                           shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size, 
                                          shuffle=False)
# Convolutional neural network (two convolutional layers)
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out
model = ConvNet(num_classes).to(device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
# Test the model
model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))
# Save the model checkpoint
torch.save(model.state_dict(), 'model.ckpt')
 
代码解析
1. 导入必要的库
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
 
- 导入PyTorch及其神经网络(nn)、计算机视觉(vision)模块和变换(transforms)模块。
 
2. 设备配置
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
 
- 使用
torch.device设置模型运行的设备,优先使用GPU(如果可用)。 
3. 超参数设置
num_epochs = 5
num_classes = 10
batch_size = 100
learning_rate = 0.001
 
- 定义训练迭代的轮数(
num_epochs),输出类别的数量(num_classes),每个批次的样本数(batch_size),以及优化算法的学习率(learning_rate)。 
4. 加载MNIST数据集
train_dataset = torchvision.datasets.MNIST(root='../../data/',
                                           train=True, 
                                           transform=transforms.ToTensor(),
                                           download=True)
test_dataset = torchvision.datasets.MNIST(root='../../data/',
                                          train=False, 
                                          transform=transforms.ToTensor())
 
- 使用
torchvision.datasets.MNIST加载MNIST数据集,包括训练集和测试集。transforms.ToTensor将图像数据转换为张量。 
5. 创建数据加载器
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size, 
                                           shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size, 
                                          shuffle=False)
 
- 使用
torch.utils.data.DataLoader创建数据加载器,用于批量加载数据,并在训练时打乱数据顺序。 
6. 定义卷积神经网络模型
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        # 定义模型层
        pass
    
    def forward(self, x):
        # 定义前向传播过程
        pass
 
- 定义一个名为
ConvNet的类,继承自nn.Module。在__init__中初始化模型的层,在forward中定义前向传播逻辑。 
7. 实例化模型并移动到设备
model = ConvNet(num_classes).to(device)
 
- 创建
ConvNet模型的实例,并使用.to(device)将其移动到之前设置的设备上。 
8. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
 
- 定义
nn.CrossEntropyLoss作为损失函数,使用torch.optim.Adam作为优化器。 
9. 训练模型
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        # 训练过程
        pass
 
- 遍历所有epoch和batch,执行训练过程,包括数据预处理、前向传播、损失计算、反向传播和参数更新。
 
10. 测试模型
model.eval()  # eval mode
with torch.no_grad():
    # 测试过程
    pass
 
- 将模型设置为评估模式,禁用梯度计算,并执行测试过程,计算模型的准确率。
 
11. 保存模型
torch.save(model.state_dict(), 'model.ckpt')
 
- 使用
torch.save保存模型的状态字典到文件,以便之后可以重新加载和使用模型。 
常用函数解析
-  
torch.device(device_str)- 格式:
torch.device(device_str) - 参数:
device_str—— 指定设备类型和编号(如’cuda:0’)或’cpu’。 - 意义:确定模型和张量运行的设备。
 - 用法示例:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') model.to(device) 
 - 格式:
 -  
torchvision.datasets.MNIST(...)- 格式:
torchvision.datasets.MNIST(root, train, transform, download) - 参数:指定数据集路径、是否为训练集、预处理变换、是否下载数据集。
 - 意义:加载MNIST数据集。
 - 用法示例:
train_dataset = torchvision.datasets.MNIST(root='../../data/', train=True, transform=transforms.ToTensor(), download=True) 
 - 格式:
 -  
torch.utils.data.DataLoader(...)- 格式:
torch.utils.data.DataLoader(dataset, batch_size, shuffle) - 参数:数据集对象、批次大小、是否打乱数据。
 - 意义:创建数据加载器。
 - 用法示例:
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True) 
 - 格式:
 -  
nn.Module- 格式:作为基类,不直接实例化。
 - 意义:所有神经网络模块的基类。
 - 用法示例:
class ConvNet(nn.Module): def __init__(self, num_classes=10): super(ConvNet, self).__init__() # ... 
 -  
nn.Sequential- 格式:
nn.Sequential(*modules) - 参数:一个模块序列。
 - 意义:按顺序应用多个模块。
 - 用法示例:
self.layer1 = nn.Sequential( nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2), nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2, stride=2) ) 
 - 格式:
 -  
nn.Conv2d(...)- 格式:
nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding) - 参数:输入通道数、输出通道数、卷积核大小、步长、填充。
 - 意义:创建二维卷积层。
 - 用法示例:
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2) 
 - 格式:
 -  
nn.BatchNorm2d(...)- 格式:
nn.BatchNorm2d(num_features) - 参数:特征数量。
 - 意义:创建二维批量归一化层。
 - 用法示例:
nn.BatchNorm2d(16) 
 - 格式:
 -  
nn.ReLU()- 格式:
nn.ReLU() - 意义:创建ReLU激活层。
 - 用法示例:
nn.ReLU() 
 - 格式:
 -  
nn.MaxPool2d(...)- 格式:
nn.MaxPool2d(kernel_size, stride) - 参数:池化核大小、步长。
 - 意义:创建最大池化层。
 - 用法示例:
nn.MaxPool2d(kernel_size=2, stride=2) 
 - 格式:
 -  
nn.Linear(...)- 格式:
nn.Linear(in_features, out_features) - 参数:输入特征数、输出特征数。
 - 意义:创建全连接层。
 - 用法示例:
self.fc = nn.Linear(7*7*32, num_classes) 
 - 格式:
 -  
nn.CrossEntropyLoss()- 格式:
nn.CrossEntropyLoss() - 意义:创建交叉熵损失层。
 - 用法示例:
criterion = nn.CrossEntropyLoss() 
 - 格式:
 -  
torch.optim.Adam(...)- 格式:
torch.optim.Adam(params, lr) - 参数:模型参数、学习率。
 - 意义:创建Adam优化器。
 - 用法示例:
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) 
 - 格式:
 -  
.to(device)- 格式:
.to(device) - 参数:设备对象。
 - 意义:将模型或张量移动到指定设备。
 - 用法示例:
images = images.to(device) labels = labels.to(device) 
 - 格式:
 -  
.reshape(-1, num_features)- 格式:
reshape(new_shape) - 参数:新形状。
 - 意义:重塑张量形状。
 - 用法示例:
out = out.reshape(out.size(0), -1) 
 - 格式:
 -  
torch.max(outputs.data, 1)- 格式:
torch.max(input, dim) - 参数:输入张量、计算最大值的维度。
 - 意义:获取张量在指定维度上的最大值和索引。
 - 用法示例:
_, predicted = torch.max(outputs.data, 1) 
 - 格式:
 -  
torch.no_grad()- 格式:
torch.no_grad() - 意义:上下文管理器,用于推理或测试阶段禁用梯度计算。
 - 用法示例:
with torch.no_grad(): # 测试模型的代码 
 - 格式:
 -  
.sum().item()- 格式:
.sum(dim).item() - 参数:求和的维度。
 - 意义:计算张量在指定维度的和,并转换为Python数值。
 - 用法示例:
correct += (predicted == labels).sum().item() 
 - 格式:
 -  
model.eval()- 格式:
model.eval() - 意义:将模型设置为评估模式。
 - 用法示例:
model.eval() 
 - 格式:
 -  
torch.save(...)- 格式:
torch.save(obj, f) - 参数:要保存的对象、文件路径。
 - 意义:保存对象到文件。
 - 用法示例:
torch.save(model.state_dict(), 'model.ckpt') 
 - 格式:
 
小改进
在运行代码的时候发现可视化十分简陋,于是进行了第一波可视化小改进:读取部分数据集。
数据集部分可视化
def show_images(images):
    plt.figure(figsize=(10,10))
    for i, img in enumerate(images):
        plt.subplot(5, 5, i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(img.squeeze().numpy(), cmap='gray')
    plt.show()
# Visualize a few images
dataiter = iter(train_loader)
images, _ = next(dataiter)
show_images(images[:25])  # Visualize 25 images
 
另外,请注意,由于MNIST数据集中的图像是灰度图,它们的形状是(batch_size, channels, height, width),即(100, 1, 28, 28)。在使用show_images函数之前,我们需要将图像重塑为(batch_size, height, width),即(100, 28, 28)。以下是show_images函数的修正:
def show_images(images):
    plt.figure(figsize=(10,10))
    for i, img in enumerate(images):
        plt.subplot(5, 5, i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(img.squeeze().numpy(), cmap='gray')  # 使用 squeeze() 来去除单维度
    plt.show()
 
确保在使用show_images函数时传递正确形状的图像。如果图像是从DataLoader中获取的,你可能需要使用unsqueeze(0)来添加一个批次维度,然后再调用squeeze()来去除单维度。例如:
images = images.unsqueeze(0)  # 添加一个批次维度
show_images(images[:25])  # 可视化前25张图像
 
这样就可以正确地显示图像了。
 
 然后呢,继续执行代码,我们发现训练过程的可视化也是少得可怜,于是我们再加多一点可视化内容。
训练过程可视化
要对训练过程进行更多的可视化,咱们可以记录每个epoch的损失值,并使用Matplotlib绘制损失随epoch变化的图表。以下是如何修改代码来实现这一点:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
# ...(之前的代码保持不变,包括设备配置、数据加载、模型定义等)
# 训练模型
def train(model, device, train_loader, optimizer, epoch, num_epochs):
    model.train()  # Set the model to training mode
    total_step = len(train_loader)
    losses = []
    
    for i, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Collect loss for plotting
        losses.append(loss.item())
        
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
    return losses
# 绘制损失曲线的函数
def plot_losses(epochs, losses):
    plt.figure(figsize=(10, 5))
    for i, loss_per_epoch in enumerate(losses):
        plt.plot(loss_per_epoch, label=f'Epoch {i+1}')
    plt.title('Loss over epochs')
    plt.xlabel('Steps')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
# 训练过程
losses_over_epochs = []
for epoch in range(num_epochs):
    losses = train(model, device, train_loader, optimizer, epoch, num_epochs)
    losses_over_epochs.append(losses)
    print(f'Epoch {epoch+1}/{num_epochs} - Average Loss: {sum(losses)/len(losses):.4f}')
# 绘制所有epoch的损失曲线
plot_losses(num_epochs, losses_over_epochs)
# ...(测试模型和保存模型的代码保持不变)
 
在这个修改后的代码中,我们添加了两个新的函数:
train:这个函数用于训练模型,并记录每个step的损失。它返回一个包含所有step损失的列表。plot_losses:这个函数接受epoch列表和损失列表作为参数,并绘制出损失随训练step变化的曲线。
在主训练循环中,我们对每个epoch调用train函数,并收集所有epoch的损失,然后使用plot_losses函数绘制损失曲线。
请注意,这里绘制的是每个step的损失,而不是每个epoch的损失均值。如果想要绘制每个epoch的平均损失,可以修改train函数来计算每个epoch的平均损失,并只记录这个值。
咱直接把最终代码贴上:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
# Device configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Hyper parameters
num_epochs = 10
num_classes = 10
batch_size = 100
learning_rate = 0.001
# MNIST dataset
train_dataset = torchvision.datasets.MNIST(root='../../data/',
                                           train=True, 
                                           transform=transforms.ToTensor(),
                                           download=True)
test_dataset = torchvision.datasets.MNIST(root='../../data/',
                                          train=False, 
                                          transform=transforms.ToTensor())
# Data loader
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size, 
                                           shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size, 
                                          shuffle=False)
# Convolutional neural network (two convolutional layers)
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)  # Flatten the output
        out = self.fc(out)
        return out
model = ConvNet(num_classes).to(device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Function to visualize images
def show_images(images):
    plt.figure(figsize=(10,10))
    for i, img in enumerate(images):
        plt.subplot(5, 5, i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(img.squeeze().numpy(), cmap='gray')
    plt.show()
# Function to train the model
def train(model, device, train_loader, optimizer, epoch, num_epochs):
    model.train()  # Set the model to training mode
    total_step = len(train_loader)
    losses = []
    
    for i, (images, labels) in enumerate(train_loader):
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        losses.append(loss.item())
        
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
    return losses
# Function to test the model
def test(model, device, test_loader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total
# Function to plot training progress
def plot_progress(epochs, train_losses, test_accuracies):
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    for i in range(epochs):
        plt.plot(train_losses[i], label=f'Epoch {i+1}')
    plt.title('Training Loss')
    plt.xlabel('Batch')
    plt.ylabel('Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(test_accuracies, label='Accuracy')
    plt.title('Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    plt.show()
# Visualize a few images
dataiter = iter(train_loader)
images, _ = next(dataiter)
show_images(images[:25])  # Visualize 25 images
# Initialize lists to monitor loss and accuracy
train_losses = []
test_accuracies = []
# Train the model
for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')
    train_loss = train(model, device, train_loader, optimizer, epoch, num_epochs)
    train_losses.append(train_loss)
    test_accuracy = test(model, device, test_loader)
    test_accuracies.append(test_accuracy)
    print(f'Epoch {epoch+1}/{num_epochs} - Average Loss: {sum(train_loss)/len(train_loss):.4f}, Accuracy: {test_accuracy:.2f}%')
# Plot training progress
plot_progress(num_epochs, train_losses, test_accuracies)
# Save the model checkpoint
torch.save(model.state_dict(), 'model.ckpt')
 
经过修改后的训练过程如下图所示:
 
 
 还有很多很多小改进的方向,就留给各位自己尝试啦。


















