并发编程(一)—— 多进程

之前在《基于异常控制流的进程协作》一文中我们从操作系统和硬件的角度简单分析了单核 CPU 上多进程的并发和协作的实现机制。这次我们换个角度,从使用方式的角度来分析并发编程。

一、为什么要使用多进程并发

我们先想想,为什么需要多进程并发?

  • 多任务机制。一个程序不可能实现所有的任务,所以计算机上必然要运行多个程序。如果选择串行模式的话,一个进程只能在等待另一个进程结束后才能执行,很多功能将无法实现。比如在打开文件浏览器程序浏览文件时,就不能开始执行 word 处理程序
  • 加快任务处理的速度。一个 CPU 在一个时刻内只能执行一个进程的指令,反之亦然,一个进程在一个时刻内只能享有一个 CPU 资源。如果计算机硬件上有多个 CPU ,我们就能利用多个进程来使用多个 CPU 的资源

既然有多进程的需求,那就有多进程协作的需求。进程和进程之间如何协作呢?
什么样算是协作?协作就是一个进程告诉另一个进程发生了什么事,然后另一个进程决定如何反应。
这其实就是两件事:怎么沟通信息和如何处理信息。

二、多进程并发如何沟通信息

如何沟通信息呢?程序无非就是“输入-处理-输出”的流程,所谓接受信息和发送信息,其实就是在接受输入和产生输出,只不过输入的来源是其它进程的输出,输出的去向是其它进程的输入。但是进程又偏偏给每个程序一个独自享有整个计算机资源的假象,每个进程是感知不到其它进程的,于是对每个进程而言,“其它”进程的输入和输出就成了无稽之谈。
不过,我们其实还是有机会的。想想看,输入和输出都是保存在内存中的,进程不一定要明确地把信息发送到其它某一个进程中去,只需要多个进程能够共享读写同样的内存区域就行了。
怎么样创建共享的内存区域呢?

2.1 共享内核内存空间

内核空间中最上面的两个页大小的空间是每个进程私有的空间,暂且称为 2P 。虽然 2P 是私有的,但是当其它进程处于内核模式下时,是有读写任意地址的权限的,其中自然包括 2P。而恰好在 2P 空间的最下面保存着进程的 thread_info::task_struct *task_struct 又保存着进程的信号位向量。当进程在内核模式下读写这个区域时,就能实现向进程发送信号或者接受信号的功能。
在内核空间的最下面保存着内核代码和数据,暂且把这段区域称为 KN,KN 对所有进程来说都是共享读的。把除去 2P 和 KN 后剩下的区域称为 PP,操作系统提供了 PIPEFIFOMessage Queue 三种 first-in-first-out 的队列在 PP 中创建共享内存段。PIPE 只是一个内存 Buffer,而 FIFO 被设计为一个虚拟内存文件,可以像文件一样打开和读写(不过并没有真正读写磁盘)。Message Queue 和 FIFO 都使用到了文件路径,不过 FIFO 是真的创建了一个虚拟文件,而 Message Queue 只是使用已存在文件的路径作为 ID。和 PIPE 与 FIFO 不同,Message Queue 中传递的数据不再是以字节(byte)为单位,而是以特定类型的数据包(byte package, message)为单位。

2.2 共享用户内存空间

在用户空间中可以使用 mmapshmsem 创建共享内存段。
mmap 可以把磁盘文件映射到内存区域中,如果两个进程映射了同一个文件,它们就共享了同一个内存段。mmap 也可以映射一个匿名文件(fd 为 0),不过此时没有了文件路径作为 ID。
shm 和 sem 都使用文件路径作为 ID,但是 shm 创建的内存空间没有阻塞功能,如果想要阻塞功能,就需要 sem。

2.3 共享“共享内存在哪”的信息

怎么样告知其它进程一段内存区域是共享的呢?
task_struct 中的信号由操作系统内核在内核模式中自动读和写,所以不用担心。
FIFO、Message Queue、mmap 磁盘文件、shm、sem 都使用到了具体的文件路径/ID,所以多进程中可以通过相同的文件名来获取共享内存区,也可以通过 forkexec 继承的变量和文件描述符表来获取共享内存区域。
通过 PIPE 和 mmap 匿名文件创建的共享内存区域,只有通过这些函数调用的返回值(指针或者文件描述符)才能获取到,而这些返回值保存在进程的虚拟内存空间中,只有通过 fork 复制父进程的虚拟内存空间才能获取到。

三、多进程并发如何处理信息

3.1 如何发送和接受消息

程序可以用 kill 系统调用向指定进程发送信号,但是并没有接受信号的系统调用,因为发送信号和接受信号是同时发生的(此处接受信号并不等于处理信号);PIPE 和 FIFO 队列使用 readwrite 系统调用来接受和发送信息,Message Queue 使用 msgsndmsgrcv 系统调用来发送和接受消息;mmap 和 shm 直接使用指针地址读写。

3.2 如何处理消息

发送消息总是即时的,但是接受新消息却需要程序主动去检查或者被通知才能知道。在接受到新消息前,程序是异步地执行其它指令,还是阻塞着直到新消息到来呢?
信号处理机制总是异步的,除非通过 waitpid 显示地阻塞当前进程并等待子进程的 SIGCHLD 信号。进程从内核模式切换到用户模式之际,会自动检查接受到地信号并进行处理。而这样地时机,一般发生在进程刚刚被调入 CPU 开始执行时,或着进程刚刚完成一次系统调用并返回时。程序可以用 signal 来为信号注册优先的处理函数,如果没有注册,则调用默认的信号处理程序。
三种队列总是阻塞的,当队列满时会阻塞写,当队列空时会阻塞读。
使用 mmap 或者 shm 的话,就没有操作系统提供的异步或者阻塞机制,只能由程序自己来实现。

四、多进程协作 API

4.1 fork 和 exec

多进程协作的前提是创建进程。
fork 用来从当前进程分叉出一个子进程,然后返回两次,在父进程中返回子进程的 pid,在子进程中返回 0。子进程“继承”了父进程的页表和内存段,但是子进程和父进程所有“非共享写”内存段都被设置为私有的写时复制的,其中的页都被设置为只读的。当两个进程中任何一个尝试进行非共享写内存段中的页面写操作时,会触发页错误从而进行写时复制。除了页表和 vma 外,子进程还复制了父进程的文件描述符表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* fork_example.c
* 正如上述程序所展示的那样,`fork` 返回了两次。
* 两个进程共享了内存(`g`),但是修改内存时触发了写时复制(`l`)。
* 子进程复制了父进程的文件描述符表,所以它们的 `printf` 输出到同一个 `STDOUT`。
*/

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

int g = 5;

int main() {
int l = 3;

pid_t pid = fork();
if (pid == 0) {
l += 1;
printf("child process: pid=%d, g=%d, l=%d\n", pid, g, l);
} else {
printf("parent process: pid=%d, g=%d, l=%d\n", pid, g, l);
}
}

/* output:
* parent process: pid=696, g=5, l=3
* child process: pid=0, g=5, l=4
*/

接下来看一个 exec 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* exec_example.c
* 在子进程中使用 `exec` 把子进程替换为 `/bin/ls` 程序,替换成功后,子进程就不再执行了。
* 但是因为 `exec` 不会删除文件描述符表,所以其 STDOUT 和父进程还是同一个。
*/
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <cstring>

int g = 5;

int main() {
int l = 3;

pid_t pid = fork();
if (pid == 0) {
l += 1;
printf("child process: pid=%d, g=%d, l=%d\n", pid, g, l);

char * argv_list[] = {"/bin/ls","/",NULL};
int ok = execv("/bin/ls", argv_list);

if (ok == -1) {
printf("failed to replace process: %s\n", strerror(errno));
}
printf("child process: pid=%d, g=%d, l=%d\n", pid, g, l);
} else {
printf("parent process: pid=%d, g=%d, l=%d\n", pid, g, l);
sleep(3);
}
}

/* parent process: pid=2160, g=5, l=3
* child process: pid=0, g=5, l=4
* bin dev home lib lib64 mnt proc run snap sys usr
* boot etc init lib32 media opt root sbin srv tmp var
*/

4.2 信号机制

程序使用 signal 来注册信号处理函数,使用 kill 来向其它进程发送信号。
使用 signal 注册的信号处理函数,在被调用时会阻塞其它信号。在有些平台上,信号处理函数执行完成时需要程序来重新注册,在此之前接受到的信号可能会采取默认处理程序。如果想要更大的控制力度,可以使用 sigaction 函数,但是它用起来也更困难。
kill 函数的第一个参数为 pid,pid 如何发挥作用,在 linux 文档里面有详细说明,这里就不赘述了。
需要注意的是,信号处理有可能打断挂起的系统调用,系统调用被打断后可能不再继续执行,而是返回 ERINT 错误。信号处理函数自己也有可能被其它信号处理函数打断,但是被打断后会继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
* signal_example.c
* 程序被打断后,触发 `SIGINT` 处理函数,在其中又使用 `kill` 系统调用向自己发送了 `SIGABRT` 信号。
* 从 `kill` 系统调用返回后,内核检查发现接受到了 `SIGABRT` 信号,于是先去执行 `SIGABRT` 的处理函数。
* 直到 `SIGABRT` 处理函数返回后且没有检查到新的信号,才继续执行 `SIGINT` 处理函数。
* 可以发现,信号处理函数和程序的主逻辑以及其它信号处理函数是并发执行的。
*/
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <sys/types.h>
#include <zconf.h>

int s1 = 0;
int s2 = 0;
int pid = 0;

void sig_user1_handler(int) {
kill(pid, SIGABRT);
s1 = 1;
exit(s1 + s2);
}

void sig_user2_handler(int) {
s2 = 2;
}

int main() {
signal(SIGINT, sig_user1_handler);
signal(SIGABRT, sig_user2_handler);
pid = getpid();

while (true) {}
}

/*
* ^C
* Process finished with exit code 3
*/

有时候进程可能已经没有其它地任务需要处理,但是还不能退出,因为需要它来处理信号。常规的思路时在进程最后使用一个无限的循环来保持进程,在循环中可以使用 sleep 或者 pause 这样的函数避免过高的 CPU 占用。
在这种情况下,sigsuspend 函数比 sleep 或者 pause 更合适。因为它不是以固定时间点长来决定复苏时间的,而是在收到信号的时候决定复苏,并直接进入信号处理程序。显然,sigsuspend 的精度更高。

4.3 三种队列

接下来分别说明 PIPE、FIFO 和 Message Queue 三种先进先出队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
* pipe_example.c
* pipe 在内核中创建了一个 buffer 并返回两个文件描述符,一个用来读, 一个用来写
* 文件描述符只能在父子进程之间共享,所以 PIPE 只能用于父子进程之间
* pipe 默认是堵塞的,使用 read 和 write 系统调用来以字节为单位进行读写
*/

#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

int main() {
int r_w[2];
if (pipe(r_w) == -1) {
printf("failed to open pipe: %s\n", strerror(errno));
exit(1);
}

int read_fd = r_w[0];
int write_fd = r_w[1];
pid_t pid = fork();

if (pid == 0) {
char buf[10];
if (read(read_fd, buf, 10) == -1) {
printf("failed to read pipe: %s\n", strerror(errno));
exit(1);
}
printf("%s\n", buf);
} else {
char buf[10] = "12345abcd";
if (write(write_fd, buf, 10) == -1) {
printf("failed to write pipe: %s\n", strerror(errno));
exit(1);
}
sleep(1);
}
}

接下来看看 FIFO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*
* fifo_write_example.c
* 使用路径 “/tmp/testfifo” 创建或者打开一个 FIFO 文件
*/
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include<fcntl.h>

int main() {
int pid = getpid();
int fd = mkfifo("/tmp/testfifo", S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (fd < 0 && errno == EEXIST) {
fd = open("/tmp/testfifo", O_RDWR);
}
if (fd < 0) {
printf("failed to create fifo: %s\n", strerror(errno));
exit(1);
}

char msg[6];
sprintf(msg, "%s", "hello");
while (true) {
write(fd, msg, 6);
sleep(1);
}
}

/*
* fifo_read_example.c
* 使用路径 "/tmp/testfifo" 打开 FIFO 文件
* 即使两个进程不是父子/兄弟关系,但是使用同样的 FIFO 文件路径也能获得同样的共享内存
*/
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include<fcntl.h>

int main() {
int fd = open("/tmp/testfifo", O_RDONLY);
if (fd < 0) {
printf("failed to create fifo: %s\n", strerror(errno));
exit(1);
}

char msg[6];

while (true) {
read(fd, msg, 6);
printf("%s\n", msg);
}
}

PIPE 和 FIFO 的区别在于前者只能通过文件描述符共享,后者可以通过文件路径共享,因此前者只适用于亲缘进程之间,后者适用于任何进程之间。
Message Queue 和 FIFO 相似,又稍有不同:

  • 二者都以文件路径作为共享的标志,但是 Message Queue 在文件路径之上再加了一个 project_id,也就是说使用 Message Queue 时,必须文件路径和 project_id 都相同才能获得同一个共享队列
  • FIFO 以文件的形式存在,但是 Message Queue 仍然只是内存 Buffer。Message Queue 只是以文件路径的 node 信息来生成队列的独特 ID,它自己不是文件,不创建文件,不读写文件,只是要求文件路径已经存在
  • Message Queue 里面的消息不再是以 byte 为单位来读写了,而是以一个打包的数据格式来读写。数据格式由程序自己定义,但是数据包中的前 8 个字节会被解释为 long 的消息类型,而且发送和接受时的数据格式大小必须一致
  • 使用 Message Queue 接受消息时虽然也是先进先出的,可以按消息的类型来筛选
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// msg_write.c
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <stdlib.h>
#include <zconf.h>
#include <cerrno>
#include <cstring>

struct msg {
long mtype;
char mdata[6];
};

int main() {
key_t key;

// 使用 / 路径和 ‘A’ 生成 id
if ((key = ftok("/", 'A')) <= 0) {
printf("failed to get key: %s\n", strerror(errno));
exit(1);
} else {
printf("ftkey: %d\n", key);
}

int msg_id = msgget(key, IPC_CREAT | 0666);
if (msg_id < 0) {
printf("failed to get message queue: %s\n", strerror(errno));
exit(1);
} else {
printf("msg_id: %d\n", msg_id);
}

#define count 6
msg msg_array[count];
char * word[count] = {"hello", "world", "happy", "codes", "great", "minds"};
for (int i = 0; i < count; i++) {
msg_array[i].mtype = (i >> 1) + 1;
memcpy(&(msg_array[i].mdata), word[i], 6 * sizeof(char));

// 以 msg 数据格式为基本的传送单元
if (msgsnd(msg_id, &msg_array[i], sizeof(msg), 0) < 0) {
printf("failed to write msg: %s\n", strerror(errno));
exit(1);
}
}
}

// msg_read.c
#include <sys/msg.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <stdio.h>
#include <stdlib.h>
#include <zconf.h>
#include <cerrno>
#include <cstring>

struct msg {
long mtype;
char mdata[6];
};

int main() {
key_t key;
// 使用 / 路径和 ‘A’ 生成 id
if ((key = ftok("/", 'A')) <= 0) {
printf("failed to get key: %s\n", strerror(errno));
exit(1);
} else {
printf("ftkey: %d\n", key);
}

int msg_id = msgget(key, IPC_CREAT | 0666);
if (msg_id < 0) {
printf("failed to get message queue: %s\n", strerror(errno));
exit(1);
} else {
printf("msg_id: %d\n", msg_id);
}

msg m;
while (true) {
// 同样以 msg 为基本的接受单元,筛选消息类型为 3 的数据包
if (msgrcv(msg_id, &m, sizeof(msg), 3, 0) < 0) {
printf("failed to receive msg: %s\n", strerror(errno));
exit(1);
}
printf("%s\n", m.mdata);
}
}

PIPE、FIFO 和 Message Queue 三种队列提供的共享内存的方式限制了读写数据的顺序。有时这正是我们想要的,比如在实现“生产-消费”模式的时候。但有时候我们想要更大的读写自由,这时候,mmap 和 shm 就派上用场了。它们能创建一个共享内存区域,在区域内可以自由寻址,而不用总是从最前或者最后读写。

4.4 mmap 和 shm

mmap 能够把一个文件映射到内存中,它有这么几个特征:

  • 以文件的数据初始化内存段,文件被修改会立即反应到内存中
  • 被映射的段的大小 segmentation_size 由 fd_offset、file_size 和 page_size。或者说,segmentation_size = min((file_size - fd_offset) * page_size * N),N >= 0。文件为空则段大小为 0,读取超出范围的地址会发生 SIGBUG 错误
  • 不能以“把文件映射到进程的虚拟内存段再选择共享出去与否”这种角度来理解 mmap,而要以“把文件映射到物理内存段共享给所有进程读,然后进程决定如何写这段内存”的角度来理解 mmap。进程在映射时选择的“私有”或者“共享”模式,只是决定进程的写操作是私有还是共享的,而读操作总是默认共享的。所以,选择 MAP_PRIVATE 并不能保证其内存段不会被其它进程修改,除非其它进程选择了 MAP_PIVATE 的映射模式
  • MAP_SHARED 模式下对内存段的修改会被写入到磁盘中,时机是调用 munmap 或者 msync 时

下面来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135

// mmap_create_file.c 创建包含合理数据的文件,否则 mmap 得到空内存段
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

struct data_t {
int id;
char label[4];
};

int main() {
char *file = "/tmp/testmmap";

int fd = open(file, O_RDWR | O_CREAT | O_TRUNC, 0666);
if (fd < 0) {
printf("failed to open file: %s\n", strerror(errno));
exit(1);
}
size_t file_size = sizeof(char) * 4096;

char *buf = (char *) malloc(file_size);
char label[4] = "CRT";
size_t data_cnt = file_size / sizeof(data_t);
data_t *data_buf = (data_t *) buf;
for (int i = 0; i < data_cnt; i++) {
(data_buf + i)->id = i;
memcpy((data_buf + i)->label, label, 4);
}
lseek(fd, 0, SEEK_SET);
write(fd, buf, file_size);

sleep(10);
close(fd);
}

// mmap_write_example.c 以共享写方式映射文件
// 它对内存段的修改会反应到磁盘文件上
// 也会反应到所有以 MAP_SHARED 方式映射的内存段中,
// 以及所有以 MAP_PRIVATE 映射但对应页还没有触发写时复制的内存段中
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

struct data_t {
int id;
char label[4];
};

int main() {
char *file = "/tmp/testmmap";

int fd = open(file, O_RDWR, 0666);
if (fd < 0) {
printf("failed to open file: %s\n", strerror(errno));
exit(1);
}

size_t file_size = sizeof(char) * 4096;
data_t *ptr = (data_t *) mmap(NULL, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (ptr == NULL) {
printf("failed to map file: %s\n", strerror(errno));
exit(1);
}
close(fd);

ptr->id += 1;
char label[4] = "WRT";
memcpy(ptr->label, label, 4);

while (true) {
sleep(1);
}
munmap(ptr, file_size);
}

// mmap_read_example.c 以私有写方式映射文件
// 它对内存段的修改不会反应到磁盘文件上
// 它对任何页的修改对其它以任何方式映射这段内存的进程都不可见
// 但是其它以 MAP_SHARED 方式映射这段内存的进程所做的的修改,只要自己还没有写时复制过,就都可见

#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

struct data_t {
int id;
char label[4];
};

int main() {
char *file = "/tmp/testmmap";

int fd = open(file, O_RDWR | O_CREAT, 0666);
if (fd < 0) {
printf("failed to open file: %s\n", strerror(errno));
exit(1);
}

data_t *data_array = (data_t *) mmap(NULL, sizeof(data_t) * 3, PROT_READ, MAP_PRIVATE, fd, 0);
if (data_array == NULL) {
printf("failed to map file: %s\n", strerror(errno));
exit(1);
} else {
close(fd);
}

printf("first three data:\n");
for (int i = 0; i < 3; i++) {
printf("\tdata_t %d: id=%d, label=%s\n", i, (data_array + i)->id, (data_array + i)->label);
}
printf("=========\n");

printf("first data: \n");
while (true) {
printf("\tdata_t 0: id=%d, label=%s\n", data_array->id, data_array->label);
sleep(1);
}
}

使用具体文件做映射的好处在于可以使用文件初始化内存段,并且可以在任意进程之间以文件路径为 ID 传递共享区域的信息。但是其也有弊端,比如内存段大小受到文件大小的限制,而且对文件和内存段的修改会互相影响。
匿名文件映射与之恰好互补:匿名文件映射的内存段以 0 初始化,不受具体文件大小限制,不会和磁盘文件互相影响。但是,由于没有具体文件路径在多进程之间用作 ID,匿名映射得到的空间只能在 fork 出来的父子和兄弟进程之间共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// mmap_anonymous_example.c 
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

struct data_t {
int id;
int group;
};

int main() {
int data_cnt = 1024;
size_t mem_size = sizeof(data_t) * data_cnt;

data_t *data_private = (data_t *) mmap(NULL, mem_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
data_t *data_shared = (data_t *) mmap(NULL, mem_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);

for (int i = 0; i < data_cnt; i++) {
(data_private + i)->id = i;
(data_private + i)->group = i % 10;

(data_shared + i)->id = i;
(data_shared + i)->group = i % 10;
}

int pid = fork();
if (pid < 0) {
printf("failed to create child process: %s\n", strerror(errno));
exit(1);
} else if (pid == 0) {
data_private->id = 99;
data_private->group = 99;
data_shared->id = 66;
data_shared->group = 66;
printf("first of data_private in child become: id=%d, group=%d\n", data_private->id, data_private->group);
printf("first of data_shared in child become: id=%d, group=%d\n", data_shared->id, data_shared->group);
} else if (pid > 0) {
sleep(3);
printf("first of data_private in parent is: id=%d, group=%d\n", data_private->id, data_private->group);
printf("first of data_shared in parent is: id=%d, group=%d\n", data_shared->id, data_shared->group);
}
}

// output:
// first of data_private in child become: id=99, group=99
// first of data_shared in child become: id=66, group=66
// first of data_private in parent is: id=0, group=0
// first of data_shared in parent is: id=66, group=66

匿名映射不再基于文件,fd 使用 -1,offset 为 0,MAP_SHAREDMAP_PRIVATE 的机制仍然是一样的。
也正是由于它不再使用文件作为 ID,所以只有在共享了指针的父子和兄弟进程之间能够使用。
shm 集合了磁盘文件映射的部分优点和匿名文件映射的所有优点:完全基于内存,但是又有文件路径可作为 ID 使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// shm_write_example.c
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

struct data_t {
int id;
int group;
};


int main() {
char *shm_file = "/dev/shm/A";
char project_id = 'A';
key_t key = ftok(shm_file, project_id);
if (key < 0) {
printf("failed to get key: %s\n", strerror(errno));
exit(1);
}

int data_cnt = 1024;
size_t mem_size = sizeof(data_t) * data_cnt;
int shm_id = shmget(key, mem_size, IPC_CREAT | 0666);
if (shm_id <= 0) {
printf("failed to get shm id: %s\n", strerror(errno));
exit(1);
}

data_t *data_ptr;
data_ptr = (data_t *) shmat(shm_id, NULL, 0);
if (data_ptr == NULL) {
printf("failed to create shared memory: %s\n", strerror(errno));
exit(1);
}

for (int i = 0; i < data_cnt; i++) {
(data_ptr + i)->id = i;
(data_ptr + i)->group = i % 10;
}

}

// shm_read_example.c
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

struct data_t {
int id;
int group;
};


int main() {
char *shm_file = "/dev/shm/A";
char project_id = 'A';
key_t key = ftok(shm_file, project_id);
if (key < 0) {
printf("failed to get key: %s\n", strerror(errno));
exit(1);
}

int data_cnt = 1024;
size_t mem_size = sizeof(data_t) * data_cnt;
int shm_id = shmget(key, mem_size, IPC_CREAT | 0666);
if (shm_id <= 0) {
printf("failed to get shm id: %s\n", strerror(errno));
exit(1);
}

data_t *data_ptr;
data_ptr = (data_t *) shmat(shm_id, NULL, 0);
if (data_ptr == NULL) {
printf("failed to create shared memory: %s\n", strerror(errno));
exit(1);
}

for (int i = 0; i < data_cnt; i++) {
printf("data_t %d: id=%d, group=%d\n", i, (data_ptr + i)->id, (data_ptr + i)->group);
}

}

// output:
// data_t 0: id=0, group=0
// data_t 1: id=1, group=1
// ...
// data_t 1022: id=1022, group=2
// data_t 1023: id=1023, group=3

可以看到 shm 和 Message Queue 在创建共享内存区的语法上非常接近,都是以 ftok 生成的 key 作为 ID 生成共享内存区域。只不过前者使用的是用户内存空间,允许的容量更大,在范围内也能自由寻址读写。

4.5 信号量

正如我们所看到的,内核空间的信号有事件通知,三个队列都有阻塞功能,而用户空间的 mmap 和 shm 共享内存手段却既没有事件通知也没有阻塞功能。这时候,带有阻塞功能的 sem 信号量就成了救世主。
sem 不是队列,但是可以把它看作一个阻塞队列的计数器:每次读取 sem 时如果它的值大于 0 则减 N,如果等于 0 则阻塞。

// sem_P_example.c
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/sem.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

static sembuf opP;
static sembuf opV;

// 封装读取阻塞操作
void P(int sem_id) {
    while (semop((sem_id), &opP, 1) < 0) {
        if (errno == EINTR) continue;
        printf("P error: %s\n", strerror(errno));
    }
}

// 封装写入操作
// 其实读取和写入都是修改信号量
void V(int sem_id) {
    while (semop((sem_id), &opV, 1) < 0) {
        if (errno == EINTR) continue;
        printf("V error: %s\n", strerror(errno));
    }
}

key_t get_key(char *file, char id) {
    key_t key = ftok(file, id);
    if (key <= 0) {
        printf("failed to get key: %s\n", strerror(errno));
        exit(1);
    }
    return key;
}

void init_PV() {
    opP.sem_num = 0;
    opP.sem_flg = 0;
    opP.sem_op = -1;  // 读取操作,如果信号量的值小于 1 就会阻塞,直到值大于等于 0,然后把它减 1

    opV.sem_num = 0;
    opV.sem_flg = 0;
    opV.sem_op = 1;  // 写入操作,把信号量的值加 1
}

int create_sim(key_t key) {
    int sem_id = semget(key, 1, IPC_CREAT | 0666);
    if (sem_id < 0) {
        printf("failed to create sem: %s\n", strerror(errno));
        exit(1);
    }

    // 初始时信号量的值设置为 0
    if (semctl(sem_id, 0, SETVAL, 0) < 0) {
        printf("failed to init sem: %s\n", strerror(errno));
        exit(1);
    }

    init_PV();

    return sem_id;
}

int main() {
    char *file = "/tmp/testsem";
    char project_id = 'Z';
    key_t key = get_key(file, project_id);
    int sem_id = create_sim(key);

    while (true) {
        printf("acquiring sem...\n");
        P(sem_id);
        printf("acquiring sem success...\n");
    }
}

// sem_V_example.c
int main() {
    char *file = "/tmp/testsem";
    char project_id = 'Z';
    key_t key = get_key(file, project_id);
    int sem_id = create_sim(key);

    printf("releasing sem...\n");
    V(sem_id);
}

sem 虽然提供了堵塞的功能,但是使用方式很受限,不仅只能通过系统调用交互,而且还限定了数据格式。如果想实现队列,需要配合 mmap 或者 shm 使用。
但是不要小看信号量提供的堵塞功能。堵塞不仅仅只是对进程的挂起,它也是实现进程互斥的一种方式(即加锁),其作用非常重要。

4.6 waitpid

多进程并发时,除了在进程运行时需要协作外,当进程退出时也需要协作,比如我们想知道进程是不是遇到了什么异常情况而退出的。
Linux 的设计是这样的:当进程退出后,保留进程的资源(也就是状态),只是向其父进程发送 SIGCHLD 信号。父进程在收集必要信息后再回收子进程的资源。而收集已退出子进程信息并回收其资源的函数就是 waitpid 系统调用。
waitpid 的使用方式是一个经典的话题,在网络上有很多讨论,这里不再去赘述。在这里想要强调的是:收到 SIGCHLD 信号和 waitpid 回收子进程是两个独立的事件。
父进程可以堵塞地调用 waitpid 等待子进程结束并回收其资源,但子进程结束发送 SIGCHLD 后并不意味着父进程马上从 waitpid 被阻塞的地方开始执行,它仍然先执行信号处理程序。
这提供了一个在父进程非阻塞调用 waitpid 的思路:在 SIGCHLD 的处理函数中非阻塞地调用 waitpid。但是需要注意,在 SIGCHLD 处理程序运行时,会阻塞 SIGCHLD 信号。所以不要天真地每收到一个 SIGCHLD 就执行一次 waitpid,而是要每收到一次 SIGCHLD 就尽可能多地调用 waitpid

五、总结

多进程并发既是多任务系统的必然要求,又能发挥多核 CPU 的优势。
fork 是创建新进程的手段,创建出来的进程成为原来进程的子进程,并继承了父进程的内存空间和文件描述符表。可以使用 exec 删除当前的内存空间并加载一个新的程序,不过文件描述符表仍然会保存。
多进程协作的关键在于“多进程怎么共享信息”和“进程怎么处理收到的信息”,也即多进程通信和消息处理。
多进程通信的手段基于共享内存。信号机制是内核内存空间的共享,不过其读写行为完全由内核控制。除了信号,内核内存空间还有三种先进先出队列用于共享内存。PIPE 只适用于亲缘进程,FIFO 和 Message Queue 通过文件路径 ID 在任意进程之间共享内存。mmap、 shm 和 sem 用于共享用户内存空间,mmap 匿名文件映射和 PIPE 一样只适用于亲缘进程之间,而 mmap 实名文件、 shm 和 sem 可以用于任意进程之间。mmap 实名文件获得的内存空间和磁盘会互相影响,而 shm 和 sem 不受此影响。
进程可以阻塞地等待信息再处理消息,也可以使用异步等消息来了再处理。信号处理机制是由内核全流程接管的异步的,内核提供的三种队列都是阻塞的(当然也可以选择不阻塞)。共享用户空间内存的三种机制中,只有信号量是堵塞的,没有任何一种是异步的。
进程结束后其状态被保存下来了,必须要到父进程通过 waitpid 收集状态并回收资源。