之前在《基于异常控制流的进程协作》 一文中我们从操作系统和硬件的角度简单分析了单核 CPU 上多进程的并发和协作的实现机制 。这次我们换个角度,从使用方式 的角度来分析并发编程。
一、为什么要使用多进程并发 我们先想想,为什么需要多进程并发?
多任务机制。一个程序不可能实现所有的任务,所以计算机上必然要运行多个程序。如果选择串行模式的话,一个进程只能在等待另一个进程结束后才能执行,很多功能将无法实现。比如在打开文件浏览器程序浏览文件时,就不能开始执行 word 处理程序
加快任务处理的速度。一个 CPU 在一个时刻内只能执行一个进程的指令,反之亦然,一个进程在一个时刻内只能享有一个 CPU 资源。如果计算机硬件上有多个 CPU ,我们就能利用多个进程来使用多个 CPU 的资源
既然有多进程的需求,那就有多进程协作的需求。进程和进程之间如何协作呢? 什么样算是协作?协作就是一个进程告诉另一个进程发生了什么事,然后另一个进程决定如何反应。 这其实就是两件事:怎么沟通信息和如何处理信息。
二、多进程并发如何沟通信息 如何沟通信息呢?程序无非就是“输入-处理-输出”的流程,所谓接受信息和发送信息,其实就是在接受输入和产生输出,只不过输入的来源是其它进程的输出,输出的去向是其它进程的输入。但是进程又偏偏给每个程序一个独自享有整个计算机资源的假象,每个进程是感知不到其它进程的,于是对每个进程而言,“其它”进程的输入和输出就成了无稽之谈。 不过,我们其实还是有机会的。想想看,输入和输出都是保存在内存中的,进程不一定要明确地把信息发送到其它某一个进程中去,只需要多个进程能够共享读写同样的内存区域 就行了。 怎么样创建共享的内存区域呢?
2.1 共享内核内存空间 内核空间中最上面的两个页大小的空间是每个进程私有的空间,暂且称为 2P 。虽然 2P 是私有的,但是当其它进程处于内核模式下时,是有读写任意地址的权限的,其中自然包括 2P。而恰好在 2P 空间的最下面保存着进程的 thread_info::task_struct *
,task_struct
又保存着进程的信号位向量。当进程在内核模式下读写这个区域时,就能实现向进程发送信号 或者接受信号的功能。 在内核空间的最下面保存着内核代码和数据,暂且把这段区域称为 KN,KN 对所有进程来说都是共享读的。把除去 2P 和 KN 后剩下的区域称为 PP,操作系统提供了 PIPE 、FIFO 和 Message Queue 三种 first-in-first-out 的队列在 PP 中创建共享内存段。PIPE 只是一个内存 Buffer,而 FIFO 被设计为一个虚拟内存文件,可以像文件一样打开和读写(不过并没有真正读写磁盘)。Message Queue 和 FIFO 都使用到了文件路径,不过 FIFO 是真的创建了一个虚拟文件,而 Message Queue 只是使用已存在文件的路径作为 ID。和 PIPE 与 FIFO 不同,Message Queue 中传递的数据不再是以字节(byte)为单位,而是以特定类型的数据包(byte package, message)为单位。
2.2 共享用户内存空间 在用户空间中可以使用 mmap 、shm 和 sem 创建共享内存段。 mmap 可以把磁盘文件映射到内存区域中,如果两个进程映射了同一个文件,它们就共享了同一个内存段。mmap 也可以映射一个匿名文件(fd 为 0),不过此时没有了文件路径作为 ID。 shm 和 sem 都使用文件路径作为 ID,但是 shm 创建的内存空间没有阻塞功能,如果想要阻塞功能,就需要 sem。
2.3 共享“共享内存在哪”的信息 怎么样告知其它进程一段内存区域是共享的呢?task_struct
中的信号由操作系统内核在内核模式中自动读和写,所以不用担心。 FIFO、Message Queue、mmap 磁盘文件、shm、sem 都使用到了具体的文件路径/ID,所以多进程中可以通过相同的文件名来获取共享内存区,也可以通过 fork
和 exec
继承的变量和文件描述符表来获取共享内存区域。 通过 PIPE 和 mmap 匿名文件创建的共享内存区域,只有通过这些函数调用的返回值(指针或者文件描述符)才能获取到,而这些返回值保存在进程的虚拟内存空间中,只有通过 fork
复制父进程的虚拟内存空间才能获取到。
三、多进程并发如何处理信息 3.1 如何发送和接受消息 程序可以用 kill
系统调用向指定进程发送信号,但是并没有接受信号的系统调用,因为发送信号和接受信号是同时发生的(此处接受信号并不等于处理信号);PIPE 和 FIFO 队列使用 read
和 write
系统调用来接受和发送信息,Message Queue 使用 msgsnd
和 msgrcv
系统调用来发送和接受消息;mmap 和 shm 直接使用指针地址读写。
3.2 如何处理消息 发送消息总是即时的,但是接受新消息却需要程序主动去检查或者被通知才能知道。在接受到新消息前,程序是异步 地执行其它指令,还是阻塞 着直到新消息到来呢? 信号处理机制总是异步的,除非通过 waitpid
显示地阻塞当前进程并等待子进程的 SIGCHLD
信号。进程从内核模式切换到用户模式之际,会自动检查接受到地信号并进行处理。而这样地时机,一般发生在进程刚刚被调入 CPU 开始执行时,或着进程刚刚完成一次系统调用并返回时。程序可以用 signal
来为信号注册优先的处理函数,如果没有注册,则调用默认的信号处理程序。 三种队列总是阻塞的,当队列满时会阻塞写,当队列空时会阻塞读。 使用 mmap 或者 shm 的话,就没有操作系统提供的异步或者阻塞机制,只能由程序自己来实现。
四、多进程协作 API 4.1 fork 和 exec 多进程协作的前提是创建进程。fork
用来从当前进程分叉出一个子进程,然后返回两次,在父进程中返回子进程的 pid,在子进程中返回 0。子进程“继承”了父进程的页表和内存段,但是子进程和父进程所有“非共享写”内存段都被设置为私有的写时复制的,其中的页都被设置为只读的。当两个进程中任何一个尝试进行非共享写内存段中的页面写操作时,会触发页错误从而进行写时复制。除了页表和 vma 外,子进程还复制了父进程的文件描述符表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 #include <sys/types.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> int g = 5 ;int main () { int l = 3 ; pid_t pid = fork(); if (pid == 0 ) { l += 1 ; printf ("child process: pid=%d, g=%d, l=%d\n" , pid, g, l); } else { printf ("parent process: pid=%d, g=%d, l=%d\n" , pid, g, l); } }
接下来看一个 exec
的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <sys/types.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <cstring> int g = 5 ;int main () { int l = 3 ; pid_t pid = fork(); if (pid == 0 ) { l += 1 ; printf ("child process: pid=%d, g=%d, l=%d\n" , pid, g, l); char * argv_list[] = {"/bin/ls" ,"/" ,NULL }; int ok = execv("/bin/ls" , argv_list); if (ok == -1 ) { printf ("failed to replace process: %s\n" , strerror(errno)); } printf ("child process: pid=%d, g=%d, l=%d\n" , pid, g, l); } else { printf ("parent process: pid=%d, g=%d, l=%d\n" , pid, g, l); sleep(3 ); } }
4.2 信号机制 程序使用 signal
来注册信号处理函数,使用 kill
来向其它进程发送信号。 使用 signal
注册的信号处理函数,在被调用时会阻塞其它信号。在有些平台上,信号处理函数执行完成时需要程序来重新注册,在此之前接受到的信号可能会采取默认处理程序。如果想要更大的控制力度,可以使用 sigaction
函数,但是它用起来也更困难。kill
函数的第一个参数为 pid,pid 如何发挥作用,在 linux 文档里面有详细说明,这里就不赘述了。 需要注意的是,信号处理有可能打断挂起的系统调用,系统调用被打断后可能不再继续执行,而是返回 ERINT 错误。信号处理函数自己也有可能被其它信号处理函数打断,但是被打断后会继续执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <stdlib.h> #include <stdio.h> #include <signal.h> #include <sys/types.h> #include <zconf.h> int s1 = 0 ;int s2 = 0 ;int pid = 0 ;void sig_user1_handler (int ) { kill(pid, SIGABRT); s1 = 1 ; exit (s1 + s2); } void sig_user2_handler (int ) { s2 = 2 ; } int main () { signal(SIGINT, sig_user1_handler); signal(SIGABRT, sig_user2_handler); pid = getpid(); while (true ) {} }
有时候进程可能已经没有其它地任务需要处理,但是还不能退出,因为需要它来处理信号。常规的思路时在进程最后使用一个无限的循环来保持进程,在循环中可以使用 sleep
或者 pause
这样的函数避免过高的 CPU 占用。 在这种情况下,sigsuspend
函数比 sleep
或者 pause
更合适。因为它不是以固定时间点长来决定复苏时间的,而是在收到信号的时候决定复苏,并直接进入信号处理程序。显然,sigsuspend
的精度更高。
4.3 三种队列 接下来分别说明 PIPE、FIFO 和 Message Queue 三种先进先出队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 #include <sys/types.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> int main () { int r_w[2 ]; if (pipe(r_w) == -1 ) { printf ("failed to open pipe: %s\n" , strerror(errno)); exit (1 ); } int read_fd = r_w[0 ]; int write_fd = r_w[1 ]; pid_t pid = fork(); if (pid == 0 ) { char buf[10 ]; if (read(read_fd, buf, 10 ) == -1 ) { printf ("failed to read pipe: %s\n" , strerror(errno)); exit (1 ); } printf ("%s\n" , buf); } else { char buf[10 ] = "12345abcd" ; if (write(write_fd, buf, 10 ) == -1 ) { printf ("failed to write pipe: %s\n" , strerror(errno)); exit (1 ); } sleep(1 ); } }
接下来看看 FIFO
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 #include <sys/types.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <sys/stat.h> #include <fcntl.h> int main () { int pid = getpid(); int fd = mkfifo("/tmp/testfifo" , S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); if (fd < 0 && errno == EEXIST) { fd = open("/tmp/testfifo" , O_RDWR); } if (fd < 0 ) { printf ("failed to create fifo: %s\n" , strerror(errno)); exit (1 ); } char msg[6 ]; sprintf (msg, "%s" , "hello" ); while (true ) { write(fd, msg, 6 ); sleep(1 ); } } #include <sys/types.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <sys/stat.h> #include <fcntl.h> int main () { int fd = open("/tmp/testfifo" , O_RDONLY); if (fd < 0 ) { printf ("failed to create fifo: %s\n" , strerror(errno)); exit (1 ); } char msg[6 ]; while (true ) { read(fd, msg, 6 ); printf ("%s\n" , msg); } }
PIPE 和 FIFO 的区别在于前者只能通过文件描述符共享,后者可以通过文件路径共享,因此前者只适用于亲缘进程之间,后者适用于任何进程之间。 Message Queue 和 FIFO 相似,又稍有不同:
二者都以文件路径作为共享的标志,但是 Message Queue 在文件路径之上再加了一个 project_id,也就是说使用 Message Queue 时,必须文件路径和 project_id 都相同才能获得同一个共享队列
FIFO 以文件的形式存在,但是 Message Queue 仍然只是内存 Buffer。Message Queue 只是以文件路径的 node 信息来生成队列的独特 ID,它自己不是文件,不创建文件,不读写文件,只是要求文件路径已经存在
Message Queue 里面的消息不再是以 byte 为单位来读写了,而是以一个打包的数据格式来读写。数据格式由程序自己定义,但是数据包中的前 8 个字节会被解释为 long 的消息类型,而且发送和接受时的数据格式大小必须一致
使用 Message Queue 接受消息时虽然也是先进先出的,可以按消息的类型来筛选
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 #include <sys/msg.h> #include <sys/types.h> #include <sys/ipc.h> #include <stdio.h> #include <stdlib.h> #include <zconf.h> #include <cerrno> #include <cstring> struct msg { long mtype; char mdata[6 ]; }; int main () { key_t key; if ((key = ftok("/" , 'A' )) <= 0 ) { printf ("failed to get key: %s\n" , strerror(errno)); exit (1 ); } else { printf ("ftkey: %d\n" , key); } int msg_id = msgget(key, IPC_CREAT | 0666 ); if (msg_id < 0 ) { printf ("failed to get message queue: %s\n" , strerror(errno)); exit (1 ); } else { printf ("msg_id: %d\n" , msg_id); } #define count 6 msg msg_array[count]; char * word[count] = {"hello" , "world" , "happy" , "codes" , "great" , "minds" }; for (int i = 0 ; i < count; i++) { msg_array[i].mtype = (i >> 1 ) + 1 ; memcpy (&(msg_array[i].mdata), word[i], 6 * sizeof (char )); if (msgsnd(msg_id, &msg_array[i], sizeof (msg), 0 ) < 0 ) { printf ("failed to write msg: %s\n" , strerror(errno)); exit (1 ); } } } #include <sys/msg.h> #include <sys/types.h> #include <sys/ipc.h> #include <stdio.h> #include <stdlib.h> #include <zconf.h> #include <cerrno> #include <cstring> struct msg { long mtype; char mdata[6 ]; }; int main () { key_t key; if ((key = ftok("/" , 'A' )) <= 0 ) { printf ("failed to get key: %s\n" , strerror(errno)); exit (1 ); } else { printf ("ftkey: %d\n" , key); } int msg_id = msgget(key, IPC_CREAT | 0666 ); if (msg_id < 0 ) { printf ("failed to get message queue: %s\n" , strerror(errno)); exit (1 ); } else { printf ("msg_id: %d\n" , msg_id); } msg m; while (true ) { if (msgrcv(msg_id, &m, sizeof (msg), 3 , 0 ) < 0 ) { printf ("failed to receive msg: %s\n" , strerror(errno)); exit (1 ); } printf ("%s\n" , m.mdata); } }
PIPE、FIFO 和 Message Queue 三种队列提供的共享内存的方式限制了读写数据的顺序。有时这正是我们想要的,比如在实现“生产-消费”模式的时候。但有时候我们想要更大的读写自由,这时候,mmap 和 shm 就派上用场了。它们能创建一个共享内存区域,在区域内可以自由寻址,而不用总是从最前或者最后读写。
4.4 mmap 和 shm mmap 能够把一个文件映射到内存中,它有这么几个特征:
以文件的数据初始化内存段,文件被修改会立即反应到内存中
被映射的段的大小 segmentation_size 由 fd_offset、file_size 和 page_size。或者说,segmentation_size = min((file_size - fd_offset) * page_size * N),N >= 0。文件为空则段大小为 0,读取超出范围的地址会发生 SIGBUG 错误
不能以“把文件映射到进程的虚拟内存段再选择共享出去与否”这种角度来理解 mmap,而要以“把文件映射到物理内存段共享给所有进程读,然后进程决定如何写这段内存”的角度来理解 mmap。进程在映射时选择的“私有”或者“共享”模式,只是决定进程的写操作是私有还是共享的,而读操作总是默认共享的。所以,选择 MAP_PRIVATE 并不能保证其内存段不会被其它进程修改,除非其它进程选择了 MAP_PIVATE 的映射模式
MAP_SHARED 模式下对内存段的修改会被写入到磁盘中,时机是调用 munmap 或者 msync 时
下面来看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 #include <sys/mman.h> #include <sys/types.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> struct data_t { int id; char label[4 ]; }; int main () { char *file = "/tmp/testmmap" ; int fd = open(file, O_RDWR | O_CREAT | O_TRUNC, 0666 ); if (fd < 0 ) { printf ("failed to open file: %s\n" , strerror(errno)); exit (1 ); } size_t file_size = sizeof (char ) * 4096 ; char *buf = (char *) malloc (file_size); char label[4 ] = "CRT" ; size_t data_cnt = file_size / sizeof (data_t ); data_t *data_buf = (data_t *) buf; for (int i = 0 ; i < data_cnt; i++) { (data_buf + i)->id = i; memcpy ((data_buf + i)->label, label, 4 ); } lseek(fd, 0 , SEEK_SET); write(fd, buf, file_size); sleep(10 ); close(fd); } #include <sys/mman.h> #include <sys/types.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> struct data_t { int id; char label[4 ]; }; int main () { char *file = "/tmp/testmmap" ; int fd = open(file, O_RDWR, 0666 ); if (fd < 0 ) { printf ("failed to open file: %s\n" , strerror(errno)); exit (1 ); } size_t file_size = sizeof (char ) * 4096 ; data_t *ptr = (data_t *) mmap(NULL , file_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 ); if (ptr == NULL ) { printf ("failed to map file: %s\n" , strerror(errno)); exit (1 ); } close(fd); ptr->id += 1 ; char label[4 ] = "WRT" ; memcpy (ptr->label, label, 4 ); while (true ) { sleep(1 ); } munmap(ptr, file_size); } #include <sys/mman.h> #include <sys/types.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> struct data_t { int id; char label[4 ]; }; int main () { char *file = "/tmp/testmmap" ; int fd = open(file, O_RDWR | O_CREAT, 0666 ); if (fd < 0 ) { printf ("failed to open file: %s\n" , strerror(errno)); exit (1 ); } data_t *data_array = (data_t *) mmap(NULL , sizeof (data_t ) * 3 , PROT_READ, MAP_PRIVATE, fd, 0 ); if (data_array == NULL ) { printf ("failed to map file: %s\n" , strerror(errno)); exit (1 ); } else { close(fd); } printf ("first three data:\n" ); for (int i = 0 ; i < 3 ; i++) { printf ("\tdata_t %d: id=%d, label=%s\n" , i, (data_array + i)->id, (data_array + i)->label); } printf ("=========\n" ); printf ("first data: \n" ); while (true ) { printf ("\tdata_t 0: id=%d, label=%s\n" , data_array->id, data_array->label); sleep(1 ); } }
使用具体文件做映射的好处在于可以使用文件初始化内存段,并且可以在任意进程之间以文件路径为 ID 传递共享区域的信息。但是其也有弊端,比如内存段大小受到文件大小的限制,而且对文件和内存段的修改会互相影响。 匿名文件映射与之恰好互补:匿名文件映射的内存段以 0 初始化,不受具体文件大小限制,不会和磁盘文件互相影响。但是,由于没有具体文件路径在多进程之间用作 ID,匿名映射得到的空间只能在 fork
出来的父子和兄弟进程之间共享。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #include <sys/mman.h> #include <sys/types.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> struct data_t { int id; int group; }; int main () { int data_cnt = 1024 ; size_t mem_size = sizeof (data_t ) * data_cnt; data_t *data_private = (data_t *) mmap(NULL , mem_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1 , 0 ); data_t *data_shared = (data_t *) mmap(NULL , mem_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1 , 0 ); for (int i = 0 ; i < data_cnt; i++) { (data_private + i)->id = i; (data_private + i)->group = i % 10 ; (data_shared + i)->id = i; (data_shared + i)->group = i % 10 ; } int pid = fork(); if (pid < 0 ) { printf ("failed to create child process: %s\n" , strerror(errno)); exit (1 ); } else if (pid == 0 ) { data_private->id = 99 ; data_private->group = 99 ; data_shared->id = 66 ; data_shared->group = 66 ; printf ("first of data_private in child become: id=%d, group=%d\n" , data_private->id, data_private->group); printf ("first of data_shared in child become: id=%d, group=%d\n" , data_shared->id, data_shared->group); } else if (pid > 0 ) { sleep(3 ); printf ("first of data_private in parent is: id=%d, group=%d\n" , data_private->id, data_private->group); printf ("first of data_shared in parent is: id=%d, group=%d\n" , data_shared->id, data_shared->group); } }
匿名映射不再基于文件,fd 使用 -1,offset 为 0,MAP_SHARED
和 MAP_PRIVATE
的机制仍然是一样的。 也正是由于它不再使用文件作为 ID,所以只有在共享了指针的父子和兄弟进程之间能够使用。shm
集合了磁盘文件映射的部分优点和匿名文件映射的所有优点:完全基于内存,但是又有文件路径可作为 ID 使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 #include <sys/ipc.h> #include <sys/shm.h> #include <sys/types.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> struct data_t { int id; int group; }; int main () { char *shm_file = "/dev/shm/A" ; char project_id = 'A' ; key_t key = ftok(shm_file, project_id); if (key < 0 ) { printf ("failed to get key: %s\n" , strerror(errno)); exit (1 ); } int data_cnt = 1024 ; size_t mem_size = sizeof (data_t ) * data_cnt; int shm_id = shmget(key, mem_size, IPC_CREAT | 0666 ); if (shm_id <= 0 ) { printf ("failed to get shm id: %s\n" , strerror(errno)); exit (1 ); } data_t *data_ptr; data_ptr = (data_t *) shmat(shm_id, NULL , 0 ); if (data_ptr == NULL ) { printf ("failed to create shared memory: %s\n" , strerror(errno)); exit (1 ); } for (int i = 0 ; i < data_cnt; i++) { (data_ptr + i)->id = i; (data_ptr + i)->group = i % 10 ; } } #include <sys/ipc.h> #include <sys/shm.h> #include <sys/types.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> struct data_t { int id; int group; }; int main () { char *shm_file = "/dev/shm/A" ; char project_id = 'A' ; key_t key = ftok(shm_file, project_id); if (key < 0 ) { printf ("failed to get key: %s\n" , strerror(errno)); exit (1 ); } int data_cnt = 1024 ; size_t mem_size = sizeof (data_t ) * data_cnt; int shm_id = shmget(key, mem_size, IPC_CREAT | 0666 ); if (shm_id <= 0 ) { printf ("failed to get shm id: %s\n" , strerror(errno)); exit (1 ); } data_t *data_ptr; data_ptr = (data_t *) shmat(shm_id, NULL , 0 ); if (data_ptr == NULL ) { printf ("failed to create shared memory: %s\n" , strerror(errno)); exit (1 ); } for (int i = 0 ; i < data_cnt; i++) { printf ("data_t %d: id=%d, group=%d\n" , i, (data_ptr + i)->id, (data_ptr + i)->group); } }
可以看到 shm 和 Message Queue 在创建共享内存区的语法上非常接近,都是以 ftok 生成的 key 作为 ID 生成共享内存区域。只不过前者使用的是用户内存空间,允许的容量更大,在范围内也能自由寻址读写。
4.5 信号量 正如我们所看到的,内核空间的信号有事件通知,三个队列都有阻塞功能,而用户空间的 mmap 和 shm 共享内存手段却既没有事件通知也没有阻塞功能。这时候,带有阻塞功能的 sem 信号量就成了救世主。 sem 不是队列,但是可以把它看作一个阻塞队列的计数器:每次读取 sem 时如果它的值大于 0 则减 N,如果等于 0 则阻塞。
// sem_P_example.c
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/sem.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
static sembuf opP;
static sembuf opV;
// 封装读取阻塞操作
void P(int sem_id) {
while (semop((sem_id), &opP, 1) < 0) {
if (errno == EINTR) continue;
printf("P error: %s\n", strerror(errno));
}
}
// 封装写入操作
// 其实读取和写入都是修改信号量
void V(int sem_id) {
while (semop((sem_id), &opV, 1) < 0) {
if (errno == EINTR) continue;
printf("V error: %s\n", strerror(errno));
}
}
key_t get_key(char *file, char id) {
key_t key = ftok(file, id);
if (key <= 0) {
printf("failed to get key: %s\n", strerror(errno));
exit(1);
}
return key;
}
void init_PV() {
opP.sem_num = 0;
opP.sem_flg = 0;
opP.sem_op = -1; // 读取操作,如果信号量的值小于 1 就会阻塞,直到值大于等于 0,然后把它减 1
opV.sem_num = 0;
opV.sem_flg = 0;
opV.sem_op = 1; // 写入操作,把信号量的值加 1
}
int create_sim(key_t key) {
int sem_id = semget(key, 1, IPC_CREAT | 0666);
if (sem_id < 0) {
printf("failed to create sem: %s\n", strerror(errno));
exit(1);
}
// 初始时信号量的值设置为 0
if (semctl(sem_id, 0, SETVAL, 0) < 0) {
printf("failed to init sem: %s\n", strerror(errno));
exit(1);
}
init_PV();
return sem_id;
}
int main() {
char *file = "/tmp/testsem";
char project_id = 'Z';
key_t key = get_key(file, project_id);
int sem_id = create_sim(key);
while (true) {
printf("acquiring sem...\n");
P(sem_id);
printf("acquiring sem success...\n");
}
}
// sem_V_example.c
int main() {
char *file = "/tmp/testsem";
char project_id = 'Z';
key_t key = get_key(file, project_id);
int sem_id = create_sim(key);
printf("releasing sem...\n");
V(sem_id);
}
sem 虽然提供了堵塞的功能,但是使用方式很受限,不仅只能通过系统调用交互,而且还限定了数据格式。如果想实现队列,需要配合 mmap 或者 shm 使用。 但是不要小看信号量提供的堵塞功能。堵塞不仅仅只是对进程的挂起,它也是实现进程互斥的一种方式(即加锁),其作用非常重要。
4.6 waitpid 多进程并发时,除了在进程运行时需要协作外,当进程退出时也需要协作,比如我们想知道进程是不是遇到了什么异常情况而退出的。 Linux 的设计是这样的:当进程退出后,保留进程的资源(也就是状态),只是向其父进程发送 SIGCHLD
信号。父进程在收集必要信息后再回收子进程的资源。而收集已退出子进程信息并回收其资源的函数就是 waitpid
系统调用。waitpid
的使用方式是一个经典的话题,在网络上有很多讨论,这里不再去赘述。在这里想要强调的是:收到 SIGCHLD
信号和 waitpid
回收子进程是两个独立 的事件。 父进程可以堵塞地调用 waitpid
等待子进程结束并回收其资源,但子进程结束发送 SIGCHLD
后并不意味着父进程马上从 waitpid
被阻塞的地方开始执行,它仍然先执行信号处理程序。 这提供了一个在父进程非阻塞调用 waitpid
的思路:在 SIGCHLD
的处理函数中非阻塞地调用 waitpid
。但是需要注意,在 SIGCHLD
处理程序运行时,会阻塞 SIGCHLD
信号。所以不要天真地每收到一个 SIGCHLD
就执行一次 waitpid
,而是要每收到一次 SIGCHLD
就尽可能多地调用 waitpid
。
五、总结 多进程并发既是多任务系统的必然要求,又能发挥多核 CPU 的优势。fork
是创建新进程的手段,创建出来的进程成为原来进程的子进程,并继承了父进程的内存空间和文件描述符表。可以使用 exec
删除当前的内存空间并加载一个新的程序,不过文件描述符表仍然会保存。 多进程协作的关键在于“多进程怎么共享信息”和“进程怎么处理收到的信息”,也即多进程通信和消息处理。 多进程通信的手段基于共享内存。信号机制是内核内存空间的共享,不过其读写行为完全由内核控制。除了信号,内核内存空间还有三种先进先出队列用于共享内存。PIPE 只适用于亲缘进程,FIFO 和 Message Queue 通过文件路径 ID 在任意进程之间共享内存。mmap、 shm 和 sem 用于共享用户内存空间,mmap 匿名文件映射和 PIPE 一样只适用于亲缘进程之间,而 mmap 实名文件、 shm 和 sem 可以用于任意进程之间。mmap 实名文件获得的内存空间和磁盘会互相影响,而 shm 和 sem 不受此影响。 进程可以阻塞地等待信息再处理消息,也可以使用异步等消息来了再处理。信号处理机制是由内核全流程接管的异步的,内核提供的三种队列都是阻塞的(当然也可以选择不阻塞)。共享用户空间内存的三种机制中,只有信号量是堵塞的,没有任何一种是异步的。 进程结束后其状态被保存下来了,必须要到父进程通过 waitpid
收集状态并回收资源。