概览
bthread(代码)是baidu-rpc使用的M:N线程库,是其稳定和高效的关键组件。能更好地利用多核cpu,能在pthread中运行,需要注意的是,bthread的work stealing机制会da让任务pthread发生切换,从而让thread_local变量不可信,通常在bthread_usleep或这join的时候就有可能发生切换。
thread_local SomeObject obj;
...
SomeObject* p = &obj;
p->bar();
bthread_usleep(1000);
p->bar();
解决方案:
- 不使用线程级变量传递业务数据。这是一种槽糕的设计模式,依赖线程级数据的函数也难以单测。判断是否滥用:如果不使用线程级变量,业务逻辑是否还能正常运行?线程级变量应只用作优化手段,使用过程中不应直接或间接调用任何可能阻塞的bthread函数。比如使用线程级变量的tcmalloc就不会和bthread有任何冲突。
- 如果一定要(在业务中)使用线程级变量,使用bthread_key_create和bthread_getspecific。另外,bthread没有优先级,如果需要线程优先级,还是需要使用pthread。
全局视图
在进入代码实现介绍前,通过一张图了解bthread库的主要类/结构体,以及他们之间的关系。
TaskControl是单例模式的全局控制类。
TaskGroup由每一个Worker线程进行创建,和Worker线程一一对应。
TaskMeta是描述bthread任务的结构体,里面保存了bthread任务的各种信息,包括id,状态,任务函数和参数,栈空间等等。
ParkingLot则是通过futex实现的线程同步工具,用来唤醒(有任务提交时)和阻塞(无任务时)挂在上面的Worker线程(存在多个ParkingLot的目的是减少竞争)。
在关系上,TaskControl中创建了Worker线程,然后Worker线程在线程入口函数里面创建属于自己的TaskGroup。TaskGroup内有rq和remote_rq两个队列,里面保存的是外部提交需要执行的bthread任务id。
所以简单描述bthread库的运行过程就是:外部提交一个bthread任务到某个TaskGroup中的任务队列,然后通过ParkingLot唤醒休眠的Worker线程,Worker线程会从自己的TaskGroup中取出或者其他Worker线程的TaskGroup中“偷”出bthread任务进行执行。某种意义上来说,和线程池有些类似,但是bthread任务粒度更小,所以bthread库也被称为一种类协程库。
这种“偷”任务的实现称为work stealing机制。它保证了即使某一个Worker线程被阻塞,它内部任务队列的bthread任务也可以被其他Worker线程“偷取”执行成功。所以有时候在bthread任务中调用了bthread_join / bthread_usleep等函数把自己切出后,等到函数返回继续执行的时候会发现执行自己的线程已经变了,这时pthread local的数据已经失效,这是在使用的时候一个需要注意的点。
代码剖析
接下来开始介绍bthread库主体功能的代码实现,包括bthread库的初始化、bthread任务的提交、以及bthread任务的执行和较为核心的栈空间切换。
具体的,我们从一个bthread和pthread比较的demo开始。从功能、函数参数和调用形式上,两者非常相似,都是实现后台运行thread_func任务函数。但bthread额外拥有唯一标识自己的bthread id。
#include <iostream>
#include "bthread.h"
const char* bthread_flag = "bthread";
const char* pthread_flag = "pthread";
void* thread_func(void* args) {
std::cout << "flag: " << (char*)(args) <<
", pthread_id: " << pthread_self() <<
", bthread_id: " << bthread_self() << std::endl;
return nullptr;
}
int main(int argc, char** argv) {
bthread_t bth;
pthread_t pth;
if (bthread_start_background(&bth, nullptr, thread_func, (void*)bthread_flag) != 0) {
std::cout << "fail to create bthread" << std::endl;
return -1;
}
if (pthread_create(&pth, nullptr, thread_func, (void*)pthread_flag) != 0) {
std::cout << "fail to create pthread" << std::endl;
return -1;
}
bthread_join(bth, nullptr);
pthread_join(pth, nullptr);
return 0;
}
// $./output/bin/bthread_test
// flag: pthread, pthread_id: 140281875785472, bthread_id: 0
// flag: bthread, pthread_id: 140282267948800, bthread_id: 4294968064
初始化
bthread_start_background是我们启动一个bthread任务常用的接口函数。
它会首先判断自己的调用是不是由某个Worker线程发起的。这在BRPC框架下是很常见的场景,因为RPC回调中的主体业务代码都是运行在Worker线程内的。它判断的依据是pthread local级别的TaskGroup指针变量tls_task_group是否为空。之前有提到TaskGroup和Worker线程一一对应,实现的方式就是在Worker线程创建好TaskGroup后,赋值给自己的线程变量tls_task_group。
所以如果tls_task_group不为空,这说明bthread库已经初始化过了,并且位于Worker线程中,直接调用TaskGroup的start_background函数提交bthread任务到它的任务队列;否则就需要尝试进行bthread库的初始化,然后从其中随机挑选一个TaskGroup同样调用它的start_background函数进行任务提交。前面的demo很明显走的是后者的逻辑,需要进行bthread库的初始化。
int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) __THROW {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
// start from worker
return g->start_background<false>(tid, attr, fn, arg);
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
BASE_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
void * (*fn)(void*),
void* __restrict arg) {
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
}
...
return c->choose_one_group()->start_background<true>(
tid, attr, fn, arg);
}
get_or_new_task_control函数实现了全局单例类TaskControl的构造和初始化。它是通过double-checked locking pattern(DCLP)方式实现的单例模式,使用了memory fence保证了正确性(感兴趣的可以看这篇论文(https://www.aristeia.com/Papers/DDJ_Jul_Aug_2004_revised.pdf)和文章(https://zh.wikipedia.org/wiki/%E5%8F%8C%E9%87%8D%E6%A3%80%E6%9F%A5%E9%94%81%E5%AE%9A%E6%A8%A1%E5%BC%8F#Microsoft_Visual_C++_%E4%B8%AD%E7%9A%84%E4%BD%BF%E7%94%A8),C++11后通常用std::call_once实现单例模式)。
inline TaskControl* get_or_new_task_control() {
base::atomic<TaskControl*>* p = (base::atomic<TaskControl*>*)&g_task_control;
TaskControl* c = p->load(base::memory_order_consume);
if (c != NULL) {
return c;
}
BAIDU_SCOPED_LOCK(g_task_control_mutex);
c = p->load(base::memory_order_consume);
if (c != NULL) {
return c;
}
c = new (std::nothrow) TaskControl;
if (NULL == c) {
return NULL;
}
int concurrency = FLAGS_bthread_min_concurrency > 0 ?
FLAGS_bthread_min_concurrency :
FLAGS_bthread_concurrency;
if (c->init(concurrency) != 0) {
LOG(ERROR) << "Fail to init g_task_control";
delete c;
return NULL;
}
p->store(c, base::memory_order_release);
return c;
}
TaskControl的init函数,首先检查了入参concurrency的正确性,然后创建了concurrency数量的Worker线程。
默认情况下会创建concurrency = FLAGS_bthread_concurrency = 8 + 1 = 9条线程。
BRPC框架下默认情况会设置concurrency为系统CPU核心数 + 1,也可以通过ServerOptions.num_threads配置Worker线程的数量。
int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
return -1;
}
if (concurrency <= 0) {
LOG(ERROR) << "Invalid concurrency=" << concurrency;
return -1;
}
_concurrency = concurrency;
...
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
const int rc = pthread_create(&_workers[i], NULL, worker_thread<true>, this);
if (rc) {
LOG(ERROR) << "Fail to create _workers[" << i << "], " << berror(rc);
return -1;
}
}
...
while (_ngroup == 0) {
usleep(100); // TODO: Elaborate
}
return 0