参考文献:McMahan B, Moore E, Ramage D, et al. Communication-efficient learning of deep networks from decentralized data[C]//Artificial intelligence and statistics. PMLR, 2017: 1273-1282.
参考的文章:
1.联邦学习代码解读,超详细-CSDN博客
2.联邦学习开山之作代码解读与收获_联邦学习代码-CSDN博客
3.数据独立同分布vs非独立同分布:https://www.zhihu.com/question/395555567/answer/3214082688
目录
Part One.论文解读
Part Two. FedAvg 代码解读
1.代码整体结构
2.options.py
2.1 federated arguments
2.2 model arguments
2.3 other argument
3. models.py
3.1 MLP多层感知机
3.2 LeNet-5
3.3 CNN卷积神经网络
3.4 modelC自定义模型
4. sampling.py
4.0 数据分布
4.1 mnist_iid()
4.2 mnist_noniid()
4.3 mnist_noniid_unequal
4.4 cifar10
5. update.py
5.1 DatasetSplite(Dataset)
5.2 LocalUpdate
5.2.1准备工作
5.2.2 train_val_test()
5.2.3 update_weights() 本地权重更新
5.2.4 评估函数:inference(self,model)
5.2.5 test_inference(self,model)
6. utils.py
6.1 get_dataset(args)
6.2 average_weights(w)
6.3 exp_details(args)
7. 主函数 federated_main.py
7.1 库的引用
7.2 主函数
7.3 建立模型
7.4 模型训练
7.5 测试
8.作图PLOTTING
Part One.论文解读
对文论的理解就直接贴之前的汇报PPT了。
回看去年的汇报ppt,感觉当时的理解并不是很深入,比如对数据的独立同分布和非独立同分布的认知就几乎为零,完全没有当回事。然后在简单弄懂了训练流程后,就开始疯狂啃收敛性证明,感觉也是被带偏了,浪费了很多时间。
这个暑假突然悟了,准备按自己的节奏走。回顾一下万能Baseline FedAvg的代码。







Part Two. FedAvg 代码解读
1.代码整体结构

2.options.py
从最简单的超参数开始。可以看到参数分为三类:
- federated arguments 联邦参数
- model arguments 模型参数
- other arguments 其他参数
超参数代码框架:
import argparse
def args_parser():
    parser = argparse.ArgumentParser()
    # federated arguments (Notation for the arguments followed from paper)
    parser.add_argument('--epochs', type=int, default=10, help="number of rounds of training:R")
    
    # model arguments
    parser.add_argument('--model', type=str, default='mlp', help='model name')
    
    # other arguments
    parser.add_argument('--dataset', type=str, default='mnist', help="name of dataset")
    parser.add_argument('--seed', type=int, default=1, help='random seed')
    args = parser.parse_args()
    return args
2.1 federated arguments
- epochs:训练轮数,R,默认10轮
- num_users:用户数K,默认100个用户
- frac:用户选取比例C,默认0.1
- local_ep:本地训练次数E,默认10
- local_bs:本地训练小批次的大小,默认32
- lr:学习率,默认0.01
- momentum:SGD动量,默认0.5
# federated arguments (Notation for the arguments followed from paper)
    parser.add_argument('--epochs', type=int, default=10, help="number of rounds of training:R")
    parser.add_argument('--num_users', type=int, default=100, help="number of users: K")
    parser.add_argument('--frac', type=float, default=0.1, help="the fraction of clients: C")
    parser.add_argument('--local_ep', type=int, default=10, help="the number of local epochs: E")
    parser.add_argument('--local_bs', type=int, default=32, help="local batch size: B")
    parser.add_argument('--lr', type=float, default=0.01, help="learning rate")
    parser.add_argument('--momentum', type=float, default=0.5, help="SGD momentum (default: 0.5)")2.2 model arguments
- model:模型,默认mlp多层感知机
- kernel_num:卷积核数量,默认9
- kernel_size:卷积核大小,默认3、4、5
- num_channels:图像通道数,默认1
- norm:归一化方法,默认batch_norm
- num_filters:过滤器数量,默认32
- max_pool:最大池化,默认True
# model arguments
    parser.add_argument('--model', type=str, default='mlp', help='model name')
    parser.add_argument('--kernel_num', type=int, default=9, help='number of each kind of kernel')
    parser.add_argument('--kernel_sizes', type=str, default='3,4,5', help='comma-separated kernel size to use for convolution')
    parser.add_argument('--num_channels', type=int, default=1, help="number of channels of imgs")
    parser.add_argument('--norm', type=str, default='batch_norm', help="batch_norm, layer_norm, or None")
    parser.add_argument('--num_filters', type=int, default=32, help="number of filters for conv nets -- 32 for mini-imagenet, 64 for omiglot.")
    parser.add_argument('--max_pool', type=str, default='True',help="Whether use max pooling rather than strided convolutions")2.3 other argument
- dataset:数据集,默认mnist
- num_classes:分类数量,默认10
- gpu:gpu,默认为使用
- optimizer:优化器,默认sgd
- iid:独立同分布,默认1(独立同分布)
- unequal:平均分配数据集,默认0(平均分配)
- stopping_rounds:停止轮数,默认10 (??和前面epochs也是训练轮数,这里早停设置?)
- verbose:日志显示,默认1,(0不输出,1输出带进度条的日志,2输出不带进度条的日志)
- seed:随机种子,默认7
# other arguments
    parser.add_argument('--dataset', type=str, default='mnist', help="name of dataset")
    parser.add_argument('--num_classes', type=int, default=10, help="number of classes")
    parser.add_argument('--gpu', default=None, help="To use cuda, set to a specific GPU ID. Default set to use CPU.")
    parser.add_argument('--optimizer', type=str, default='sgd', help="type of optimizer")
    parser.add_argument('--iid', type=int, default=1, help='Default set to IID. Set to 0 for non-IID.')
    parser.add_argument('--unequal', type=int, default=0, help='whether to use unequal data splits for non-i.i.d setting (use 0 for equal splits)')
    parser.add_argument('--stopping_rounds', type=int, default=10, help='rounds of early stopping')
    parser.add_argument('--verbose', type=int, default=1, help='verbose')
    parser.add_argument('--seed', type=int, default=1, help='random seed')3. models.py
接着,说模型,模型这也相对简单。联邦的实验一般不需要自己去设计模型,选择已有模型完成实验即可,联邦的侧重点不在模型本身。这里主要使用了3个模型完成训练:
- MLP多层感知机
- LeNet-5
- CNN卷积神经网络
- modelC自定义模型
3.1 MLP多层感知机

class MLP(nn.Module):
    def __init__(self, dim_in, dim_hidden, dim_out):
        super(MLP, self).__init__()
        # 将输入特征映射到隐藏层
        self.layer_input = nn.Linear(dim_in, dim_hidden)
        self.relu = nn.ReLU()
        # Dropout 防止过拟合,提高模型的泛化能力
        self.dropout = nn.Dropout()
        # 将隐藏层的输出映射到输出层
        self.layer_hidden = nn.Linear(dim_hidden, dim_out)
        # 将输出转换为概率分布,从而得到每个类别的概率
        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        #  MLP 模型的前向传播
        x = x.view(-1, x.shape[1]*x.shape[-2]*x.shape[-1])
        x = self.layer_input(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.layer_hidden(x)
        return self.softmax(x)3.2 LeNet-5

class MyLeNet5(nn.Module):
    def __init__(self):
        super(MyLeNet5, self).__init__()
        self.c1 = nn.Conv2d(in_channels=1, out_channels=6,kernel_size=5, padding=2)
        self.Sigmoid = nn.Sigmoid()
        self.s2 = nn.AvgPool2d(kernel_size=2, stride=2)
        self.c3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        self.s4 = nn.AvgPool2d(kernel_size=2, stride=2)
        self.c5 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5)
        self.flatten = nn.Flatten()
        self.f6 = nn.Linear(120, 84)
        self.output = nn.Linear(84, 10)
    # forward():定义前向传播过程,描述了各层之间的连接关系
    def forward(self, x):
        x = self.Sigmoid(self.c1(x))
        x = self.s2(x)
        x = self.Sigmoid(self.c3(x))
        x = self.s4(x)
        x = self.c5(x)
        x = self.flatten(x)
        x = self.f6(x)
        x = self.output(x)
        return x3.3 CNN卷积神经网络


class CNNMnist(nn.Module):
    def __init__(self, args):
        super(CNNMnist, self).__init__()
        self.conv1 = nn.Conv2d(args.num_channels, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, args.num_classes)
    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, x.shape[1]*x.shape[2]*x.shape[3])
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
class CNNFashion_Mnist(nn.Module):
    def __init__(self, args):
        super(CNNFashion_Mnist, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2))
        self.fc = nn.Linear(7*7*32, 10)
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
class CNNCifar(nn.Module):
    def __init__(self, args):
        super(CNNCifar, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, args.num_classes)
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.log_softmax(x, dim=1)3.4 modelC自定义模型
class modelC(nn.Module):
    def __init__(self, input_size, n_classes=10, **kwargs):
        super(AllConvNet, self).__init__()
        self.conv1 = nn.Conv2d(input_size, 96, 3, padding=1)
        self.conv2 = nn.Conv2d(96, 96, 3, padding=1)
        self.conv3 = nn.Conv2d(96, 96, 3, padding=1, stride=2)
        self.conv4 = nn.Conv2d(96, 192, 3, padding=1)
        self.conv5 = nn.Conv2d(192, 192, 3, padding=1)
        self.conv6 = nn.Conv2d(192, 192, 3, padding=1, stride=2)
        self.conv7 = nn.Conv2d(192, 192, 3, padding=1)
        self.conv8 = nn.Conv2d(192, 192, 1)
        self.class_conv = nn.Conv2d(192, n_classes, 1)
    def forward(self, x):
        x_drop = F.dropout(x, .2)
        conv1_out = F.relu(self.conv1(x_drop))
        conv2_out = F.relu(self.conv2(conv1_out))
        conv3_out = F.relu(self.conv3(conv2_out))
        conv3_out_drop = F.dropout(conv3_out, .5)
        conv4_out = F.relu(self.conv4(conv3_out_drop))
        conv5_out = F.relu(self.conv5(conv4_out))
        conv6_out = F.relu(self.conv6(conv5_out))
        conv6_out_drop = F.dropout(conv6_out, .5)
        conv7_out = F.relu(self.conv7(conv6_out_drop))
        conv8_out = F.relu(self.conv8(conv7_out))
        class_out = F.relu(self.class_conv(conv8_out))
        pool_out = F.adaptive_avg_pool2d(class_out, 1)
        pool_out.squeeze_(-1)
        pool_out.squeeze_(-1)
        return pool_out4. sampling.py
4.0 数据分布
独立同分布与非独立同分布参考知乎上的一篇文章。
参考:https://www.zhihu.com/question/395555567/answer/3214082688
联邦学习的non-iid的定义可参考《《Advances and Open Problems in Federated Learning》》
参考文献:Kairouz P, McMahan H B, Avent B, et al. Advances and open problems in federated learning[J]. Foundations and trends® in machine learning, 2021, 14(1–2): 1-210.
全文copy过来
作者:反向人
链接:https://www.zhihu.com/question/395555567/answer/3214082688
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
1、Feature distribution skew( 特征分布偏差)
所谓一千个人里有一千个哈姆雷特,每个人审美都不一样,我们“判断的角度”不一样。比如有人会看他的五官身材,有人会看谈吐涵养,
2、Lable distribution skew (标签分布偏差)
我们“判断的标准/结果”不一样,有人会说他好看或者难看,有人会说友善还是粗鲁。那第三种和第四种是不是就更好理解啦~
3、Same label, different features (相同的标签,不同的特征)
4、Same features, different label (相同的特征,不同的标签)
5、Quantity skew or unbalancedness ( 数量倾斜或不平衡)
心中默念“输入特征,输出标签”,你就明白前四种Non-IID分布啦。第五种更简单啦,即不同客户拥有不同的数据量。
在传统的机器学习中,通常假设训练数据是独立同分布的(IID),这意味着每个数据样本都是独立地从相同的概率分布中抽取的,因此样本之间是相互独立的且具有相同的分布特性。
但在联邦学习中,由于数据存储在不同的本地设备上,这些设备可能采集不同类型的数据、数据量不同、数据质量不同,或者数据在不同时间和地点收集,因此不同设备上的数据样本之间可能具有不同的分布特性或相关性,而不满足独立同分布的假设。这就是联邦学习的Non-IID,通常有两个问题
- 模型收敛困难:当本地数据的分布不同或者数据质量差异较大时,全局模型的收敛可能会受到影响,因为不同设备的本地模型更新可能不容易合并。
- 性能不稳定:由于非iid数据分布,全局模型可能在某些设备上表现良好,但在其他设备上表现较差,导致性能不稳定。
而Non-IID经常伴随着异构性这三个字一起出现。
具体来说,非独立同分布是异构性的一种表现,异构性比非独立同分布更广泛。
异构性:通常是指系统或群体中包含多种不同类型、属性、特性或性质的元素。一篇联邦学习的论文把异构性分成了三种:
1、设备异构性:不同设备可能拥有不同的硬件性能,包括CPU、GPU、内存等,导致计算能力不同;网络速度和稳定性
2、统计异构性:设备的数据可能来自不同的数据源、采集方式、环境条件或时间段,导致数据的统计性质存在差异
3、数据异构性:来自不同的数据源,设备的数据可以是多种类型(数值/文本/图像)
4.1 mnist_iid()
灵魂拷问:这种随机分的样本真的就独立同分布吗?
参考4.2 noniid的抽取方法,先分类,在每个类别里面随机不重复平均抽

def mnist_iid(dataset, num_users):
    """
    从 MNIST 数据集中采样独立同分布 (IID) 的客户端数据
    超参数:param dataset
    超参数:param num_users:
    return: dict of image index
    """
    # 计算每个客户端的采样数量(平均分配)
    num_items = int(len(dataset)/num_users)
    # print("num_users is a", type(num_users), "with value", num_users)
    # print("num_items is a", type(num_items), "with value", num_items)
    # 初始化客户端数据字典dict_users存储客户端数据
    # 创建列表all_idxs,包含数据集的所有索引,从 0 到 len(dataset)-1.
    dict_users, all_idxs = {}, [i for i in range(len(dataset))]
    # 采样客户端数据
    # 对于每个客户端使用 np.random.choice() 从 all_idxs 列表中随机选择num_items个唯一的索引
    # 随机索引,且不重复,确保每个客户端获得唯一的一组样本.
    # 选择的索引被存储为 dict_users[i] 字典中的一个集合. 使用集合可以确保没有重复的索引.
    for i in range(num_users):
        dict_users[i] = set(np.random.choice(all_idxs, num_items, replace=False))
        all_idxs = list(set(all_idxs) - dict_users[i])
        # print("dict_users[i] is a", type(dict_users[i]), "with value", dict_users[i])
    return dict_users4.2 mnist_noniid()

def mnist_noniid(dataset, num_users):
    """
    从 MNIST 数据集中采样非独立同分布 (non-IID) 的客户端数据
    """
    # 数据集划分:
    # 该函数将 MNIST 训练集 (60,000 个样本) 划分为 200 个 "碎片" (shards),每个碎片包含 300 个样本。
    # 函数创建一个列表 idx_shard 来跟踪这 200 个碎片的索引。
    num_shards, num_imgs = 200, 300
    idx_shard = [i for i in range(num_shards)]
    # 函数创建一个字典 dict_users,其中每个键 (客户端索引) 对应一个空的 NumPy 数组。
    dict_users = {i: np.array([]) for i in range(num_users)}
    idxs = np.arange(num_shards*num_imgs)
    labels = dataset.train_labels.numpy()
    # 对标签进行排序:
    # 函数获取训练标签 dataset.train_labels.numpy(),并将其与样本索引 idxs 组合成一个 2D 数组 idxs_labels。
    # 然后对 idxs_labels 按标签列进行排序,得到排序后的索引 idxs
    idxs_labels = np.vstack((idxs, labels))
    idxs_labels = idxs_labels[:, idxs_labels[1, :].argsort()]
    idxs = idxs_labels[0, :]
    # divide and assign 2 shards/client
    # 为每个客户端分配数据:
    # 对于每个客户端 (索引为 i),函数随机选择 2 个碎片 (不重复),并将这 2 个碎片中的所有样本索引添加到 dict_users[i] 中。
    # 选择的碎片索引从 idx_shard 列表中删除,确保每个客户端获得唯一的一组样本。
    for i in range(num_users):
        rand_set = set(np.random.choice(idx_shard, 2, replace=False))
        idx_shard = list(set(idx_shard) - rand_set)
        for rand in rand_set:
            dict_users[i] = np.concatenate(
                (dict_users[i], idxs[rand*num_imgs:(rand+1)*num_imgs]), axis=0)
            print("dict_users[i] is a", type(dict_users[i]), "with value", dict_users[i])
    return dict_users4.3 mnist_noniid_unequal

def mnist_noniid_unequal(dataset, num_users):
    """
    从MNIST数据集中采样非I.I.D.(非独立同分布)且数量不均等的客户端数据
    """
    # 将整个MNIST训练集(60,000张图像)划分为1200个分片(shards),每个分片包含50张图像
    # 创建一个包含 1200 个索引的列表,表示 1200 个分片
    # 创建一个字典 dict_users,键为客户端编号 i,值为一个空的 NumPy 数组
    # 获取 MNIST 训练集的标签数据,并转换为 NumPy 数组
    num_shards, num_imgs = 1200, 50
    idx_shard = [i for i in range(num_shards)]
    dict_users = {i: np.array([]) for i in range(num_users)}
    idxs = np.arange(num_shards*num_imgs)
    labels = dataset.train_labels.numpy()
    # sort labels
    # 将图像索引 idxs 和标签 labels 垂直堆叠(vstack)到一个二维 NumPy 数组 idxs_labels 中
    # 现在 idxs_labels 是一个 2 x 60,000 的数组,第一行是图像索引,第二行是对应的标签
    idxs_labels = np.vstack((idxs, labels))
    idxs_labels = idxs_labels[:, idxs_labels[1, :].argsort()]
    idxs = idxs_labels[0, :]
    # 设置每个客户端获得的最小和最大分片数量为1和30。
    min_shard = 1
    max_shard = 30
    # Divide the shards into random chunks for every client
    # 使用 NumPy 的 random.randint() 函数生成 num_users 个随机整数
    # 这些随机整数表示每个客户端被分配的分片(shard)数量
    # 每个随机整数都在 min_shard 和 max_shard+1 之间(包括 max_shard)
    random_shard_size = np.random.randint(min_shard, max_shard+1,
                                          size=num_users)
    # 将上一步生成的随机分片数量进行归一化
    # 首先计算所有客户端分片数量的总和
    # 然后将每个客户端的分片数量除以总和,得到一个 0 到 1 之间的比例
    # 最后将这个比例乘以总分片数 num_shards,得到每个客户端应该被分配的实际分片数量
    # 使用 np.around() 将结果四舍五入为整数
    random_shard_size = np.around(random_shard_size / sum(random_shard_size) * num_shards)
    # 将上一步得到的浮点数分片数量转换为整数
    random_shard_size = random_shard_size.astype(int)
    # Assign the shards randomly to each client
    if sum(random_shard_size) > num_shards:
        for i in range(num_users):
            # First assign each client 1 shard to ensure every client has
            # atleast one shard of data
            rand_set = set(np.random.choice(idx_shard, 1, replace=False))
            idx_shard = list(set(idx_shard) - rand_set)
            for rand in rand_set:
                dict_users[i] = np.concatenate(
                    (dict_users[i], idxs[rand*num_imgs:(rand+1)*num_imgs]),
                    axis=0)
        random_shard_size = random_shard_size-1
        # Next, randomly assign the remaining shards
        for i in range(num_users):
            if len(idx_shard) == 0:
                continue
            shard_size = random_shard_size[i]
            if shard_size > len(idx_shard):
                shard_size = len(idx_shard)
            rand_set = set(np.random.choice(idx_shard, shard_size,
                                            replace=False))
            idx_shard = list(set(idx_shard) - rand_set)
            for rand in rand_set:
                dict_users[i] = np.concatenate(
                    (dict_users[i], idxs[rand*num_imgs:(rand+1)*num_imgs]),
                    axis=0)
    else:
        for i in range(num_users):
            shard_size = random_shard_size[i]
            rand_set = set(np.random.choice(idx_shard, shard_size,
                                            replace=False))
            idx_shard = list(set(idx_shard) - rand_set)
            for rand in rand_set:
                dict_users[i] = np.concatenate(
                    (dict_users[i], idxs[rand*num_imgs:(rand+1)*num_imgs]),
                    axis=0)
        if len(idx_shard) > 0:
            # Add the leftover shards to the client with minimum images:
            shard_size = len(idx_shard)
            # Add the remaining shard to the client with lowest data
            k = min(dict_users, key=lambda x: len(dict_users.get(x)))
            rand_set = set(np.random.choice(idx_shard, shard_size,
                                            replace=False))
            idx_shard = list(set(idx_shard) - rand_set)
            for rand in rand_set:
                dict_users[k] = np.concatenate(
                    (dict_users[k], idxs[rand*num_imgs:(rand+1)*num_imgs]),
                    axis=0)
    return dict_users4.4 cifar10
cifar_iid() 和 cifar10_noniid(),思路同上,不赘述。
5. update.py
5.1 DatasetSplite(Dataset)
class DatasetSplit(Dataset):
    # __init__ 方法是 DatasetSplit 类的构造函数
    # 它接受两个参数:
    # dataset: 要被划分的原始数据集。
    # idxs: 一个用于创建子集的索引列表。
    def __init__(self, dataset, idxs):
        # 将传入的原始数据集存储在self.dataset中
        self.dataset = dataset
        # 将输入的索引 idxs 转换为整数列表,并存储在 self.idxs 属性中
        self.idxs = [int(i) for i in idxs]
    def __len__(self):
        # 这个方法返回子集的长度,即 self.idxs 列表的长度
        return len(self.idxs)
    def __getitem__(self, item):
        #将图像和标签作为 PyTorch 张量返回
        image, label = self.dataset[self.idxs[item]]
        return torch.tensor(image), torch.tensor(label)
5.2 LocalUpdate
5.2.1准备工作
为客户端本地模型更新做准备,通过封装公共逻辑,使客户端更新的实现更加简洁和模块化:
- 参数设置
- 日志记录
- 数据准备:后续的训练、验证和测试集准备
- 计算设备准备
- 损失函数设置
    def __init__(self, args, dataset, idxs, logger):
        self.args = args
        self.logger = logger
        self.trainloader, self.validloader, self.testloader = self.train_val_test(
            dataset, list(idxs))
        # 指定运算设备
        self.device = 'cuda' if args.gpu else 'cpu'
        # 损失函数用的NLL
        self.criterion = nn.NLLLoss().to(self.device)5.2.2 train_val_test()
train_val_test方法的作用是根据给定的数据集和用户索引,划分出训练集、验证集和测试集的数据加载器。比例为8:1:1.
    def train_val_test(self, dataset, idxs):
        # split indexes for train, validation, and test (80, 10, 10)
        idxs_train = idxs[:int(0.8*len(idxs))]
        idxs_val = idxs[int(0.8*len(idxs)):int(0.9*len(idxs))]
        idxs_test = idxs[int(0.9*len(idxs)):]
        trainloader = DataLoader(DatasetSplit(dataset, idxs_train),
                                 batch_size=self.args.local_bs, shuffle=True)
        validloader = DataLoader(DatasetSplit(dataset, idxs_val),
                                 batch_size=int(len(idxs_val)/10), shuffle=False)
        testloader = DataLoader(DatasetSplit(dataset, idxs_test),
                                batch_size=int(len(idxs_test)/10), shuffle=False)
        return trainloader, validloader, testloader
5.2.3 update_weights() 本地权重更新
- 输入模型和全局更新的回合数
- 优化器选择
- 训练循环
- 可视化
- 输出更新后的权重和损失平均值
    def update_weights(self, model, global_round):
        # Set mode to train model
        model.train()
        epoch_loss = []
        # Set optimizer for the local updates
        if self.args.optimizer == 'sgd':
            optimizer = torch.optim.SGD(model.parameters(), lr=self.args.lr,
                                        momentum=0.5)
        elif self.args.optimizer == 'adam':
            optimizer = torch.optim.Adam(model.parameters(), lr=self.args.lr,
                                         weight_decay=1e-4)
        for iter in range(self.args.local_ep):
            batch_loss = []
            for batch_idx, (images, labels) in enumerate(self.trainloader):
                images, labels = images.to(self.device), labels.to(self.device)
                # pytoch框架训练模型定式5步
                model.zero_grad()
                log_probs = model(images)
                loss = self.criterion(log_probs, labels)
                loss.backward()
                optimizer.step()
                # 打印训练日志并记录损失值,监控训练过程并分析模型性能
                if self.args.verbose and (batch_idx % 10 == 0):
                    print('| Global Round : {} | Local Epoch : {} | [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                        global_round, iter, batch_idx * len(images),
                        len(self.trainloader.dataset),
                        100. * batch_idx / len(self.trainloader), loss.item()))
                #保存程序中的数据,然后利用tensorboard工具来进行可视化
                self.logger.add_scalar('loss', loss.item())
                batch_loss.append(loss.item())
            epoch_loss.append(sum(batch_loss)/len(batch_loss))
        return model.state_dict(), sum(epoch_loss) / len(epoch_loss)5.2.4 评估函数:inference(self,model)
通取测试集图像和标签,模型出结果后计算loss,然后累加。
代码定义了一个 inference() 函数,用于在测试集上计算模型的推理准确率和损失:
Chat一下,解释得还不错,直接贴过来
    def inference(self, model):
        """ Returns the inference accuracy and loss.
        """
        model.eval()
        loss, total, correct = 0.0, 0.0, 0.0
        for batch_idx, (images, labels) in enumerate(self.testloader):
            images, labels = images.to(self.device), labels.to(self.device)
            # Inference
            outputs = model(images)
            batch_loss = self.criterion(outputs, labels)
            loss += batch_loss.item()
            # Prediction
            _, pred_labels = torch.max(outputs, 1)
            pred_labels = pred_labels.view(-1)
            correct += torch.sum(torch.eq(pred_labels, labels)).item()
            total += len(labels)
        accuracy = correct/total
        return accuracy, loss
5.2.5 test_inference(self,model)
与LocalUpdate中的inference函数完全一致,只不过这里的输入参数除了args和model,还要指定test_dataset.
还不太懂为啥又测试一次。
6. utils.py
封装了一些工具函数:
- get_dataset()
- average_weights()
- exp_details()
6.1 get_dataset(args)
Chat确实是生产力。
6.2 average_weights(w)
FedAvg 加权平均。
def average_weights(w):
    """
    Returns the average of the weights.
    """
    w_avg = copy.deepcopy(w[0])
    for key in w_avg.keys():
        for i in range(1, len(w)):
            w_avg[key] += w[i][key]
        w_avg[key] = torch.div(w_avg[key], len(w))
    return w_avg
6.3 exp_details(args)
之前的代码没有可视化。这个可以参考。方便实验数据分析。
7. 主函数 federated_main.py
7.1 库的引用
import os
import copy
import time
import pickle
import numpy as np
from tqdm import tqdm
import torch
from tensorboardX import SummaryWriter
from options import args_parser
from update import LocalUpdate, test_inference
from models import MLP, CNNMnist, CNNFashion_Mnist, CNNCifar
from utils import get_dataset, average_weights, exp_details7.2 主函数
库引用之后直接主函数。先完成一些准备工作(试验方案和实验数据提前规划好):
- 时间
- 日志
- 超参数
- 可视化参数
- 计算设备
- 数据集与用户数据分配
if __name__ == '__main__':
    start_time = time.time()
    # define paths
    path_project = os.path.abspath('..')
    logger = SummaryWriter('../logs')
    args = args_parser()
    exp_details(args)
    if args.gpu_id:
        torch.cuda.set_device(args.gpu_id)
    device = 'cuda' if args.gpu else 'cpu'
    # load dataset and user groups
    train_dataset, test_dataset, user_groups = get_dataset(args)7.3 建立模型
model.py里面定义的模型。 model.py里面我加了个LeNet-5,这里还没写进去。主要是不会写(触发拖延症,后面再写)。
    # BUILD MODEL
    if args.model == 'cnn':
        # Convolutional neural netork
        if args.dataset == 'mnist':
            global_model = CNNMnist(args=args)
        elif args.dataset == 'fmnist':
            global_model = CNNFashion_Mnist(args=args)
        elif args.dataset == 'cifar':
            global_model = CNNCifar(args=args)
    elif args.model == 'mlp':
        # Multi-layer preceptron
        img_size = train_dataset[0][0].shape
        len_in = 1
        for x in img_size:
            len_in *= x
            global_model = MLP(dim_in=len_in, dim_hidden=64,
                               dim_out=args.num_classes)
    else:
        exit('Error: unrecognized model')7.4 模型训练
    # Set the model to train and send it to device.
    global_model.to(device)
    global_model.train()
    print(global_model)
    # copy weights
    global_weights = global_model.state_dict()
    # Training
    train_loss, train_accuracy = [], []
    val_acc_list, net_list = [], []
    cv_loss, cv_acc = [], []
    print_every = 2
    val_loss_pre, counter = 0, 0
    for epoch in tqdm(range(args.epochs)):
        local_weights, local_losses = [], []
        print(f'\n | Global Training Round : {epoch+1} |\n')
        global_model.train()
        m = max(int(args.frac * args.num_users), 1)
        idxs_users = np.random.choice(range(args.num_users), m, replace=False)
        for idx in idxs_users:
            local_model = LocalUpdate(args=args, dataset=train_dataset,
                                      idxs=user_groups[idx], logger=logger)
            w, loss = local_model.update_weights(
                model=copy.deepcopy(global_model), global_round=epoch)
            local_weights.append(copy.deepcopy(w))
            local_losses.append(copy.deepcopy(loss))
        # update global weights
        global_weights = average_weights(local_weights)
        # update global weights
        global_model.load_state_dict(global_weights)
        loss_avg = sum(local_losses) / len(local_losses)
        train_loss.append(loss_avg)
        # Calculate avg training accuracy over all users at every epoch
        list_acc, list_loss = [], []
        global_model.eval()
        for c in range(args.num_users):
            local_model = LocalUpdate(args=args, dataset=train_dataset,
                                      idxs=user_groups[idx], logger=logger)
            acc, loss = local_model.inference(model=global_model)
            list_acc.append(acc)
            list_loss.append(loss)
        train_accuracy.append(sum(list_acc)/len(list_acc))
        # print global training loss after every 'i' rounds
        if (epoch+1) % print_every == 0:
            print(f' \nAvg Training Stats after {epoch+1} global rounds:')
            print(f'Training Loss : {np.mean(np.array(train_loss))}')
            print('Train Accuracy: {:.2f}% \n'.format(100*train_accuracy[-1]))7.5 测试
测试,保存训练损失和训练精度了,输出时间。
# Test inference after completion of training
    test_acc, test_loss = test_inference(args, global_model, test_dataset)
    print(f' \n Results after {args.epochs} global rounds of training:')
    print("|---- Avg Train Accuracy: {:.2f}%".format(100*train_accuracy[-1]))
    print("|---- Test Accuracy: {:.2f}%".format(100*test_acc))
    # Saving the objects train_loss and train_accuracy:
    file_name = '../save/objects/{}_{}_{}_C[{}]_iid[{}]_E[{}]_B[{}].pkl'.\
        format(args.dataset, args.model, args.epochs, args.frac, args.iid,
               args.local_ep, args.local_bs)
    with open(file_name, 'wb') as f:
        pickle.dump([train_loss, train_accuracy], f)
    print('\n Total Run Time: {0:0.4f}'.format(time.time()-start_time))8.作图PLOTTING
之前的图是Origin画的。这个作者写的作图代码没有认真看。





















