写在前面

从5月8日开始,到现在,已经接近三个月的时间,我的第一段实习生涯即将结束。反思自己的实习经历,我觉得最主要的收获有三点:

  • 最重要的往往是迈出第一步,我从来没有接触过社会化的生产、经营模式,这次实习的企业虽然很小,而且是初创企业,学术氛围浓厚,也给了我较多的眼界,在实习中尝试触摸大学学习和社会生产的脱节处,下一步工作或者学习,都能提供方向上的指引。
  • 第二是克服与生俱来的恐惧心理,至今也还记得实习第一天的时候,来到公司,使用ssh连接服务器、装个python的环境都出了无数个bug,也不敢问近在身边的技术主管和后面的后台管理人员,自己的问题疯狂查CSDN,也始终解决不了,第二天第三天以降,简直如同煎熬,感觉自己什么也不会,一点也不配来这里,压力山大。现在回望,实力是一方面,当时对服务器、对git和linux的使用确实非常不熟悉,另外一个重要的方面,自然是心理压力较大,始终担心自己被别人看不起,不敢问问题,反而限制了自己的进步。
  • 第三才是技术上的一些收获,实习我丝毫没有参与公司的核心业务,主要做的工作是数据集整理和清洗,去重都鲜有涉及。因此指望通过一次实习让自己学到算法上的东西无异于痴人说梦,不过,无心插柳在于,我其实并没有刻意去学的linux命令行操作、git版本管理原理和使用操作以及!!python的标准库和基本语法(我总算脱离了写脚本的调包侠范畴,可以自行使用标准库实现功能了,什么生成器、描述器、装饰器;什么多进程、报错、甚至手工读二进制,也都有所涉及,这个算是自己天天查python文档的收获吧。
    爱心

Mpich

大模型如此普及!Simon老师强调:分布式是深度学习技术前进的方向。深以为然,苦于毫无经验也无尝试,连个单机多线和多进程都才掌握不久,于是必须从头学起。先选一个看上去还比较容易优雅的,似乎比sparkray文档要简单一点。

安装

安装请参见该安装文本,在我自己的BashStyle仓库里面也有该文件的配置。然后,安装关于python的第三方库:

1
pip3 install mpi4py

原理

MPI(Message Passing Interface)信息传送接口,是一种能够在并行运行的计算机上运行的信息传递系统,提供了对主流的科学编程语言Fortran,C和C++的支持,能够直接将程序发送到从机上实现并行运行。为了分析问题的方便,我自行定义在并行序列中,当前行为需要被分析的计算机为“主机”,除了主机以外的其他可访问的计算机为“从机”。目前尚未开始原理的学习,具体可以以下链接:

mpich 课程链接,网络资源链接

我主要使用Python调用API,在python中,万物皆可对象,因此,mpi4py将一切可以序列化的对象视为发送的数据格式,将其序列化后发送至从机上,如果要使用从机的GPU,必须保证发送的数据是C连续的(C-contiguous)因此发送过程中不能带有生成器和推导式。

mpi4py中提供了一个MPI包,运行comm = mpi4py.MPI.COMM_WORLD,将会得到一个共享通信接口comm,所有的计算机运行过程中产生的可序列化数据,只要通过comm相关命令加以操作,都能被其他从机所持有。我个人理解为如下的拓扑形状:
主从机通信拓扑结构

教程学习

使用方法

mpiexec [options] [command],例如:mpiexec -n 4 python hello-world.py

设置序列化标准

MPI默认使用最新标准,可访问参数HIGHEST_PROTOCOL查看当前支持的pickle版本,可以在运行mpiexec命令之前手动设置MPI4PY_PICKLE_PROTOCOL参数定义使用的pickle版本。

基本操作

在执行分布式操作前,必须声明通信空间和相关参数,如下命令:

1
2
3
4
5
import mpi4py

comm = mpi4py.MPI.COMM_WORLD
rank = comm.Get_rank() # 获取主机在通信空间的唯一编号
size = comm.Get_size() # 获取通信空间中全体主从机数量

应该使用的命令是mpiexec -n 10 <python command>,经过实验发现,该命令将python序列化数据发送至从机上顺序执行,但是只在主机上打印信息,最终都汇总在主机上。

教程学习

将一个python对象分布在各个机器上,每个机器取得一个对应的元素,从0号机开始执行,因此这些机器不是同步执行的。

1
2
3
4
5
6
7
8
9
10
11
12
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
data = [(i+1)**2 for i in range(size)]
else:
data = None
data = comm.scatter(data, root=0)
assert data == (rank+1)**2

这个条件语句在主从机上都执行过一次,不同的是,0号机上执行的是一次推导式操作,从机上执行的是data = None语句,关键在于最后一句,即对于根节点为0的机器,即当前分析的0号机执行一次scatter命令,该命令将列表并发给主从机,每台机器获得一个分块,这里能够均分,因此每个机器得到一个元素(rank+1)**2。值得注意的是,这里需要做一个测试,如果分发出来的数据不能并行均分,会发生什么事?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = (rank+1)**2
data = comm.gather(data, root=0)
if rank == 0:
for i in range(size):
assert data[i] == (i+1)**2
else:
assert data is None

这个函数comm.gather恰好相反,它把主从机上各个数据都归总到一起,从机上数据全部清空,主机保留一个可遍历的数据结构(推测为列表)。

PS

小结一下,看完这两个例子,感觉comm通信的过程更类似于await函数,当各机节点执行到comm命令前时,都产生中断,直到该命令在主从机上得到执行,然后中断解除。

一个关于Numpy Array切分的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
sendbuf = np.empty([size, 100], dtype='i')
sendbuf.T[:,:] = range(size) #留意numpy的广播机制
recvbuf = np.empty(100, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
assert np.allclose(recvbuf, rank)

值得注意的是,numpy的一维数组默认是行向量,即第一个维度缺省但是默认为1(虽然你并不能认为它就是1,因为矩阵运算的时候还是会报错),因此Numpy会把从python原生的列表或者遍历器类型的数据生成的数组当作是行向量。Numpy的广播机制官方规定必须满足两个要求:

  • 两个数组的维度完全相同(指维度的数值、排布顺序都相同);
  • 两个数组的维度不完全相同,对于存在不相同的维度,至少一方的维数为1(或者缺省);满足这个条件后,numpy将维数为1的数组在该维度上延拓(不是复制),实现广播。
    广播后的数组将保持相同的大小。

对数组转置后,comm.Scatter函数自动对数据按列切分并分布到主从机各个节点上。

来自公司电脑的补充内容

应该使用的命令是mpiexec -n 10 <python command>,经过实验发现,该命令将python序列化数据发送至从机上顺序执行,但是只在主机上打印信息,最终都汇总在主机上。

教程学习

将一个python对象分布在各个机器上,每个机器取得一个对应的元素,从0号机开始执行,因此这些机器不是同步执行的。

1
2
3
4
5
6
7
8
9
10
11
12
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
data = [(i+1)**2 for i in range(size)]
else:
data = None
data = comm.scatter(data, root=0)
assert data == (rank+1)**2

这个条件语句在主从机上都执行过一次,不同的是,0号机上执行的是一次推导式操作,从机上执行的是data = None语句,关键在于最后一句,即对于根节点为0的机器,即当前分析的0号机执行一次scatter命令,该命令将列表并发给主从机,每台机器获得一个分块,这里能够均分,因此每个机器得到一个元素(rank+1)**2。值得注意的是,这里需要做一个测试,如果分发出来的数据不能并行均分,会发生什么事?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

data = (rank+1)**2
data = comm.gather(data, root=0)
if rank == 0:
for i in range(size):
assert data[i] == (i+1)**2
else:
assert data is None

这个函数comm.gather恰好相反,它把主从机上各个数据都归总到一起,从机上数据全部清空,主机保留一个可遍历的数据结构(推测为列表)。

PS

小结一下,看完这两个例子,感觉comm通信的过程更类似于await函数,当各机节点执行到comm命令前时,都产生中断,直到该命令在主从机上得到执行,然后中断解除。

一个关于Numpy Array切分的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
sendbuf = np.empty([size, 100], dtype='i')
sendbuf.T[:,:] = range(size) #留意numpy的广播机制
recvbuf = np.empty(100, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
assert np.allclose(recvbuf, rank)

值得注意的是,numpy的一维数组默认是行向量,即第一个维度缺省但是默认为1(虽然你并不能认为它就是1,因为矩阵运算的时候还是会报错),因此Numpy会把从python原生的列表或者遍历器类型的数据生成的数组当作是行向量。Numpy的广播机制官方规定必须满足两个要求:

  • 两个数组的维度完全相同(指维度的数值、排布顺序都相同);
  • 两个数组的维度不完全相同,对于存在不相同的维度,至少一方的维数为1(或者缺省);满足这个条件后,numpy将维数为1的数组在该维度上延拓(不是复制),实现广播。
    广播后的数组将保持相同的大小。

对数组转置后,comm.Scatter函数自动对数据按列切分并分布到主从机各个节点上。