一、简介
消息队列,是一种常用于任务间通信的数据结构,实现了接收来自任务或中断的不固定长度的消息,并根据不同的接口选择传递消息是否存放在自己空间。任务能够从队列里面读取消息,当队列中的消息是空时,挂起读取任务;当队列中有新消息时,挂起的读取任务被唤醒并处理新消息。
用户在处理业务时,消息队列提供了异步处理机制,允许将一个消息放在队列,但并不立即处理,同时队列还能起到缓冲消息的作用。
LiteOS中使用队列数据结构实现任务异步通信工作,具有如下特性:
- 消息以先进先出方式排队,支持异步读写工作方式
- 读队列和写队列都支持超时机制
- 发送消息类型由通信双方约定,可以允许不同长度(不超过队列节点最大值)的消息。
- 一个任务能够从任意一个消息队列接收和发送消息
- 单个任务能够从同一个消息队列接收和发送消息
- 当队列使用结束后,如果是动态申请的内存,需要通过释放内存函数进行内存回收。
更多队列概念,请参考如下链接:FreeRTOS学习四(队列)_xqueuesendtobackfromisr_t_guest的博客-CSDN博客
Message Queue
二、 运作机制
创建队列时,根据用户传入队列长度和消息节点大小来开辟相应的内存空间,以供该队列使用,返回队列ID。
在队列控制块中维护一个消息头字节位置Head和一个消息尾节点位置Tail来表示当前队列中消息存储情况。Head表示队列中被占用消息的起始位置。Tail表示队列中空闲消息的起始位置。队列刚创建时,Head和Tail均指向队列起始位置。
写队列时,根据Tail找到被占用消息节点末尾的空闲节点,作为数据写入对象。
读队列时,根据Head找到最先写入队列中的消息节点进行读取。
删除队列时,根据传入的队列ID寻找到对应的队列,把队列状态置为未使用,释放原队列所占用的空间。对应的队列控制头置为初始状态。
三、API介绍
osMessageQueueNew
函数功能:
创建队列。不能在中断中使用。
函数原型:
osMessageQueueId_t osMessageQueueNew(uint32_t msg_count, uint32_t msg_size, const osMessageQueueAttr_t *attr)
参数:
msg_count:队列元素总个数
msg_size:队列单个元素大小
attr:属性,自定义地址时使用。默认为NULL
返回值:
NULL:失败
其他:队列标识符
实例:
typedef struct
{
char *Buf;
uint32_t len;
uint32_t Idx;
} MSGQUEUE_OBJ_t;
osMessageQueueId_t mid_MsgQueue;
mid_MsgQueue = osMessageQueueNew(MSGQUEUE_OBJECTS, sizeof(MSGQUEUE_OBJ_t), NULL);
osMessageQueuePut
函数功能:
数据入队。如果队列满,则挂起,直到队列空余或超时。如果超时时间为0,可以在中断中使用。
函数原型:
osStatus_t osMessageQueuePut(osMessageQueueId_t mq_id, const void *msg_ptr, uint8_t msg_prio, uint32_t timeout)
参数:
mq_id:队列标识符。队列创建osMessageQueueNew时获得。
msg_ptr:入队的数据地址
msg_prio:入队数据的优先级,优先级高,可有限被读出。
timeout:入队超时等待时间。osWaitForever死等
返回值:
osOK:成功
其他值:失败
实例:
osMessageQueueId_t mid_MsgQueue;
typedef struct
{
char *Buf;
uint32_t len;
uint32_t Idx;
} MSGQUEUE_OBJ_t;
MSGQUEUE_OBJ_t msg;
msg.Buf = "this is a queue test";
msg.Idx = 0U;
msg.len = strlen(msg.Buf);
osMessageQueuePut(mid_MsgQueue, &msg, 0U, 0U);
osMessageQueueGet
函数功能:
数据出队,如果队列空,则挂起,直到队列非空或超时。如果超时为0,可以在中断中调用。
函数原型:
osStatus_t osMessageQueueGet(osMessageQueueId_t mq_id, void *msg_ptr, uint8_t *msg_prio, uint32_t timeout)
参数:
mq_id:队列标识符。队列创建osMessageQueueNew时获得。
msg_ptr:入队的数据地址
msg_prio:入队数据的优先级,优先级高,可有限被读出。
timeout:入队超时等待时间。osWaitForever死等
返回值:
osOK:成功
其他值:失败
实例:
typedef struct
{
char *Buf;
uint32_t len;
uint32_t Idx;
} MSGQUEUE_OBJ_t;
osMessageQueueId_t mid_MsgQueue;
osStatus_t status;
MSGQUEUE_OBJ_t msg;
status = osMessageQueueGet(mid_MsgQueue, &msg, NULL, osWaitForever);
osMessageQueueGetMsgSize
函数功能:
获取每个队列元素的大小。可以在中断中使用。
函数原型:
uint32_t osMessageQueueGetMsgSize(osMessageQueueId_t mq_id)
参数:
mq_id:队列标识符。队列创建osMessageQueueNew时获得。
返回值:
每个队列元素的大小
实例:
osMessageQueueId_t mid_MsgQueue;
osMessageQueueGetMsgSize(mid_MsgQueue);
osMessageQueueGetCount
函数功能:
获取已经入队元素的个数。可以在中断中调用。
函数原型:
uint32_t osMessageQueueGetCount(osMessageQueueId_t mq_id)
参数:
mq_id:队列标识符。队列创建osMessageQueueNew时获得。
返回值:
已经入队元素的个数
实例:
osMessageQueueId_t mid_MsgQueue;
osMessageQueueGetCount(mid_MsgQueue);
osMessageQueueGetSpace
函数功能:
获取队列中空闲元素个数。可以在中断中调用。
函数原型:
uint32_t osMessageQueueGetSpace(osMessageQueueId_t mq_id)
参数:
mq_id:队列标识符。队列创建osMessageQueueNew时获得。
返回值:
队列中空闲元素的个数
实例:
osMessageQueueId_t mid_MsgQueue;
osMessageQueueGetSpace(mid_MsgQueue);
osMessageQueueGetCapacity
函数功能:
获取队列中元素总个数。可以在中断中调用。
函数原型:
uint32_t osMessageQueueGetCapacity(osMessageQueueId_t mq_id)
参数:
mq_id:队列标识符。队列创建osMessageQueueNew时获得。
返回值:
队列中元素总个数
实例:
osMessageQueueId_t mid_MsgQueue;
osMessageQueueGetCapacity(mid_MsgQueue);
四、实例
这里创建两个任务,一个任务操作数据入队,一个任务操作数据出队。并实时监控队列状态。其中,出队的任务操作比入队慢,所以一定会队列满。
#define LOG_I(fmt, args...) printf("<%8ld> - [QUEUE]:"fmt"\r\n",osKernelGetTickCount(),##args);
#define LOG_E(fmt, args...) printf("<%8ld>-[QUEUE_ERR]>>>>>>>>>>>>:"fmt"\r\n",osKernelGetTickCount(), ##args);
#define MSGQUEUE_OBJECTS 16
typedef struct
{
char *Buf;
uint32_t len;
uint32_t Idx;
} MSGQUEUE_OBJ_t;
osMessageQueueId_t mid_MsgQueue;
void Thread_MsgQueue1(void *argument)
{
(void)argument;
MSGQUEUE_OBJ_t msg;
static char temp_data[] = "this is a queue test";
msg.Buf = NULL;
msg.Idx = 0U;
LOG_I("each element %d Byte",osMessageQueueGetMsgSize(mid_MsgQueue));
while (1)
{
msg.Buf = temp_data;
msg.len = sizeof(temp_data);
msg.Idx++;
LOG_I("[enqueue]-111queued:%ld elements,left:%ld elements,total:%ld",osMessageQueueGetCount(mid_MsgQueue),osMessageQueueGetSpace(mid_MsgQueue),osMessageQueueGetCapacity(mid_MsgQueue));
osMessageQueuePut(mid_MsgQueue, &msg, 0U, 0U);
LOG_I("[enqueue]-222queued:%ld elements,left:%ld elements,total:%ld",osMessageQueueGetCount(mid_MsgQueue),osMessageQueueGetSpace(mid_MsgQueue),osMessageQueueGetCapacity(mid_MsgQueue));
osThreadYield();
osDelay(50);
}
}
void Thread_MsgQueue2(void *argument)
{
(void)argument;
osStatus_t status;
MSGQUEUE_OBJ_t msg;
while (1)
{
LOG_I("[dequeue]-111queued:%ld elements,left:%ld elements,total:%ld",osMessageQueueGetCount(mid_MsgQueue),osMessageQueueGetSpace(mid_MsgQueue),osMessageQueueGetCapacity(mid_MsgQueue));
status = osMessageQueueGet(mid_MsgQueue, &msg, NULL, osWaitForever);
LOG_I("[dequeue]-222queued:%ld elements,left:%ld elements,total:%ld",osMessageQueueGetCount(mid_MsgQueue),osMessageQueueGetSpace(mid_MsgQueue),osMessageQueueGetCapacity(mid_MsgQueue));
if (status == osOK)
{
LOG_I("Message Queue Get msg:%d,len:%d-%s\n", msg.Idx,msg.len,msg.Buf);
}
osDelay(100);
}
}
void app_queue_init(void)
{
mid_MsgQueue = osMessageQueueNew(MSGQUEUE_OBJECTS, sizeof(MSGQUEUE_OBJ_t), NULL);
if (mid_MsgQueue == NULL)
{
LOG_E("Falied to create Message Queue!\n");
}
osThreadAttr_t attr;
attr.attr_bits = 0U;
attr.cb_mem = NULL;
attr.cb_size = 0U;
attr.stack_mem = NULL;
attr.stack_size = 1024 * 10;
attr.priority = 25;
attr.name = "Thread_MsgQueue1";
if (osThreadNew(Thread_MsgQueue1, NULL, &attr) == NULL)
{
LOG_E("Falied to create Thread_MsgQueue1!\n");
}
attr.name = "Thread_MsgQueue2";
if (osThreadNew(Thread_MsgQueue2, NULL, &attr) == NULL)
{
LOG_E("Falied to create Thread_MsgQueue2!\n");
}
}
看结果:
可以看到,队列入队和出队正常运行,且因为入队比出队快,所以队列中空闲元素越来越少。