进程间通信方式—消息队列(System V IPC)
文章目录
- 进程间通信方式---消息队列(System V IPC)
- 消息队列
- 1.消息队列进程间通信原理
- 2.msgget 系统调用
- 3.msgsnd 系统调用
- 4.msgrcv 系统调用
- 5.msgctl 系统调用
- 6.函数使用案例
- 7.实现生产者消费者模型
- 运行结果:
- 8.无血缘关系进程通信
消息队列
1.消息队列进程间通信原理
System V IPC 消息队列是一种进程间通信(IPC)机制,它允许不同进程通过发送和接收消息来进行通信。消息队列就像是一个邮箱系统,进程可以将消息(信件)发送到队列(邮箱)中,其他进程可以从这个队列中接收消息。
消息队列是在两个进程间传递二进制数据块的方式,每个数据块都有一个特定类型,接收方可以根据类型来有选择地接收数据,而不一定像管道和命名管道那样必须以先进先出的方式接收数据。
原理:
多个进程通过共享消息队列的标识符(msqid
)来访问同一个消息队列。发送进程将消息放入消息队列后,消息队列会按照一定的规则(如先进先出)存储这些消息。接收进程可以根据消息类型等条件从消息队列中取出消息。这样,不同进程之间就可以通过消息队列进行数据传输和通信,实现进程间的同步和信息共享。例如,在生产者 - 消费者模型中,生产者进程将生产的数据作为消息发送到消息队列,消费者进程从消息队列中接收消息并进行消费,通过消息类型等机制可以确保消息的正确发送和接收,从而实现生产者和消费者之间的协调工作。
Linux消息队列的API都定义在sys/msg.h
头文件中,包括4个系统调用:msgget
、msgsnd
、msgrcv
、msgctl
。
2.msgget 系统调用
msgget
系统调用创建一个消息队列,或获取一个已有的消息队列:
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
- 参数:
key_t key
:- 这是一个键值,用于标识一个全局唯一的消息队列。可以通过
ftok
函数生成一个唯一的key
值,或者使用IPC_PRIVATE
来创建一个私有消息队列(通常用于具有亲缘关系的进程,如父子进程)。
- 这是一个键值,用于标识一个全局唯一的消息队列。可以通过
int msgflg
:- 用于控制消息队列的创建和访问权限,它由以下几种标志组成:
IPC_CREAT
:如果消息队列不存在,则创建它。如果和IPC_EXCL
一起使用(IPC_CREAT | IPC_EXCL
),则只有在消息队列不存在时才创建,若已存在则msgget
函数返回 - 1 并设置errno
为EEXIST
。权限标志
:如0666
等,用于指定消息队列的访问权限,格式与文件权限相同(用户、组、其他用户的读、写、执行权限)。
- 用于控制消息队列的创建和访问权限,它由以下几种标志组成:
- 返回值:
- 成功时,返回一个非负整数,即消息队列的标识符(
msqid
)。 - 失败时,函数返回 - 1,并设置
errno
变量来指示错误原因,例如EEXIST
(当IPC_CREAT | IPC_EXCL
且队列已存在时)、ENOENT
(当没有IPC_CREAT
且队列不存在时)等。
- 成功时,返回一个非负整数,即消息队列的标识符(
如果它用于创建消息队列的话,与之相关的内核数据结构msqid_ds将被创建并初始化。
struct msqid_ds
{
struct ipc_perm msg_perm; // 消息队列的操作权限
time_t msg_stime; // 最后一次调用msgsnd的时间
time_t msg_rtime; // 最后一次调用msgrcv的时间
time_t msg_ctime; // 最后一次被修改的时间
unsigned long msg_cbytes; // 消息队列中已有的字节数
msgqnum_t msg_qnum; // 消息队列中已有的消息数
msglen_t msg_qbytes; // 消息队列允许的最大字节数
pid_t msg_lspid; // 最后执行msgsnd的进程的PID
pid_t msg_lrpid; // 最后执行msgrcv的进程的PID
};
3.msgsnd 系统调用
msgsnd
系统调用将一条消息添加到消息队列中:
#include <sys/msg.h>
int msgsnd(int msqid, const void* msg_ptr, size_t msg_sz, int msgflg);
struct msgbuf{
long mtype;//消息类型
char mtext[512];//消息数据
};
参数:
int msqid
:
-
消息队列的标识符,由
msgget
函数返回。 -
const void* msg_ptr
:-
指向要发送消息的指针。消息的结构必须以一个长整型成员变量开始,这个长整型变量用于存放消息类型,后面可以跟随消息的实际数据。
msg_ptr
参数指向一个准备发送的消息,消息被定义为如下类型:struct msgbuf{ long mtype; /* 消息类型 */ char mtext[512]; /* 消息数据 */ };
-
-
size_t msg_sz
:- 这是消息数据部分的大小,不包括消息类型的长整型变量所占的字节数。
-
int msgflg
:- 控制消息发送的行为,和semget的flag一样的,常用的标志有:
0
:表示阻塞发送,如果消息队列已满,则发送进程会阻塞,直到有空间可以发送消息。IPC_NOWAIT
:表示非阻塞发送,如果消息队列已满,则msgsnd
函数立即返回 - 1,并设置errno
为EAGAIN
。
- 控制消息发送的行为,和semget的flag一样的,常用的标志有:
返回值:
- 成功时,返回
0
。 - 失败时,返回 - 1,并设置
errno
来指示错误原因,如EAGAIN
(非阻塞发送时队列已满)、EINVAL
(参数无效)、EIDRM
(消息队列已被删除)等。
处于阻塞状态的msgsnd调用可能被如下两种异常情况所中断:
-
消息队列被移除。此时msgsnd调用将立即返回并设置errno为EIDRM。
-
程序接收到信号。此时msgsnd调用将立即返回并设置errno为EINTR。
msgsnd成功时将修改内核数据结构msqid_ds的部分字段,如下所示:
-
将msg_qnum加1。
-
将msg_lspid设置为调用进程的PID。
-
将msg_stime设置为当前的时间。
4.msgrcv 系统调用
msgrcv
系统调用从消息队列中获取消息:
#include <sys/msg.h>
int msgrcv(int msqid, void* msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
参数:
int msqid
:- 消息队列的标识符。
void* msg_ptr
:- 一个指向接收消息缓冲区的指针。与
msgsnd
类似,缓冲区的结构应以一个长整型开始用于存放接收到的消息类型,后面是存放消息数据的空间。
- 一个指向接收消息缓冲区的指针。与
size_t msg_sz
:- 这是接收消息缓冲区中数据部分的大小。
long int msgtype
:- 指定要接收的消息类型,可以有以下几种取值:
0
:接收(读取)消息队列中的第一条消息,不考虑消息类型。> 0
:接收第一个消息类型等于msgtype
的消息。(除非指定了标志MSG_EXCEPT,见后文)< 0
:接收(读取)消息队列中第一个类型值比msgtype的绝对值小的消息。
- 指定要接收的消息类型,可以有以下几种取值:
int msgflg
:- 控制消息接收的行为,常用标志有:
0
:表示阻塞接收,如果消息队列中没有符合条件的消息,则接收进程会阻塞,直到有符合条件的消息到达。IPC_NOWAIT
:表示非阻塞接收,如果消息队列中没有符合条件的消息,则msgrcv
函数立即返回 - 1,并设置errno
为ENOMSG
。MSG_EXCEPT
。如果msgtype大于0,则接收消息队列中第一个非msgtype类型的消息。MSG_NOERROR
。如果消息数据部分的长度超过了msg_sz,就将它截断。
- 控制消息接收的行为,常用标志有:
返回值:
- 成功时,返回接收到的消息数据部分的字节数。
- 失败时,返回 - 1,并设置
errno
来指示错误原因,如ENOMSG
(非阻塞接收时没有符合条件的消息)、EINVAL
(参数无效)、EIDRM
(消息队列已被删除)等。
处于阻塞状态的msgrcv调用还可能被如下两种异常情况所中断:
-
消息队列被移除。此时msgrcv调用将立即返回并设置errno为EIDRM。
-
程序接收到信号。此时msgrcv调用将立即返回并设置errno为EINTR。
msgrcv成功时将修改内核数据结构msqid_ds的部分字段,如下所示:
-
将msg_qnum减1。
-
将msg_lrpid设置为调用进程的PID。
-
将msg_rtime设置为当前的时间。
5.msgctl 系统调用
msgctl
系统调用,用于对消息队列进行控制操作(控制消息队列某些属性),如获取消息队列的状态信息、设置消息队列的属性、删除消息队列等。
#incldue <sys/msg.h>
int msgctl(int msqid, int command, struct msqid_ds* buf);
参数:
int msqid
:- 消息队列的标识符。
int command
:- 这是一个控制命令,用于指定对消息队列进行何种操作(见下表),常见的命令有:
IPC_STAT
:获取消息队列的状态信息,并将其存储到buf
所指向的struct msqid_ds
结构体中。这个结构体包含了消息队列的各种属性,如操作权限、当前消息数量等。IPC_SET
:根据buf
所指向的struct msqid_ds
结构体中的信息来设置消息队列的属性。例如,可以修改消息队列的操作权限等。IPC_RMID
:删除由msqid
标识的消息队列。这是一个非常重要且具有危险性的操作,一旦执行,消息队列及其所包含的消息将被永久删除。
- 这是一个控制命令,用于指定对消息队列进行何种操作(见下表),常见的命令有:
struct msqid_ds* buf
:- 这是一个指向struct msqid_ds结构体的指针,其作用取决于command参数的值:
- 当
command
为IPC_STAT
时,buf
用于存储获取到的消息队列的状态信息。 - 当
command
为IPC_SET
时,buf
指向的结构体中的信息将被用于设置消息队列的属性。 - 如果
command
不涉及IPC_STAT
或IPC_SET
操作(如IPC_RMID
),buf
通常可以设置为NULL
。
- 当
- 这是一个指向struct msqid_ds结构体的指针,其作用取决于command参数的值:
返回值:
- 成功时,取决于command(见下表)。
- 失败时,返回 - 1,并设置
errno
来指示错误原因,如EINVAL
(msqid
无效,或者command
参数无效,或者buf
指向的结构体无效(在IPC_STAT
或IPC_SET
操作时))、EPERM
(调用进程没有足够的权限来执行请求的操作)等。
6.函数使用案例
以下是使用 System V IPC 消息队列相关函数(msgget
、msgsnd
、msgrcv
和msgctl
)的一个简单 C 语言案例:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};
int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, 0666 | IPC_CREAT);
if (msqid == -1) {
perror("msgget");
return 1;
}
// 发送消息
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello, World!");
if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}
// 接收消息
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv");
return 1;
}
printf("Received message: %s\n", recv_msg.mtext);
// 获取消息队列状态 buf存储消息队列的信息
struct msqid_ds buf;
if (msgctl(msqid, IPC_STAT, &buf) == -1) {
perror("msgctl - IPC_STAT");
return 1;
}
printf("Messages in queue: %ld\n", buf.msg_qnum);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl - IPC_RMID");
return 1;
}
return 0;
}
代码解释:
消息结构体定义
- 定义了
struct msgbuf
结构体,它包含一个长整型mtype
(用于表示消息类型)和一个字符数组mtext
(用于存储消息内容)。
创建消息队列(msgget
)
- 使用
IPC_PRIVATE
作为key
来创建一个新的私有消息队列,权限设置为0666
(用户、组和其他用户都有读写权限)。如果msgget
调用成功,返回消息队列标识符msqid
;否则,打印错误信息并返回。
发送消息(msgsnd
)
- 初始化
send_msg
结构体,设置mtype
为1
,并将消息内容设置为"Hello, World!"
。 - 使用
msgsnd
函数将消息发送到消息队列中。msgsnd
函数的参数包括消息队列标识符msqid
、消息结构体指针&send_msg
、消息数据部分大小sizeof(send_msg.mtext)
和标志0
(表示阻塞发送,如果队列满则等待)。如果发送失败,打印错误信息并返回。
接收消息(msgrcv
)
- 初始化
recv_msg
结构体。 - 使用
msgrcv
函数从消息队列中接收消息。msgrcv
函数的参数包括消息队列标识符msqid
、接收消息结构体指针&recv_msg
、接收消息数据部分大小sizeof(recv_msg.mtext)
、要接收的消息类型1
和标志0
(表示阻塞接收,如果没有符合条件的消息则等待)。如果接收失败,打印错误信息并返回。 - 接收到消息后,打印出消息内容。
获取消息队列状态(msgctl
)
- 定义
struct msqid_ds
类型的变量buf
。 - 使用
msgctl
函数的IPC_STAT
命令获取消息队列的状态信息,并将其存储在buf
中。如果获取状态失败,打印错误信息并返回。 - 打印出消息队列中的消息数量(
buf.msg_qnum
)。
删除消息队列(msgctl
)
- 使用
msgctl
函数的IPC_RMID
命令删除消息队列。如果删除失败,打印错误信息并返回。
7.实现生产者消费者模型
**消息队列并不能实现互斥。**以下是使用上述消息队列函数实现的生产者 - 消费者模型的 C 代码:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#define MAX_MSG_SIZE 100
// 消息结构体
struct msgbuf {
long mtype;
char mtext[MAX_MSG_SIZE];
};
// 生产者函数
void producer(int msqid) {
struct msgbuf msg;
msg.mtype = 1; // 消息类型设为1
srand(time(NULL));
while (1) {
int num = rand() % 100;
sprintf(msg.mtext, "%d", num);
if (msgsnd(msqid, &msg, sizeof(msg.mtext), 0) == -1) {
perror("msgsnd");
exit(1);
}
printf("Producer sent: %s\n", msg.mtext);
sleep(1);
}
}
// 消费者函数
void consumer(int msqid) {
struct msgbuf msg;
while (1) {
if (msgrcv(msqid, &msg, sizeof(msg.mtext), 1, 0) == -1) {
perror("msgrcv");
exit(1);
}
printf("Consumer received: %s\n", msg.mtext);
sleep(2);
}
}
int main() {
// 创建消息队列
int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
pid_t pid = fork();
if (pid < 0) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程为消费者
consumer(msqid);
} else {
// 父进程为生产者
producer(msqid);
}
// 等待子进程结束(这里只是简单等待,实际可能需要更完善的机制)
wait(NULL);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl");
return 1;
}
return 0;
}
代码解释:
消息结构体定义:
- 定义了
struct msgbuf
结构体,包含一个长整型的mtype
(消息类型)和一个字符数组mtext
(用于存放消息数据)。
生产者函数:
- 生成一个随机数,将其转换为字符串后放入消息结构体的
mtext
中,然后使用msgsnd
将消息发送到消息队列中,发送的消息类型为1
,发送操作是阻塞的(msgflg
为0
)。
消费者函数:
- 使用
msgrcv
从消息队列中接收消息类型为1
的消息,接收操作是阻塞的(msgflg
为0
),接收到消息后打印出消息内容。
主函数:
- 使用
msgget
创建一个私有消息队列。 - 通过
fork
创建子进程,子进程作为消费者,父进程作为生产者。 - 最后等待子进程结束,并使用
msgctl
删除消息队列。
并不能够实现互斥,只是能够通信。
运行结果:
8.无血缘关系进程通信
ftok
函数的定义和功能- 函数原型:
key_t ftok(const char *pathname, int proj_id);
- 功能:
ftok
函数用于生成一个唯一的key
(键值),这个key
通常用于 System V IPC(进程间通信)机制中,如创建共享内存、消息队列和信号量集等。它将一个文件路径名(pathname
)和一个项目标识符(proj_id
)组合起来,生成一个适合作为 System V IPC 资源标识符的key
值。
- 函数原型:
- 参数解释
const char *pathname
:- 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(
."
)或者一个程序相关的配置文件路径等。ftok
函数会使用文件的inode
(索引节点)信息作为生成key
的一部分。 - 注意,如果文件被删除然后重新创建,即使文件名相同,
inode
可能会改变,这会导致ftok
生成不同的key
值。
- 这是一个指向文件路径名的指针。这个文件路径必须是一个已经存在的文件的有效路径,通常使用当前目录(
int proj_id
:- 这是一个
0 - 255
之间的整数,作为项目标识符。它和文件路径的inode
信息一起组合生成key
。不同的项目可以使用不同的proj_id
来区分,这样即使基于同一个文件路径,不同的项目也能生成不同的key
值用于各自的 IPC 资源。例如,一个程序中有两个不同的模块需要使用消息队列进行通信,它们可以使用相同的文件路径但不同的proj_id
来生成不同的key
,以创建两个独立的消息队列。
- 这是一个
- 返回值
- 成功时,
ftok
函数返回一个key_t
类型的非负整数,这个整数可以作为shmget
、msgget
、semget
等 System V IPC 函数的key
参数来创建或获取对应的 IPC 资源。 - 失败时,返回-1,并且会设置errno来指示错误原因。常见的错误原因包括:
EACCESS
:没有权限访问pathname
指定的文件。ENOENT
:pathname
指定的文件不存在。
- 成功时,
发送端
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>
// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};
int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}
// 创建消息队列
int msqid = msgget(key, IPC_CREAT | 0666);
if (msqid == -1) {
perror("msgget");
return 1;
}
// 准备发送消息的结构体
struct msgbuf send_msg;
send_msg.mtype = 1;
strcpy(send_msg.mtext, "Hello from sender!");
if (msgsnd(msqid, &send_msg, sizeof(send_msg.mtext), 0) == -1) {
perror("msgsnd");
return 1;
}
printf("Sender sent message...\n");
sleep(10);
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("msgctl for delete");
return 1;
}
return 0;
}
接收端
#include <stdio.h>
#include <stdlib.combined
#include <sys/types.combined
#include <sys/ipc.combined
#include <sys/msg.combined
#include <string.combined
// 消息结构体
struct msgbuf {
long mtype;
char mtext[100];
};
int main() {
// 通过ftok生成key
key_t key = ftok(".", 'c');
if (key == -1) {
perror("ftok");
return 1;
}
// 获取消息队列
int msqid = msgget(key, 0666);
if (msqid == -1) {
perror("msgget")
return 1;
}
// 准备接收消息的结构体
struct msgbuf recv_msg;
if (msgrcv(msqid, &recv_msg, sizeof(recv_msg.mtext), 1, 0) == -1) {
perror("msgrcv")
return 1;
}
printf("Receiver received message: %s\n", recv_msg.mtext);
return 0;
}
运行结果: