基于联邦学习的MNIST手写数字识别-Pytorch实现
基于联邦学习的MNIST手写数字识别-Pytorch实现
基于联邦学习的MNIST手写数字识别-Pytorch实现
背景知识
联邦学习 :联邦学习是为了解决“数据孤岛”的问题,即不同机构由于隐私保护等限制无法获取更多的数据来完善模型,从而形成孤岛。联邦学习让不同的机构通过一个中心服务器传输模型参数,在一定程度上达到了共享数据集的效果。各个机构通过联邦学习框架进行合作,最大化其收益。
MNIST数据集 :MNIST数据集是计算机视觉领域中比较常用的数据集,它包含60000个训练数据和10000个测试数据。其内容是0-9的手写数字,且每张图片都是灰度图像,像素为28×28,具体见下图:

Pytorch :Pytorch是开源的机器学习库,在近几年的论文中得到广泛应用。
导入库
import torch
import torchvision
import torchvision.transforms as transforms
import torch.utils.data.dataloader as dataloader
from torch.utils.data import Subset
import torch.nn as nn
import torch.optim as optim
from torch.nn.parameter import Parameter读取MNIST数据集
使用Subset函数对训练数据集进行划分,这里总共有ABC三个机构,每个机构的训练集数目为1000。然后将训练数据集放入Dataloader里,这里batch_size即每次往神经网络放入多少数据,比如batch_size为100就是每次放入100个数据,总共放10次。shuffle为True意为将数据集打乱。这里我们把整个训练集作为一个batch_size,所以不需要打乱。
train_set = torchvision.datasets.MNIST(root="./data",train=True,transform=transforms.ToTensor(),download=True)
train_set_A=Subset(train_set,range(0,1000))
train_set_B=Subset(train_set,range(1000,2000))
train_set_C=Subset(train_set,range(2000,3000))
train_loader_A = dataloader.DataLoader(dataset=train_set_A,batch_size=1000,shuffle=False)
train_loader_B = dataloader.DataLoader(dataset=train_set_B,batch_size=1000,shuffle=False)
train_loader_C = dataloader.DataLoader(dataset=train_set_C,batch_size=1000,shuffle=False)
test_set = torchvision.datasets.MNIST(root="./data",train=False,transform=transforms.ToTensor(),download=True)
test_set=Subset(test_set,range(0,2000))
test_loader = dataloader.DataLoader(dataset=test_set,shuffle=True)训练和测试
train_and_test_1 :普通的训练测试过程。
首先定义神经网络的类型,这里用的是最简单的三层神经网络(也可以说是两层,不算输入层),输入层28×28,隐藏层12个神经元,输出层10个神经元。
def train_and_test_1(train_loader,test_loader):
class NeuralNet(nn.Module):
def __init__(self, input_num, hidden_num, output_num):
super(NeuralNet, self).__init__()
self.fc1 = nn.Linear(input_num, hidden_num) # 服从正态分布的权重w
self.fc2 = nn.Linear(hidden_num, output_num)
nn.init.normal_(self.fc1.weight)
nn.init.normal_(self.fc2.weight)
nn.init.constant_(self.fc1.bias, val=0) # 初始化bias为0
nn.init.constant_(self.fc2.bias, val=0)
self.relu = nn.ReLU() # Relu激励函数
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
y = self.fc2(x)
return y
epoches = 20 # 迭代20轮
lr = 0.01 # 学习率,即步长
input_num = 784
hidden_num = 12
output_num = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NeuralNet(input_num, hidden_num, output_num)
model.to(device)
loss_func = nn.CrossEntropyLoss() # 损失函数的类型:交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=lr) # Adam优化,也可以用SGD随机梯度下降法
# optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epoches):
flag = 0
for images, labels in train_loader:
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
loss = loss_func(output, labels)
optimizer.zero_grad()
loss.backward() # 误差反向传播,计算参数更新值
optimizer.step() # 将参数更新值施加到net的parameters上
# 以下两步可以看每轮损失函数具体的变化情况
# if (flag + 1) % 10 == 0:
# print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, epoches, loss.item()))
flag += 1
params = list(model.named_parameters()) # 获取模型参数
# 测试,评估准确率
correct = 0
total = 0
for images, labels in test_loader:
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
values, predicte = torch.max(output, 1) # 0是每列的最大值,1是每行的最大值
total += labels.size(0)
# predicte == labels 返回每张图片的布尔类型
correct += (predicte == labels).sum().item()
print("The accuracy of total {} images: {}%".format(total, 100 * correct / total))
return paramstrain*and_test_2 :联邦后的训练测试过程。
注意每次联邦前,bias 都要重置为 0,这样效果会好。并且需要把做完平均后的 w 传入到模型中。
def train_and_test_2(train_loader,test_loader,com_para_fc1,com_para_fc2):
class NeuralNet(nn.Module):
def **init**(self, input_num, hidden_num, output_num,com_para_fc1,com_para_fc2):
super(NeuralNet, self).**init**()
self.fc1 = nn.Linear(input_num, hidden_num)
self.fc2 = nn.Linear(hidden_num, output_num)
self.fc1.weight=Parameter(com_para_fc1)
self.fc2.weight=Parameter(com_para_fc2)
nn.init.constant*(self.fc1.bias, val=0)
nn.init.constant_(self.fc2.bias, val=0)
self.relu = nn.ReLU()
def forward(self, x):
x = self.fc1(x)
x = self.relu(x)
y = self.fc2(x)
return y
epoches = 20
lr = 0.01
input_num = 784
hidden_num = 12
output_num = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NeuralNet(input_num, hidden_num, output_num,com_para_fc1,com_para_fc2)
model.to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
# optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epoches):
flag = 0
for images, labels in train_loader:
# (images, labels) = data
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
loss = loss_func(output, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# if (flag + 1) % 10 == 0:
# print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, epoches, loss.item()))
flag += 1
params = list(model.named_parameters())#get the index by debuging
correct = 0
total = 0
for images, labels in test_loader:
images = images.reshape(-1, 28 * 28).to(device)
labels = labels.to(device)
output = model(images)
values, predicte = torch.max(output, 1)
total += labels.size(0)
correct += (predicte == labels).sum().item()
print("The accuracy of total {} images: {}%".format(total, 100 * correct / total))
return paramscombine_params :对模型参数做平均,只对权重 w 做平均。
def combine_params(para_A,para_B,para_C):
fc1_wA=para_A[0][1].data
fc1_wB=para_B[0][1].data
fc1_wC=para_C[0][1].data
fc2_wA=para_A[2][1].data
fc2_wB=para_B[2][1].data
fc2_wC=para_C[2][1].data
com_para_fc1=(fc1_wA+fc1_wB+fc1_wC)/3
com_para_fc2=(fc2_wA+fc2_wB+fc2_wC)/3
return com_para_fc1,com_para_fc2主函数 :先进行正常的训练测试,再进行联邦后的训练测试。
para_A=train_and_test_1(train_loader_A,test_loader)
para_B=train_and_test_1(train_loader_B,test_loader)
para_C=train_and_test_1(train_loader_C,test_loader)
for i in range(10):
print("The {} round to be federated!!!".format(i+1))
com_para_fc1,com_para_fc2=combine_params(para_A,para_B,para_C)
para_A=train_and_test_2(train_loader_A,test_loader,com_para_fc1,com_para_fc2)
para_B=train_and_test_2(train_loader_B,test_loader,com_para_fc1,com_para_fc2)
para_C=train_and_test_2(train_loader_C,test_loader,com_para_fc1,com_para_fc2)实验结果
三个机构 :

可以看到,刚开始三个机构的准确率分别为 24.85%、17.95%和 20%,而联邦以后,其准确率有明显提高,且随着联邦轮数的增加,最终三个机构的准确率均能达到 80%以上。
五个机构 :对于五个机构,只需改一下对参数做平均的函数以及主函数。

机构数目越多,意味着可共享的数据越多,当第六轮联邦后,各机构的准确率均达到 82%以上,继续联邦,其准确率提高不大。由于我们采用的是最简单的三层神经网络(输入层、隐藏层、输出层),这样的准确率还是令人满意的。
结语
以前碰到配置环境中的 bug、程序的 bug 或者无从下手得项目,都是看 csdn 博客来学习。本着分享知识和见证自己成长的初心,我写下了自己的第一篇博客!