QWaitCondition内部实现结构图:
相关系列文章
C++之Pimpl惯用法
目录
1.简介
2.示例
2.1.全局配置
2.2.生产者Producer
2.3.消费者Consumer
2.4.测试例子
3.原理分析
3.1.源码介绍
3.2.辅助函数CreateEvent
3.3.辅助函数WaitForSingleObject
3.4.QWaitConditionEvent
3.5.QWaitConditionEventPrivate
3.6.流程分析
4.总结
1.简介
QWaitCondition
是用来同步线程的条件变量,头文件<QWaitCondition>
,类中的所有函数都是线程安全的。主要的公共函数(以Qt5.12.12为例)如下表:
返回类型 | 函数名称 | 含义 |
QWaitCondition () | 构造函数 | |
---|---|---|
~QWaitCondition () | 析构函数 | |
bool | wait ( QMutex * mutex, unsigned long time = ULONG_MAX ) | mutex将被解锁,并且调用线程将会阻塞,直到下列条件之一满足才想来: (1)另一个线程使用wakeOne()或wakeAll()传输给它; (2)time毫秒过去。 |
bool | wait(QMutex *lockedMutex, QDeadlineTimer deadline) | 同上 |
bool | wait ( QReadWriteLock * readWriteLock, unsigned long time = ULONG_MAX ) | readWriteLock将被解锁,并且调用线程将会阻塞,直到下列条件之一满足才想来: (1)另一个线程使用wakeOne()或wakeAll()传输给它; (2)time毫秒过去。 |
bool | wait(QReadWriteLock *lockedReadWriteLock, QDeadlineTimer deadline) | 同上 |
void | wakeAll () | 唤醒所有等待的线程,线程唤醒的顺序不确定,由操作系统的调度策略决定 |
void | wakeOne() | 唤醒等待QWaitCondition的线程中的一个线程,线程唤醒的顺序不确定,由操作系统的调度策略决定 |
void | notify_all() | 同wakeAll() |
void | notify_one() | 同wakeOne() |
QWaitCondition允许线程告诉其他线程某种条件已经满足。一个或多个线程可以阻止等待QWaitCondition使用wakeOne()或wakeAll()设置条件。使用wakeOne()唤醒一个随机选择的线程,或使用wakeAll()唤醒所有线程。
2.示例
以生产者/消费者模型为例,看一下具体实现:
2.1.全局配置
//! [0]
const int DataSize = 127;
const int BufferSize = 8192;
char buffer[BufferSize];
QWaitCondition bufferNotEmpty;
QWaitCondition bufferNotFull;
QMutex mutex;
int numUsedBytes = 0;
//! [0]
主要有缓冲区buffer, 循环缓冲区大小BufferSize及生产的数量,小于 DataSize,这意味着在某一时刻生产者将达到缓冲区的末尾,并从开始位置重新启动。
要同步生产者和消费者,需要两个 wait 条件和一个 mutex。当生产者生成一些数据时,bufferNotEmpty 条件被发射,告诉消费者可以读取它了;当消费者读取一些数据时,bufferNotFull 条件被发射,告诉生产者生成更多的数据。numUsedBytes 为缓冲区中所包含数据的字节数。
总之,wait 条件、mutex、和 numUsedBytes 计数器确保生产者不会先于消费者超过 BufferSize 的大小,而消费者永远不会读取生产者尚未生成的数据。
2.2.生产者Producer
生产者的代码如下:
//! [1]
class Producer : public QThread
//! [1] //! [2]
{
public:
Producer(QObject *parent = NULL) : QThread(parent)
{
}
void run() override
{
for (int i = 0; i < DataSize; ++i) {
mutex.lock();
if (numUsedBytes == BufferSize)
bufferNotFull.wait(&mutex);
mutex.unlock();
buffer[i % BufferSize] = i;//"ACGT"[QRandomGenerator::global()->bounded(4)];
mutex.lock();
++numUsedBytes;
bufferNotEmpty.wakeAll();
mutex.unlock();
}
}
};
//! [2]
生产者根据DataSize的大小循环生产数据。在往循环缓冲区写入一个字母之前,它必须检查缓冲区是否已满(即满足numUsedBytes等于BufferSize条件),如果缓冲区满了,现成就会在bufferNotFull条件上等待。
满足条件后,生产者增加 numUsedBytes,并且标志 bufferNotEmpty 条件为 true,从而唤醒消费者线程去消费。
2.3.消费者Consumer
消费者的代码如下:
//! [3]
class Consumer : public QThread
//! [3] //! [4]
{
Q_OBJECT
public:
Consumer(QObject *parent = NULL) : QThread(parent)
{
}
void run() override
{
for (int i = 0; i < DataSize; ++i) {
mutex.lock();
if (numUsedBytes == 0)
bufferNotEmpty.wait(&mutex);
mutex.unlock();
fprintf(stderr, "%d\n", buffer[i % BufferSize]);
mutex.lock();
--numUsedBytes;
bufferNotFull.wakeAll();
mutex.unlock();
}
fprintf(stderr, "\n");
}
signals:
void stringConsumed(const QString &text);
};
//! [4]
代码非常类似于生产者,在读取字节之前,需要先检查缓冲区是否为空(numUsedBytes 为 0),而非它是否为已满。并且,当它为空时,等待 bufferNotEmpty 条件。在读取字节后,减小 numUsedBytes (而非增加),并标志 bufferNotFull 条件(而非 bufferNotEmpty 条件)。
2.4.测试例子
代码如下:
//! [5]
int main(int argc, char *argv[])
//! [5] //! [6]
{
QCoreApplication app(argc, argv);
Producer producer;
Consumer consumer;
producer.start();
consumer.start();
producer.wait();
consumer.wait();
return 0;
}
//! [6]
#include "waitconditions.moc"
上面的测试代码是一个生产者对一个消费者,生产一个消费一个,所以看到的结果是按照顺序输出,结果如下:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
此示例中提出的“生产者 - 消费者”模式,适用于编写高并发多线程应用。在多处理器计算机中,程序可能比基于 mutex 的方案快达两倍之多,因为两个线程可以同一时间在缓冲区的不同部分处于激活状态。
上面我们讲解了QWaitCondition的用法,如果还有兴趣继续探究它的实现原理的话,则可以继续往下看。
3.原理分析
在Qt5.12.12版本上,以windows端来讲解它的实现原理。
3.1.源码介绍
序号 | 文件 | 说明 | 源码位置 |
---|---|---|---|
1 | qwaitcondition.h | 声明QWaitCondition | .\5.12.12\Src\qtbase\src\corelib\thread |
2 | qwaitcondition_win.cpp | 定义QWaitConditionEvent,QWaitConditionPrivate,QWaitCondition的windows实现 | .\5.12.12\Src\qtbase\src\corelib\thread |
3 | qwaitcondition_unix.cpp | 定义QWaitConditionEvent,QWaitConditionPrivate,QWaitCondition的unix实现 | .\5.12.12\Src\qtbase\src\corelib\thread |
4 | qwaitcondition.h | 对外访问包含的头文件 | .\5.12.12\Src\qtbase\include\QtCore |
3.2.辅助函数CreateEvent
CreateEvent的定义如下:
HANDLE CreateEvent(
[in, optional] LPSECURITY_ATTRIBUTES lpEventAttributes,
[in] BOOL bManualReset,
[in] BOOL bInitialState,
[in, optional] LPCSTR lpName
);
创建或打开一个命名或未命名的事件对象。
lpEventAttributes: 指向 SECURITY_ATTRIBUTES 结构的指针。 如果此参数为 NULL,则子进程无法继承句柄。结构的 lpSecurityDescriptor 成员为新事件指定 安全描述符 。 如果 lpEventAttributes 为 NULL,则事件将获取默认的安全描述符。 事件的默认安全描述符中的 ACL 来自创建者的主要令牌或模拟令牌。
bManualReset: 如果此参数为 TRUE,则函数将创建手动重置事件对象,该对象需要使用 ResetEvent 函数将事件状态设置为非签名。 如果此参数为 FALSE,则函数将创建一个自动重置事件对象,在释放单个等待线程后,系统会自动将事件状态重置为未签名。
bInitialState: 如果此参数为 TRUE,则会向事件对象发出初始状态信号;否则,它将不进行签名。
lpName: 可选项,事件对象的名称。 名称限制为 MAX_PATH 个字符。 名称比较区分大小写。
如果 lpName 与现有命名事件对象的名称匹配,则此函数请求 EVENT_ALL_ACCESS 访问权限。 在这种情况下, bManualReset 和 bInitialState 参数将被忽略,因为它们已由创建过程设置。 如果 lpEventAttributes 参数不是 NULL,它将确定是否可以继承句柄,但忽略其安全描述符成员。
如果 lpName 为 NULL,则创建不带名称的事件对象。
如果 lpName 与同一命名空间中另一种对象的名称匹配, (例如现有信号灯、互斥体、可等待计时器、作业或文件映射对象) ,则函数将失败, GetLastError 函数将返回 ERROR_INVALID_HANDLE。 发生这种情况的原因是这些对象共享相同的命名空间。
名称可以具有“Global”或“Local”前缀,以在全局命名空间或会话命名空间中显式创建对象。 名称的其余部分可以包含除反斜杠字符 (\) 以外的任何字符。 有关详细信息,请参阅 内核对象命名空间。 使用终端服务会话实现快速用户切换。 内核对象名称必须遵循终端服务概述的准则,以便应用程序可以支持多个用户。
可以在专用命名空间中创建 对象。 有关详细信息,请参阅 对象命名空间。
返回值:如果函数成功,则返回值是事件对象的句柄。 如果命名事件对象在函数调用之前存在,则函数将返回现有对象的句柄, GetLastError 将返回 ERROR_ALREADY_EXISTS。
如果函数失败,则返回值为 NULL。 要获得更多的错误信息,请调用 GetLastError。
3.3.辅助函数WaitForSingleObject
这个是windows系统多线程,进程中用的最多的一个函数,它的定义如下:
DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds);
这个函数的作用是等待一个内核对象,在Windows系统上一个内核对象通常使用其句柄来操作,参数hHandle即需要等待的内核对象,参数dwMilliseconds是等待这个内核对象的最大时间,时间单位是毫秒,其类型是DWORD,这是一个unsigned long类型。如果我们需要无限等待下去,可以将这个参数值设置为INFINITE宏。
在Windows上可以调用WaitForSingleObject等待的常见对象如下表所示:
可以被等待的对象 | 等待对象成功的含义 | 对象类型 |
---|---|---|
线程 | 等待线程结束 | HANDLE |
Process | 等待进程结束 | HANDLE |
Event (事件) | 等待 Event 有信号 | HANDLE |
Mutex (互斥体) | 等待持有 Mutex 的线程释放该 Mutex,等待成功,拥有该Mutex | HANDLE |
Semaphore(信号量) | 等待该 Semaphore 对象有信号 | HANDLE |
上面介绍的等待线程对象上文中已经详细介绍过了,这里不再重复了,等待进程退出与等待线程退出类似,也不再赘述。下文中我们将详细介绍 Event、Mutex、Semaphore 这三种类型的资源同步对象,这里我们先接着介绍WaitForSingleObject函数的用法,该函数的返回值一般有以下类型:
- WAIT_FAILED,表示WaitForSingleObject函数调用失败了,调用失败时,可以通过GetLastError 函数得到具体的错误码;
- WAIT_OBJECT_0,表示WaitForSingleObject成功“等待”到设置的对象;
- WAIT_TIMEOUT,等待超时;
- WAIT_ABANDONED,当等待的对象是Mutex类型时,如果持有该Mutex对象的线程已经结束,但是没有在结束前释放该Mutex,此时该Mutex已经处于废弃状态,其行为是未知的,不建议再使用该Mutex。
上面我们讲解了CreateEvent和WaitForSingleObject函数,下面看一个示例:
#include <windows.h>
#include <stdio.h>
#define THREADCOUNT 4
HANDLE ghWriteEvent;
HANDLE ghThreads[THREADCOUNT];
DWORD WINAPI ThreadProc(LPVOID);
void CreateEventsAndThreads(void)
{
int i;
DWORD dwThreadID;
// Create a manual-reset event object. The write thread sets this
// object to the signaled state when it finishes writing to a
// shared buffer.
ghWriteEvent = CreateEvent(
NULL, // default security attributes
TRUE, // manual-reset event
FALSE, // initial state is nonsignaled
TEXT("WriteEvent") // object name
);
if (ghWriteEvent == NULL)
{
printf("CreateEvent failed (%d)\n", GetLastError());
return;
}
// Create multiple threads to read from the buffer.
for(i = 0; i < THREADCOUNT; i++)
{
// TODO: More complex scenarios may require use of a parameter
// to the thread procedure, such as an event per thread to
// be used for synchronization.
ghThreads[i] = CreateThread(
NULL, // default security
0, // default stack size
ThreadProc, // name of the thread function
NULL, // no thread parameters
0, // default startup flags
&dwThreadID);
if (ghThreads[i] == NULL)
{
printf("CreateThread failed (%d)\n", GetLastError());
return;
}
}
}
void WriteToBuffer(VOID)
{
// TODO: Write to the shared buffer.
printf("Main thread writing to the shared buffer...\n");
// Set ghWriteEvent to signaled
if (! SetEvent(ghWriteEvent) )
{
printf("SetEvent failed (%d)\n", GetLastError());
return;
}
}
void CloseEvents()
{
// Close all event handles (currently, only one global handle).
CloseHandle(ghWriteEvent);
}
int main( void )
{
DWORD dwWaitResult;
// TODO: Create the shared buffer
// Create events and THREADCOUNT threads to read from the buffer
CreateEventsAndThreads();
// At this point, the reader threads have started and are most
// likely waiting for the global event to be signaled. However,
// it is safe to write to the buffer because the event is a
// manual-reset event.
WriteToBuffer();
printf("Main thread waiting for threads to exit...\n");
// The handle for each thread is signaled when the thread is
// terminated.
dwWaitResult = WaitForMultipleObjects(
THREADCOUNT, // number of handles in array
ghThreads, // array of thread handles
TRUE, // wait until all are signaled
INFINITE);
switch (dwWaitResult)
{
// All thread objects were signaled
case WAIT_OBJECT_0:
printf("All threads ended, cleaning up for application exit...\n");
break;
// An error occurred
default:
printf("WaitForMultipleObjects failed (%d)\n", GetLastError());
return 1;
}
// Close the events to clean up
CloseEvents();
return 0;
}
DWORD WINAPI ThreadProc(LPVOID lpParam)
{
// lpParam not used in this example.
UNREFERENCED_PARAMETER(lpParam);
DWORD dwWaitResult;
printf("Thread %d waiting for write event...\n", GetCurrentThreadId());
dwWaitResult = WaitForSingleObject(
ghWriteEvent, // event handle
INFINITE); // indefinite wait
switch (dwWaitResult)
{
// Event object was signaled
case WAIT_OBJECT_0:
//
// TODO: Read from the shared buffer
//
printf("Thread %d reading from buffer\n",
GetCurrentThreadId());
break;
// An error occurred
default:
printf("Wait error (%d)\n", GetLastError());
return 0;
}
// Now that we are done reading the buffer, we could use another
// event to signal that this thread is no longer reading. This
// example simply uses the thread handle for synchronization (the
// handle is signaled when the thread terminates.)
printf("Thread %d exiting\n", GetCurrentThreadId());
return 1;
}
上面示例使用事件对象来防止在主线程写入该缓冲区时从共享内存缓冲区读取多个线程。 首先,主线程使用 CreateEvent 函数创建初始状态为非签名的手动重置事件对象。 然后,它会创建多个读取器线程。 主线程执行写入操作,然后在完成写入后将事件对象设置为信号状态。
在开始读取操作之前,每个读取器线程都使用 WaitForSingleObject 等待手动重置事件对象发出信号。 当 WaitForSingleObject 返回时,这表示main线程已准备好开始其读取操作。
3.4.QWaitConditionEvent
QWaitConditionEvent实际是对CreateEvent的封装,代码如下:
class QWaitConditionEvent
{
public:
inline QWaitConditionEvent() : priority(0), wokenUp(false)
{
event = CreateEvent(NULL, TRUE, FALSE, NULL);
}
inline ~QWaitConditionEvent() { CloseHandle(event); }
int priority;
bool wokenUp;
HANDLE event;
};
这个定义源码在.\Qt\Qt5.12.12\5.12.12\Src\qtbase\src\corelib\thread\qwaitcondition_win.cpp中,从中可以看出没生成一个QWaitConditionEvent就会创建一个手动重置事件对象。
3.5.QWaitConditionEventPrivate
QWaitConditionEventPrivate的定义如下:
typedef QList<QWaitConditionEvent *> EventQueue;
class QWaitConditionPrivate
{
public:
QMutex mtx;
EventQueue queue;
EventQueue freeQueue;
QWaitConditionEvent *pre();
bool wait(QWaitConditionEvent *wce, unsigned long time);
void post(QWaitConditionEvent *wce, bool ret);
};
上面代码定义了两个事件队列,一个是等待事件队列,一个空闲时间队列;还定义了3个对事件队列操作的接口,下面说明各接口的用法:
1) pre() : 从空闲队列中freeQueue取出一个事件对象 QWaitConditionEvent放入queue。
2)wait() : 在事件对象wce上等待time时间,该函数会阻塞当前线程的运行,直到time到或SetEvent。
3) post() : 把使用后的QWaitConditionEvent归还到空事件队列freeQueue里面。
3.6.流程分析
QWaitCondition的d指针是QWaitConditionEventPrivate,对QWaitCondition的操作转换为对QWaitConditionEventPrivate的操作。关键步骤流程如下:
1)wait函数执行流程
2)wakeOne函数执行流程
3) wakeAll函数执行流程
wakeAll的流程同wakeOne的流程相似,只是wakeOne是把事件队列的第一个事件对象SetEvent,而wakeAll是把事件队列中的所有事件对象SetEvent。
QWaitCondition类的设计思想也遵循Qt大部分类的设计思想Pimpl技法,关于Pimpl技法的一些详细介绍,可参考我的博客C++之Pimpl惯用法-CSDN博客
4.总结
QMutex 和 QWaitCondition 联合使用是多线程中的一个常用的习惯用法,不仅是 Qt,对于 C++ 的 std::condition_variable 和 std::mutex ,以及 java 的 synchronized / wait / notify 也都适用。
参考:
createEventA 函数 (synchapi.h) - Win32 apps | Microsoft Learn