文章目录
- 协程本地变量
- 相关结构体
- 实现原理
- 代码实现
- 代码示例
- 思考题
协程本地变量
在上一篇文章中,我们介绍了如何通过协程来实现批量并发执行,本篇文章将向大家介绍如何在协程的基础之上,实现协程本地变量。
注意:「为了减轻大家的阅读负担,在文章中只展示必要的代码,和当前讲解内容无关的代码在代码块中采用…进行忽略」。
完整的代码,已经开源在github上,地址为:https://github.com/wanmuc/MyCoroutine 。
在开源库中协程本地变量涉及的代码文件如下所示,文章后续代码的出处就不再说明。
.
├── common.h // LocalVariable结构体定义
├── localvariable.cpp // 协程本地变量核心函数实现
├── localvariable.h // 协程本地变量模版类封装
└── mycoroutine.h // 协程本地变量核心函数声明
相关结构体
协程本地变量该如何实现呢?其实实现原理和线程本地变量一样。
就是使用封装的变量的内存地址作为key,且key的值是唯一的。
虽然所有的协程中拿到的key的值都是相等的,但是在不同的协程中通过这个key可以读写到不同的值。
具体如何实现呢?先来看一下协程本地变量辅助结构体LocalVariable。
// 协程本地变量辅助结构体
typedef struct LocalVariable {
void *data{nullptr};
function<void(void *)> free{nullptr}; // 用于释放本地协程变量值的内存
} LocalVariable;
在LocalVariable结构体中只有两个成员变量,一个是data,一个是free,data是指向协程本地变量值的内存指针,free是用于释放协程本地变量值内存的函数。
单有LocalVariable是不够的,还需要在协程核心结构体Coroutine中新增一个成员变量local。
// 协程结构体
typedef struct Coroutine {
...
unordered_map<void *, LocalVariable> local; // 协程本地变量映射map,key是协程变量的内存地址
...
} Coroutine;
local变量是一个unordered_map,「它的key就是一个void*的通用指针,value则是LocalVariable类型的变量」。
实现原理
本小节来介绍一下实现原理,协程本地变量是通过this指针,去不同协程的local中索引不同的LocalVariable类型的变量。
然后再进行读写操作的,从而实现在不同的协程内操作协程本地变量的相互隔离。
下面的简图将使这个逻辑变得清晰易懂。
代码实现
协程本地变量的实现涉及到2个类,一个是Schedule类,一个CoroutineLocal模版类。
在Schedule类中新增了2个函数的声明。
// 协程调度器
class Schedule {
public:
...
void LocalVariableSet(void *key, const LocalVariable &local_variable); // 设置协程本地变量
bool LocalVariableGet(void *key, LocalVariable &local_variable); // 获取协程本地变量
...
};
LocalVariableSet和LocalVariableGet函数分别用于设置和获取协程本地变量的值,是核心的底层函数。它们的实现如下所示。
void Schedule::LocalVariableSet(void* key, const LocalVariable& local_variable) {
assert(not is_master_);
auto iter = coroutines_[slave_cid_]->local.find(key);
if (iter != coroutines_[slave_cid_]->local.end()) {
iter->second.free(iter->second.data); // 之前有值,则要先释放空间
}
coroutines_[slave_cid_]->local[key] = local_variable;
}
bool Schedule::LocalVariableGet(void* key, LocalVariable& local_variable) {
assert(not is_master_);
auto iter = coroutines_[slave_cid_]->local.find(key);
if (iter == coroutines_[slave_cid_]->local.end()) {
int32_t relate_bid = coroutines_[slave_cid_]->relate_bid;
if (relate_bid == kInvalidBid) { // 没有关联的Batch,直接返回false
return false;
}
int32_t parent_cid = batchs_[relate_bid]->parent_cid;
iter = coroutines_[parent_cid]->local.find(key);
if (iter == coroutines_[parent_cid]->local.end()) { // 父从协程中也没查找到,直接返回false
return false;
}
}
local_variable = iter->second;
return true;
}
LocalVariableSet函数的逻辑如下:
- 在local中查询,如果已经存在值,则先调用free函数来释放空间。
- 最后把协程本地变量最新的值,保存在local中。
LocalVariableGet函数的逻辑如下:
- 在local中查询,如果查询到了,则直接返回值。
- 查询不到,则判断当前协程是否为批量并发执行的子从协程。
- 如果是子从协程,再在父从协程中查询,查询到了,则直接返回值。
- 父从协程中查询不到,则返回不存在。
注意:「想了解批量并发执行,可以移步阅读本系列的第三篇文章」。
从易用性的角度出发,协程本地变量做了易用性封装,相关的代码如下所示。
// 协程本地变量模版类封装
template <typename Type>
class CoroutineLocal {
public:
CoroutineLocal(Schedule &schedule) : schedule_(schedule) {}
static void free(void *data) {
if (data)
delete (Type *)data;
}
Type &Get() {
MyCoroutine::LocalVariable local_variable;
bool result = schedule_.LocalVariableGet(this, local_variable);
assert(result == true);
return *(Type *)local_variable.data;
}
// 重载类型转换操作符,实现协程本地变量直接给Type类型的变量赋值的功能
operator Type() {
return Get();
}
// 重载赋值操作符,实现Type类型的变量直接给协程本地变量赋值的功能
CoroutineLocal &operator=(const Type &value) {
Set(value);
return *this;
}
private:
void Set(Type value) {
Type *data = new Type(value);
MyCoroutine::LocalVariable local_variable;
local_variable.data = data;
local_variable.free = free;
schedule_.LocalVariableSet(this, local_variable);
}
private:
Schedule &schedule_;
};
CoroutineLocal是一个模版类,只有一个schedule_成员变量,是指向Schedule类对象的引用。
CoroutineLocal类中的Get和Set函数,是对LocalVariableGet和LocalVariableSet函数调用的简单封装。
CoroutineLocal类还重载了类型转换操作符和赋值操作符,「从而实现CoroutineLocal类对象和Type类型变量的相互赋值」。
代码示例
最后我们来看一下协程本地变量使用的一个示例。
#include "mycoroutine.h"
#include "localvariable.h"
#include <iostream>
using namespace std;
using namespace MyCoroutine;
void LocalVar1(Schedule &schedule, CoroutineLocal<int32_t> &local_var,
int &sum) {
local_var = 100;
schedule.CoroutineYield();
assert(100 == local_var);
sum += local_var;
}
void LocalVar2(Schedule &schedule, CoroutineLocal<int32_t> &local_var,
int &sum) {
local_var = 200;
schedule.CoroutineYield();
assert(200 == local_var);
sum += local_var;
}
int main() {
// 创建一个协程调度对象,并自动生成大小为1024的协程池
Schedule schedule(1024);
int sum = 0;
CoroutineLocal<int32_t> local_var(schedule);
schedule.CoroutineCreate(LocalVar1, ref(schedule), ref(local_var), ref(sum));
schedule.CoroutineCreate(LocalVar2, ref(schedule), ref(local_var), ref(sum));
schedule.Run(); // Run函数完成从协程的自行调度,直到所有的从协程都执行完
cout << "sum = " << sum << endl;
return 0;
}
上面的代码中,在main函数中创建了一个协程本地变量local_var,然后分别在两个不同的协程中对local_var进行读写操作,在两个不同的协程中分别读写到了不同的值。
思考题
「如果实现的是协程互斥锁,该如何实现呢?在评论区给出你的想法。」
本文是大厂后端技术专家万木春原创。作者更多技术干货,见下方的书籍。