System V消息队列 | POSIX 消息队列 | |
主 要 函 数 | #include <sys/msg.h> int msgget(key_t key, int oflag) int msgsnd(int msqid, const void * ptr, size_t length, int flag) ssize_t msgrcv (int msqid, void *ptr, size_t length, long type, int flag) int msgctl(int msqid, int cmd, struct msqid_ds *buf) | #include <mqueue.h> mqd_t mq_open(const char *name, int oflag,mode_t mode, struct mq_attr attr ); int mq_close(mqd_t mqdes);// int mq_unlink(const char *name); int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *attr,struct mq_attr *oattr); int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio); ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio); int mq_notify(mqd_t mqdes, const struct sigevent *notification); |
基 本 用 法 | 1.创建或获取消息队列:使用msgget()函数来创建或获取一个消息队列。该函数接受一个键(key)和一个标志(flag)作为参数。如果键的值为IPC_PRIVATE或当前没有消息队列与给定键相关联,将会创建一个新的消息队列。标志位可以用来指定权限组合。 2. 往消息队列中放入消息:使用msgsnd()函数来往一个消息队列中放入一个消息。该函数接受四个参数,分别为消息队列标识符、指向消息的指针、消息的大小以及标志位。成功放入消息后,该函数返回0,否则返回-1并设置errno来表示错误原因。 3. 从消息队列中读取消息:使用msgrcv()函数来从一个消息队列中读取一个消息。该函数接受五个参数,分别为消息队列标识符、指向消息的指针、消息的最大大小、消息的类型以及标志位。成功读取消息后,该函数返回读取到的消息的大小,否则返回-1并设置errno来表示错误原 因。 4. 控制消息队列:使用msgctl()函数来对一个消息队列进行控制操 作,如删除、设置权限等。该函 数接受三个参数,分别为消息队 列标识符、操作命令以及一个可 选的参数。 | 1.创建或打开消息队列:使用mqd_t mq_open(const char *name,int oflag,mode_t mode,struct mq_attr*attr);函数来创建或打开一个消息队列。该函数接受队列名称、打开标志以及可选的权限和属性作为参数。如果队列不存在且指定了创建标志,将会创建一个新的消息队列。成功创建或打开后,函数返回一个 7. 发送消息:使用int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned int msg_prio);函数来发送一个消息到指定的消息队列。该函数接受消息队列描述符、指向消息的指针以及消息的大小作为参数。发送消息时,可以指定消息的优 |
Notes:
1. mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) 中oflag和mode 参数说明
•参数oflag:同int open(const char *pathname, int flags, mode_t mode);函数的的oflag类似有
O_RDONLY、O_RDWR, O_WRONLY,除此之外还有 O_CREAT、O_EXCL(如果 O_CREAT 指定,但name 不存在,就返回错误),O_NONBLOCK(以非阻塞方式打开消息队列,在正常情况下mq_receive和mq_send 函数会阻塞的地方,使用该标志打开的消息队列会返回 EAGAIN 错误)。
•参数mode:同int open(const char *pathname, int flags, mode_t mode);函数的mode参数,用于指定权限位, 比如0644权限
2. 关于 struct mq_attr属性结构体:
struct mq_attr
{
long mq_flags;//阻塞标志, 0(阻塞)或O_NONBLOCK
long mq_maxmsg;//最大消息数
long mq_msgsize;//每个消息最大大小
long mq_curmsgs;//当前消息数
};
3. mq_notiy函数的使用注意事项:
a. 注册撤销:当通知被发送给它的注册进程时,其注册会被撤销。这意味着,如果希望继续接收通知,进程必须再次调用 mq_notify 以重新注册。
b. 空队列与数据到来:消息机制触发条件是,在消息队列为空的情况下有数据到来才会触发。当消息队列不为空时,即使有新的数据到来也不会触发通知。
c. 阻塞与通知:只有当没有任何线程阻塞在该队列的 mq_receive 调用的前提下,通知才会发出。这意味着,如果有线程正在等待接收消息,通知可能不会被发送。
4. struct sigevent和sigval_t sigev_val 的定义如下:
union sigval { /* Data passed with notification */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* Notification method */
int sigev_signo; /* Notification signal */
union sigval sigev_value;
/* Data passed with notification */
void (*sigev_notify_function) (union sigval);
/* Function used for thread
notification (SIGEV_THREAD) */
void *sigev_notify_attributes;
/* Attributes for notification thread
(SIGEV_THREAD) */
pid_t sigev_notify_thread_id;
/* ID of thread to signal
(SIGEV_THREAD_ID); Linux-specific */};
a. sigev_notify取值:
•SIGEV_NONE:这个值表示不需要任何通知。当sigev_notify被设置为这个值时,即使事件发生了,也不会有任何通知发送到进程。
•SIGEV_SIGNAL:事件发生时,将sigev_signo指定的信号发送给指定的进程;
•SIGEV_THREAD:事件发生时,内核会(在此进程内)以sigev_notify_attributes为线程属性创建一个线程,并让其执行sigev_notify_function,并以sigev_value为其参数
b. sigev_signo: 在sigev_notify=SIGEV_SIGNAL时使用,指定信号类别, 例如SIGUSR1、SIGUSR2 等
c.sigev_value: sigev_notify=SIGEV_SIGEV_THREAD时使用,作为sigev_notify_function的参数, 当发送信号时,这个值会传递给信号处理函数。
示例代码1:使用阻塞方式读写
#include <stdio.h>
#include <mqueue.h>
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#if 0
mqd_t mq_open(const char *name, int oflag,mode_t mode, struct mq_attr attr );
int mq_close(mqd_t mqdes);
int mq_send(mqd_t mqdes, const char *ptr, size_tlen, unsigned int prio);
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_tlen, unsigned int *prio);
int mq_unlink(const char *name);
struct mq_attr
{
long mq_flags;//阻塞标志, 0(阻塞)或O_NONBLOCK
long mq_maxmsg;//最大消息数
long mq_msgsize;//每个消息最大大小
long mq_curmsgs;//当前消息数
};
#endif
#define QUEQUE_NAME "/test_queue"
#define MESSAGE "mqueque,test!"
void *sender_thread(void *arg)
{
//发送消息
mqd_t mqd = *(mqd_t *)arg;
int send_size = -1;
char message[] = MESSAGE;
printf("sender thread message=%s,mqd=%d\n",message,mqd);
send_size = mq_send(mqd,message,strlen(message)+1,0);
if(-1 == send_size){
if(errno == EAGAIN){
printf("message queque is full!\n");
}else{
perror("mq_send");
}
}
return NULL;
}
void *receiver_thread(void *arg)
{
mqd_t mqd = *(mqd_t *)arg;
ssize_t receiver_size = -1;
//接收消息
char buffer[256];
printf("Receive thread start\n");
receiver_size = mq_receive(mqd,buffer,sizeof(buffer),NULL);
printf("receive thread message=%smqd=%d,receiver_size=%ld\n",buffer,mqd,receiver_size);
return NULL;
}
int main(int argc,char *argv[])
{
pthread_t sender,receiver;
//创建消息队列
mqd_t mqd = -1;
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
attr.mq_curmsgs = 0;
mqd = mq_open(QUEQUE_NAME,O_CREAT | O_RDWR,0666,&attr);
if(mqd == (mqd_t)-1){
perror("mq_open");
return -1;
}
if(pthread_create(&sender,NULL,sender_thread,(void*)&mqd) != 0){
perror("pthread_create sender");
return -1;
}
if(pthread_create(&receiver,NULL,receiver_thread,(void*)&mqd) != 0){
perror("pthread_create receiver");
return -1;
}
pthread_join(sender,NULL);
pthread_join(receiver,NULL);
mq_close(mqd);
//mq_unlink(QUEQUE_NAME);
return 0;
}
示例2: 使用mq_notify sigev_notify = SIGEV_THREAD异步通知的方式实现
#include <stdio.h>
#include <mqueue.h>
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
// 定义消息队列名称和要发送的消息内容
#define QUEQUE_NAME "/test_queue"
#define MESSAGE "mqueque,test!"
void *sender_thread(void *arg)
{
// 获取消息队列描述符
mqd_t mqd = *(mqd_t *)arg;
int send_size = -1;
char message[] = MESSAGE;
// 打印发送消息的信息
printf("sender thread message=%s, mqd=%d\n", message, mqd);
// 发送消息
send_size = mq_send(mqd, message, strlen(message) + 1, 0);
if (send_size == -1) {
if (errno == EAGAIN) {
printf("message queue is full!\n");
} else {
perror("mq_send");
}
}
return NULL;
}
void notify_thread(union sigval arg)
{
// 获取消息队列描述符
mqd_t mqd = *((mqd_t *)arg.sival_ptr);
char buffer[256];
ssize_t recv_size = -1;
memset(buffer, 0, sizeof(buffer));
printf("notify_thread start, mqd=%d\n", mqd);
// 接收消息
recv_size = mq_receive(mqd, buffer, sizeof(buffer), NULL);
printf("notify_thread receive_size=%ld, buffer=%s\n", recv_size, buffer);
if (recv_size == -1) {
if (errno == EAGAIN) {
printf("message queue is empty!\n");
} else {
perror("mq_receive");
exit(1);
}
}
// 重新注册通知
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_value.sival_ptr = arg.sival_ptr;
sev.sigev_notify_function = notify_thread;
if (mq_notify(mqd, &sev) == -1) {
perror("mq_notify");
exit(1);
}
}
int main(int argc, char *argv[])
{
pthread_t sender;
mqd_t mqd = -1;
// 设置消息队列属性
struct mq_attr attr;
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 256;
attr.mq_curmsgs = 0;
// 创建消息队列
mqd = mq_open(QUEQUE_NAME, O_CREAT | O_RDWR, 0666, &attr);
if (mqd == (mqd_t)-1) {
perror("mq_open");
return -1;
}
// 设置消息队列通知
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_value.sival_ptr = &mqd;
sev.sigev_notify_function = notify_thread;
if (mq_notify(mqd, &sev) == -1) {
perror("mq_notify");
mq_close(mqd);
mq_unlink(QUEQUE_NAME);
return -1;
}
// 创建发送消息的线程
if (pthread_create(&sender, NULL, sender_thread, (void*)&mqd) != 0) {
perror("pthread_create sender");
mq_close(mqd);
mq_unlink(QUEQUE_NAME);
return -1;
}
// 等待发送线程结束
pthread_join(sender, NULL);
// 等待一段时间以接收消息
sleep(5);
// 关闭并删除消息队列
mq_close(mqd);
mq_unlink(QUEQUE_NAME);
return 0;
}