linux C -- 消息队列
- 前言
- 一、System V(IPC)消息队列
- 接口调用主要涉及到 msgget、msgsnd、msgrcv 和 msgctl 四个接口:
- 1、创建消息队列 msgget
- 2、发送消息到队列
- 3、从队列接收信息
- 4、控制消息队列 msgctl
- 5、删除消息队列
- 二、代码编写
- 1、发送部分的代码
- 2、代码完成后编译一下
- 3、启动运行,传递第一个消息队列
- 4、可以使用ipcs -q查看消息队列的信息
- 5、接收部分的代码
- 6、同样的编译然后运行
- 三、POSIX(mq)消息队列
前言
进程间通信是linux下经常用到的通信方式,可用于多个进程之间的通信,也可在一个进程内通信。
消息队列就是一堆消息的有序集合(队列),并缓存于内核中。如此一来,多个进程就可通过访问内核来实现多个进程之间的通信。目前存在的消息队列有POSIX(mq)与System V(IPC)标准接口。
一、System V(IPC)消息队列
接口说明
消息缓冲区的结构定义一般如下:
struct msg_form {
long mtype;//类型
char mtext[];//消息内容,可以是定长数组或者变长数组
};
头文件:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
接口调用主要涉及到 msgget、msgsnd、msgrcv 和 msgctl 四个接口:
1、创建消息队列 msgget
/**
函数:创建或获取消息队列
入参:key:key可取由ftok创建的key值或1个整数;
msgflag主要有两个值IPC_CREAT 和IPC_EXC,指的是需要新创建消息队列ID,低位可用来确定消息队列的访问权限。例如(IPC_CREAT | 0777)
返回:成功返回消息队列id,失败返回-1
**/
int msgget(key_t key, int msgflg)
2、发送消息到队列
/**
函数:发送消息到队列
入参:msqid:消息队列的ID(由msgget生成的消息队列标识符)
msgp:msgq为指向的用户定义缓冲区,一般定义为结构体,首个成员为long型,表示消息的类型,另外一个一般为char mtext[];
msgsz:发送消息正文的字节数,注意这里的是指正文内容mtext里面数据的字节数
msgflg:标志位,IPC_NOWAIT消息没有发送完成函数也会立即返回,0 直到发送完成函数才返回
返回:成功返回0,错误返回-1
**/
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg)
3、从队列接收信息
/**
函数:发送消息到队列
入参:msqid:消息队列的ID(由msgget生成的消息队列标识符)
msgp:读取到的数据要放到哪里,这里填我们自定义的结构体对象
msgsz:要读取的正文字节数,是指正文内容mtext里面数据的字节数
msgtyp:要读取的消息类型,mtype(默认用0)
msgflg:标志位,IPC_NOWAIT非堵塞等待,0 堵塞等待
返回:成功返回读取到的字节数,失败就返回-1,错误码被设置
**/
int msgrcv(int msgid, void * msgq, size_t msgsz, long int msgtyp, int msgflg)
进阶说明
/**
msgtyp:
=0 直接返回第一条消息(FIFO原理)
>0 若msgflg不包含MSG_EXCEPT,则返回消息队列中第一个类型为msgtpy,
若包含,则返回消息队列第一个类型为msgtpy
<0 返回消息队列中类型≤msgtpy绝对值的消息,若多个,取最小
msgflg:
不包含MSG_NOERROR,消息又太长,则不对该消息做任何处理直接返回-1
包含MSG_NOERROR,则该消息被截取msgsz字节返回,剩余部分被丢弃
*/
4、控制消息队列 msgctl
/**
函数:发送消息到队列
入参:msqid:消息队列的ID(由msgget生成的消息队列标识符)
cmd:IPC_STAT 将msg相关的内核信息存储到buf指向的msqid_ds 结构体中。
IPC_SET 该命令用来设置消息队列的属性,要设置的属性存储在buf指向的msqid结构中;
IPC_RMID 删除msqid标识的消息队列
buf:在标志位中设置了IPC_STAT,指针所指向的变量里面就能拿到相关的内核信息,
如果不关心内核信息可以设置为nullptr
返回:成功返回0,错误返回-1
**/
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msqid_ds 这个结构体是比较复杂的,需要进阶的朋友可以自行了解
struct msqid_ds {
struct ipc_perm msg_perm{ // 消息队列的权限信息
key_t key; // 消息队列的键值
uid_t uid; // 拥有者的用户ID
gid_t gid; // 拥有者的组ID
uid_t cuid; // 创建者的用户ID
gid_t cgid; // 创建者的组ID
mode_t mode; // 权限
unsigned short seq; // 序列号
}
time_t msg_stime; // 上次发送消息的时间
time_t msg_rtime; // 上次接收消息的时间
time_t msg_ctime; // 上次变更时间
unsigned long msg_cbytes; // 消息队列中的字节数
msgqnum_t msg_qnum; // 消息队列中的消息数量
msglen_t msg_qbytes; // 消息队列的最大字节数
pid_t msg_lspid; // 最后发送消息的进程ID
pid_t msg_lrpid; // 最后接收消息的进程ID
};
5、删除消息队列
IPC_RMID
立即删除消息队列,此时所有阻塞在对该消息队列的,msgsnd和msgrcv函数调用,
都会立即返回失败,errno为EIDRM
// 删除消息队列
if (msgctl(msqid, IPC_RMID, NULL) == -1) {
perror("Error deleting message queue");
return 1;
}
二、代码编写
1、发送部分的代码
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/msg.h>
#include <string.h>
int main (void) {
printf("创建消息队列...\n");
key_t key = ftok(".",100);
if(key == -1){
perror("ftok");
return -1;
}
int msgid = msgget(key,IPC_CREAT | 0777);
if(msgid == -1){
perror("msggeet");
return -1;
}
printf("从消息队列(0x%08x/%d)中发送消息...\n",key,msgid);
for(;;){
struct {
long mtype;
char mtext[1024];
}msgbuf;
printf(">");
gets(msgbuf.mtext);
msgbuf.mtype = 1; //传输的消息类型是>0的整数
ssize_t msgsz = msgsnd(msgid,(void*)&msgbuf,(strlen(msgbuf.mtext)+1)*sizeof(msgbuf.mtext[0]),IPC_NOWAIT);
if (msgsz < 0) {
if(errno == EIDRM) {
printf("消息队列(0x%08x/%d)已被销毁!!!\n",key,msgid);
break;
}
else {
perror("msgsnd");
return -1;
}
}
else
printf("%04ld<%s\n",msgsz,msgbuf.mtext);//消息回显
}
printf("结束了!\n");
return 0;
}
2、代码完成后编译一下
使用ls查看一下
3、启动运行,传递第一个消息队列
4、可以使用ipcs -q查看消息队列的信息
● 我们发现msgid可以为0
● 消息队列中已经存储了一条消息了,但是还没人消费
5、接收部分的代码
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/msg.h>
int main (void) {
printf("获取消息队列...\n");
key_t key = ftok(".",100);
if(key == -1){
perror("ftok");
return -1;
}
int msgid = msgget(key,0);
if(msgid == -1){
perror("msggeet");
return -1;
}
printf("从消息队列(0x%08x/%d)中接收消息...\n",key,msgid);
for(;;){
struct {
long mtype;
char mtext[1024];
}msgbuf;
ssize_t msgsz = msgrcv(msgid,(void*)&msgbuf,sizeof(msgbuf.mtext)-sizeof(msgbuf.mtext[0]),0,MSG_NOERROR);
if (msgsz < 0) {
if(errno == EIDRM) {
printf("消息队列(0x%08x/%d)已被销毁!!!\n",key,msgid);
break;
}
else {
perror("msgrcv");
return -1;
}
}
else
printf("%04ld<%s\n",msgsz,msgbuf.mtext);
}
printf("结束了!\n");
return 0;
}
6、同样的编译然后运行
最终的效果就是这样了
2、代码封装
示例代码解析
● msg_que.h
/********************************
程序功能:进程通信-消息队列(system V)
author:zyh
date:2021.5.21
*********************************/
#ifndef _MSG_QUE_H_
#define _MSG_QUE_H_
#ifdef __cplusplus
extern "C" {
#endif
/*
注意:进程通信消息队列的总缓冲区大小有限制,传输的消息量比较多的时候,就不适合使用
在Linux中,/proc/sys/kernel/msgmax和/proc/sys/kernel/msgmnb文件记载了消息缓冲队列的大小
其中,kernel.msgmax表示消息大小的最大值,kernel.msgmnb表示消息缓冲区的最大值。
*/
//#define MSG_FILE "/tmp/msgque" //进程通信共有的文件
#define MAX_QUEUE_TEXT_LENGTH 1024*2 //发送消息最大长度
struct msg_form {
long mtype;//类型
char mtext[MAX_QUEUE_TEXT_LENGTH];//消息
};
typedef enum { //类型(类型必须大于0)
NONE = 0,
TYPE_1_MSG,
TYPE_2_MSG,
TYPE_3_MSG,
TYPE_4_MSG,
TYPE_5_MSG,
}MSG_QUE_TYPE;
/**
函数功能:获取文件路径key值
入参: path:文件路径,例如:/tmp/msgque
出参:
返回:成功返回key值,失败返回-1
**/
int get_path_key(const char *path);
/**
函数功能:创建消息队列,返回消息队列的标识
入参: key:消息队列名
出参:无
返回:成功返回消息队列的标识,失败返回-1
**/
int creatMsgQue(int key);
/**
函数功能:消息队列的删除
入参:msqid:消息队列的标识
出参:无
返回:成功返回0,失败返回-1
**/
int deleteMsgQue(int msgid);
/**
函数功能:发送消息
入参:
msgid:由msgget函数返回的消息队列的标识码,即将消息添加到那个消息队列中。
type:消息类型
msg:发送的消息
length:消息长度
出参:无
返回:成功返回0,失败返回-1
**/
int sndMsgQue(int msgid, int type, char *msg, int length);
/**
函数功能:从消息队列中堵塞接收消息
入参:msgid:为读的对象,即从哪个消息队列获取的消息
type:消息类型,0的话,函数将不做类型检查而自动返回队列中的最旧的消息。
rcvBuf:接收消息缓冲区
bufSize:接收消息缓冲区大小
出参:rcvBuf:接收消息的缓冲区
返回:成功返回接收到的实际字节数,失败返回-1
解除阻塞的条件有以下三个:
1、消息队列中有了满足条件的消息。
2、消息队列被删除。
3、用msgrcv()的进程被信号中断。
**/
int blockRcvMsgQue(int msgid, int type, char rcvBuf[], int bufSize);
/**
函数功能:从消息队列中不堵塞接收消息
入参:
msgid:为读的对象,即从哪个消息队列获取的消息
type:消息类型,0的话,函数将不做类型检查而自动返回队列中的最旧的消息。
rcvBuf:接收消息缓冲区
bufSize:接收消息缓冲区大小
出参:rcvBuf:接收消息的缓冲区
返回:成功返回接收到的实际字节数,-1:消息队列为空
**/
int noBlockRcvMsgQue(int msgid, int type, char *rcvBuf, int bufSize);
#ifdef __cplusplus
}
#endif
#endif
● msg_que.c
/********************************
程序功能:进程通信-消息队列(system V)
author:zyh
date:2021.5.21
*********************************/
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <unistd.h>
#include <string.h>
#include "msg_que.h"
/**
函数功能:获取文件路径key值
入参: path:文件路径,例如:/tmp/msgque
出参:
返回:成功返回key值,失败返回-1
**/
int get_path_key(const char *path)
{
key_t key;
//获取key值,key代表要创建的消息队列的标识, 即为ipc键
key = ftok(path, 'z');
if (0 > key) { //第一个参数代表路径,第二个参数代表权限只使用8bits
perror("ftok error");
return -1;
}
return key;
}
/**
函数功能:创建消息队列,返回消息队列的标识
入参: key:消息队列名
出参:无
返回:成功返回消息队列的标识,失败返回-1
**/
int creatMsgQue(int key)
{
int msqid;
/*
key_t key;
//获取key值,key代表要创建的消息队列的标识, 即为ipc键
if((key = ftok(MSG_FILE,'z')) < 0) { //第一个参数代表路径,第二个参数代表权限只使用8bits
perror("ftok error");
return -1;
}
*/
printf("Message Queue - key is: %d\n", key);
//创建消息队列,第一个参数 key:为由ftok创建的key值,第二个参数 msgflg:用来确定消息队列的访问权限。
if (0 > (msqid = msgget(key, IPC_CREAT | 0777))) { //第二个参数 用来确定消息队列的访问权限。返回消息队列的标识 如果这个消息队列已经存在,则返回ID
perror("msgget error");
return -1;
}
return msqid;
}
/**
函数功能:消息队列的删除
入参:msqid:消息队列的标识
出参:无
返回:成功返回0,失败返回-1
**/
int deleteMsgQue(int msgid)
{
int ret = 0;
ret = msgctl(msgid, IPC_RMID, NULL);
if (0 > ret) {
perror("msgctl");
return -1;
}
return 0;
}
/**
函数功能:发送消息
入参:
msgid:由msgget函数返回的消息队列的标识码,即将消息添加到那个消息队列中。
type:消息类型
msgBuff:发送的消息
length:消息长度
出参:无
返回:成功返回0,失败返回-1
**/
int sndMsgQue(int msgid, int type, char *msg, int length)
{
int ret = 0;
struct msg_form msgbuf;
memset(&msgbuf, 0, sizeof(msgbuf));
if (length >= (int)sizeof(msgbuf.mtext)) {
fprintf(stderr, "msg length is too long\n");
return -1;
}
msgbuf.mtype = type;
memcpy(msgbuf.mtext, msg, length);
//ret = msgsnd(msgid, (void*)&msgbuf, sizeof(msgbuf) - sizeof(long), IPC_NOWAIT);//当队列满时不阻塞,立刻返回
ret = msgsnd(msgid, (void*)&msgbuf, length, IPC_NOWAIT);//当队列满时不阻塞,立刻返回
if (0 > ret) {
perror("msgsnd");
return -1;
}
return 0;
}
/**
函数功能:从消息队列中堵塞接收消息
入参:msgid:为读的对象,即从哪个消息队列获取的消息
type:消息类型,0的话,函数将不做类型检查而自动返回队列中的最旧的消息。
rcvBuf:接收消息缓冲区
bufSize:接收消息缓冲区大小
出参:rcvBuff:接收消息的缓冲区
返回:成功返回接收到的实际字节数,失败返回-1
解除阻塞的条件有以下三个:
1、消息队列中有了满足条件的消息。
2、消息队列被删除。
3、用msgrcv()的进程被信号中断。
**/
int blockRcvMsgQue(int msgid, int type, char rcvBuf[], int bufSize)
{
int ret = 0;
struct msg_form msgbuf;
memset(&msgbuf, 0, sizeof(msgbuf));
ret = msgrcv(msgid, (void*)&msgbuf, sizeof(msgbuf) - sizeof(long), type, 0);//没有指定IPC_NOWAIT,进程阻塞,挂起执行直至有了指定类型的消息
if (0 > ret) {
perror("msgrcv");
return -1;
}
//printf("rcv size ret=%d\n", ret);
//memcpy(rcvBuf, msgbuf.mtext, ret);
memcpy(rcvBuf, msgbuf.mtext, bufSize);
return ret;
}
/**
函数功能:从消息队列中不堵塞接收消息
入参:
msgid:为读的对象,即从哪个消息队列获取的消息
type:消息类型,0的话,函数将不做类型检查而自动返回队列中的最旧的消息。
rcvBuff:接收消息缓冲区
rcvSize:接收消息缓冲区大小
出参:rcvBuff:接收消息的缓冲区
返回:成功返回接收到的实际字节数,-1:消息队列为空
**/
int noBlockRcvMsgQue(int msgid, int type, char *rcvBuf, int bufSize)
{
int ret = 0;
struct msg_form msgbuf;
memset(&msgbuf, 0, sizeof(msgbuf));
ret = msgrcv(msgid, (void*)&msgbuf, sizeof(msgbuf) - sizeof(long), type, IPC_NOWAIT);//不阻塞,如果消息队列为空,则返回一个ENOMSG
if (0 > ret) {
//perror("msgrcv");
return -1;
}
//printf("rcv size ret=%d\n", ret);
memcpy(rcvBuf, msgbuf.mtext, bufSize);
return ret;
}
● send_demo.c-发送端(发送两种类型的数据)
/********************************
程序功能:进程通信
author:zyh
date:2021.5.21
*********************************/
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include "msg_que.h"
#define MSG_QUE_KEY_ID 45464 //消息队列标识
int main ()
{
int msgid = -1;
char msgBuff[256] = {0};
int i = 0;
msgid = creatMsgQue(MSG_QUE_KEY_ID);
if (0 > msgid) {
printf("start msgQue failed\n");
return -1;
}
printf("start msgQue success (msgid=%d)\n", msgid);
while (1) {
sprintf(msgBuff, "TYPE_1_MSG:%d", i);
sndMsgQue(msgid, 1, msgBuff, strlen(msgBuff));
sprintf(msgBuff, "TYPE_2_MSG:%d", i);
sndMsgQue(msgid, 2, msgBuff, strlen(msgBuff));
i++;
sleep(2);
}
deleteMsgQue(msgid);
return 0;
}
● recv1_demo.c 接收端接收类型1的数据
/********************************
程序功能:进程通信
author:zyh
date:2021.5.21
*********************************/
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include "msg_que.h"
#define MSG_QUE_KEY_ID 45464 //消息队列标识
int main ()
{
int msgid = -1;
char rcvBuff[256] = {0};
msgid = creatMsgQue(MSG_QUE_KEY_ID);
if (0 > msgid) {
printf("start msgQue failed\n");
return -1;
}
printf("start msgQue success (msgid=%d)\n", msgid);
while (1) {
memset(rcvBuff, 0, sizeof(rcvBuff));
blockRcvMsgQue(msgid, 1, rcvBuff, sizeof(rcvBuff));
printf("TYPE_1_MSG:%s\n", rcvBuff);
}
deleteMsgQue(msgid);
return 0;
}
三、POSIX(mq)消息队列
直接使用系统接口:
编译指令gcc mq_test.c -o test -Wall -lrt
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
typedef struct {
int a;
int b;
char str[32];
} my_data_t;
/*
- 我们首先打开一个进程通信--POSIX (mq)消息队列,如果它不存在,我们会创建一个。这是通过指定O_CREAT标志实现的。
- 我们设置消息队列的属性,包括队列的最大消息数,每个消息的最大长度,以及队列当前的消息数。
- 我们在mq_receive()调用中传递一个指向存储消息的缓冲区的指针。如果当前没有消息,此调用将会阻塞,直到接收到一个消息。为了进行非阻塞接收,可以在打开消息队列时设置mq_flags字段为O_NONBLOCK。
- 用mq_close关闭消息队列和mq_unlink删除消息队列是有序执行,因为直到所有使用此队列的进程都关闭后,队列才可以被删除
*/
int main()
{
mqd_t mqd;
struct mq_attr attr;
attr.mq_flags = O_NONBLOCK; /*0或O_NONBLOCK,0堵塞,O_NONBLOCK为非阻塞 */
attr.mq_maxmsg = 128; /* 队列最大消息数 */
attr.mq_msgsize = 256; /* 队列每个消息的最大长度 */
attr.mq_curmsgs = 0; /* 队列当前消息数 */
//1. 创建或打开消息队列
mqd = mq_open("/mq_name", O_CREAT | O_RDWR, 0644, &attr);//必须以/开头,并且后续不能有其他/,形如/abc
if (0 > mqd) {
perror("mq_open");
return -1;
}
mq_getattr(mqd, &attr);//获取mqd指向的消息队列的属性,存放到attr结构体,成功:0,出错:-1
printf("attr.mq_curmsgs = %ld\n", attr.mq_curmsgs);
/*2. 发送消息
prio:消息的优先级:它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。
posix消息队列在调用mq_receive时,总是返回队列中最高优先级的最早消息。
如果消息不需要设定优先级,那么可以在mq_send时设置msg_prio为0,mq_receive的msg_prio设置为NULL。
*/
char msg[256] = "hello";
int prio = 0;
mq_send(mqd, msg, strlen(msg), prio);//如果队列已满,在堵塞模式下,将阻塞,直到队列未满,成功返回0,失败返回-1
//发送自定义结构体
my_data_t data = {1, 2, "zhou"};
mq_send(mqd, (char *)&data, sizeof(data), prio);
//
mq_getattr(mqd, &attr);//获取mqd指向的消息队列的属性,存放到attr结构体,成功:0,出错:-1
printf("attr.mq_curmsgs = %ld\n", attr.mq_curmsgs);
//3. 接收消息
char buffer[256] = {0};
int rcv_bytes = 0;
/*msg_len参数要大于等于mq_msgsize的,如果小于该值,就会返回EMSGSIZE错误;
如果队列为空,在堵塞模式下,将阻塞直到有消息为止, 返回指定消息队列中最高优先级的最早消息,成功返回读取消息的内容的字节数,出错返回-1
*/
rcv_bytes = mq_receive(mqd, buffer, attr.mq_msgsize, NULL);
printf("rcv_bytes=%d, buffer=%s\n", rcv_bytes, buffer);
my_data_t rcv_data = {0};
rcv_bytes = mq_receive(mqd, (char *)&rcv_data, attr.mq_msgsize, NULL);
printf("rcv_data.a=%d, rcv_data.b=%d, rcv_data.str=%s\n", rcv_data.a, rcv_data.b, rcv_data.str);
//4. 关闭消息队列
mq_close(mqd); /* 关闭消息队列 */
mq_unlink("/mq_name"); /* 删除消息队列 */
}