操作系统:ubuntu20.04LTS
头文件:<ucontext.h>
什么是协程
协程可以看作轻量级线程,相比于线程,协程的调度完全由用户控制。可以理解为程序员可以暂停执行或恢复执行的函数。将每个线程看作是一个子程序,或者说一个函数,首先main函数启动后产生了main线程,在main函数中调用了function函数后会产生另一个线程执行function函数。在这个过程中,可以发现线程的执行并不受程序员的控制,从入口进入到执行完毕给出返回值,程序员无法控制。而协程则允许程序员在其执行到某个步骤时停止执行,去执行别的任务,当某个条件满足后继续执行被暂停的任务。在子程序内部中断,去执行别的子程序,在适合的时候返回继续执行。例如下面的例子,假设有两个函数func1和func2:func1: 打印123;func2打印456
假设在func1最后调用func2,这是利用线程来完成的,那么最终的结果是打印123456;
而在使用协程情况下,允许程序员中断func1,执行func2,然后返回继续执行func1,可以实现先打印12,然后执行func2,打印456,然后返回继续执行func1,打印3,最终的结果是:124563。
协程的优点和缺点
协程具有极高的执行效率,这是由于没有线程切换的开销以及没有线程同步的开销。子程序切换不设计到线程切换没有线程切换的开销,同时由于多个协程在同一线程下,不涉及到写资源冲突,协程中控制共享资源不加锁,因此执行效率比线程高。线程数目越多,协程的优势越明显。
优点
-
轻量级: 协程通常比线程更轻量级,因为它们在同一线程中运行,并且可以在不同的时间点暂停和恢复执行。这意味着在同一系统上可以运行更多的协程而不会产生过多的资源消耗。
-
低开销: 创建和销毁协程的开销通常比线程低,因为它们不涉及操作系统级别的上下文切换。
-
高效的并发: 由于协程可以在同一线程中进行切换,因此在某些情况下,协程可以比使用多线程的程序更高效地利用多核处理器。
-
简化同步: 在协程中,共享状态的同步通常更容易,因为协程可以通过消息传递等方式来进行通信,而不需要使用诸如锁之类的同步原语。
-
简化代码: 使用协程可以编写更具表达力和清晰度的异步代码。它们通常使用异步/await语法,使得编写异步代码更类似于编写同步代码。
缺点
-
不适用于CPU密集型任务: 协程通常适用于IO密集型任务,而不适用于CPU密集型任务。因为协程在同一线程中执行,如果一个协程执行了一个长时间的CPU密集型操作,它会阻塞其他协程的执行。
-
需要显式地调用异步操作: 在使用协程时,必须确保在适当的地方使用异步操作。如果忘记在长时间运行的操作中使用异步调用,可能会阻塞整个程序的执行。
-
难以调试: 由于协程通常是在同一线程中运行的,因此调试时可能会更加复杂。在调试过程中,难以查看多个协程之间的状态和执行顺序。
-
可能会出现死锁: 如果在协程中错误地使用了同步原语,例如锁,可能会导致死锁的发生。这种情况下,由于协程在同一线程中执行,死锁可能会导致整个程序无法继续执行。
-
需要特定的语言和库支持: 并非所有编程语言都原生支持协程,需要特定的语言和库支持。此外,不同的库和框架对于协程的实现方式和性能特点也可能不同,需要开发者进行选择和权衡。
协程类的设计
本文章中协程在切换时,需要程序员指定协程有哪些状态,根据状态进行操作。同时在协程中提供一个主协程作为中转协程,主协程没有栈空间,只负责中转。例如协程1要获得执行权,需要将协程1和主协程的上下文进行切换,将协程1切入执行,将主协程切出;当协程1要让出执行权时,需要将主协程和协程1上下文进行切换,将协程1切出,将执行权还给主协程。
协程状态
- INITIALIZED:初始化状态,刚刚创建协程;
- PAUSED:暂停状态,等待资源或某件事发生;
- RUNNING:运行状态,协程已经开始执行任务;
- FINISHED:结束状态,协程已经执行完任务,不需要执行其他操作。
- EXECUTABLE:可执行状态,等待分配CPU或事件触发;
- EXCEPTIONAL:异常状态,协程执行过程中遇到问题。
成员
首先效仿线程,我们给予协程一个id,用来标识某个协程;然后是协程的上下文结构体还有栈指针和栈空间大小,其次是协程的状态,最后给出协程的一个回调函数。
- coroutine_id_:协程id
- coroutine_ucontext_:协程上下文
- coroutine_stack_ptr_:协程栈指针
- coroutine_stack_size_:协程栈大小
- coroutine_state_:协程状态
- coroutine_callback_func_:协程回调函数
方法
除了必要的私有数据成员访问方法,还需要提供两种构造函数,一种负责主协程,也就是中转协程,另一种构造常用协程。此外当协程执行过程中出现错误或其他原因允许切换协程的回调函数。还需要将当前协程切入执行和将当前协程切出让出执行权的函数。
此外还需要一些静态函数来服务类,如设置和返回当前正在执行协程,将当前正在执行协程切换到其他状态。
- getCoroutineID:返回协程id
- getCoroutineState:返回协程状态
- acquireExecution:获得执行权
- yieldExecution:让出执行权
- Coroutine:构造函数,包含两种
- ~Coroutine:析构函数,释放栈空间
- setCallbackFunction:设置协程的回调函数
- SetExecutingCoroutine:设置正在执行的协程
- GetExecutingCoroutine:返回正在执行的协程
- GetExecutingCoroutineState:返回正在执行的协程的状态
- GetExecutingCoroutineID:返回正在执行的协程的id
- GetTotalCoroutineCount:返回协程总数
- MainFunction:协程绑定的函数
- ConvertToPaused:将正在执行的协程切换到暂停状态
- ConvertToExecutable:将正在执行的协程切换到可执行状态
源代码
#pragma once
#include <iostream>
#include <ucontext.h>
#include <memory>
#include <assert.h>
#include <functional>
#include "uncopyable.h"
class Coroutine : public std::enable_shared_from_this<Coroutine> {
private:
enum STATE {
INITIALIZED,
PAUSED,
RUNNING,
FINISHED,
EXECUTABLE,
EXCEPTIONAL
};
private:
Coroutine();
public:
typedef std::shared_ptr<Coroutine> ptr;
Coroutine(std::function<void()> func, size_t stack_size = 0);
~Coroutine();
const size_t getCoroutineID() const { return coroutine_id_; }
STATE getCoroutineState() const { return coroutine_state_; }
void setCallbackFunction(std::function<void()> func);
void acquireExecution();
void yieldExecution();
public:
static void SetExecutingCoroutine(Coroutine* coroutine);
static Coroutine::ptr GetExecutingCoroutine();
static void ConvertToPaused();
static void ConvertToExecutable();
static size_t GetTotalCoroutineCount();
static size_t GetExecutingCoroutineID();
static STATE GetExecutingCoroutineState();
private:
static void MainFunction();
private:
size_t coroutine_id_ = 0;
ucontext_t coroutine_ucontext_;
void* coroutine_stack_ptr_;
size_t coroutine_stack_size_ = 0;
std::function<void()> coroutine_callback_func_;
STATE coroutine_state_ = INITIALIZED;
};
#include "coroutine.h"
// 用来初始化协程id
static thread_local size_t COROUTINE_ID = {0};
// 用来跟踪协程个数
static thread_local size_t COROUTINE_COUNTS = {0};
// 当前线程正在执行的协程
static thread_local Coroutine* EXECUTING_COROUTINE = nullptr;
// 用于协程切换的调度协程
static thread_local Coroutine::ptr SCHEDULER_COROUTINE = nullptr;
// 默认协程栈大小
static size_t COROUTINE_STACK_DEFAULT_SIZE = 128 * 1024;
// 提供更方便的开辟和释放空间方法
class CoroutineStackAllocator {
public:
static void* Alloc(size_t stack_size) {
return malloc(stack_size);
}
static void Dealloc(void* stack_ptr, size_t stack_size) {
return free(stack_ptr);
}
};
Coroutine::Coroutine() {
coroutine_state_ = RUNNING;
SetExecutingCoroutine(this);
assert(!getcontext(&coroutine_ucontext_));
++COROUTINE_COUNTS;
std::cout << "scheduler coroutine constructed id = " << coroutine_id_ << std::endl;
}
Coroutine::Coroutine(std::function<void()> func, size_t stack_size):
coroutine_id_(++COROUTINE_ID),
coroutine_callback_func_(func) {
++COROUTINE_COUNTS;
coroutine_stack_size_ = stack_size ?stack_size : COROUTINE_STACK_DEFAULT_SIZE;
coroutine_stack_ptr_ = CoroutineStackAllocator::Alloc(coroutine_stack_size_);
assert(!getcontext(&coroutine_ucontext_));
coroutine_ucontext_.uc_link = nullptr;
coroutine_ucontext_.uc_stack.ss_sp = coroutine_stack_ptr_;
coroutine_ucontext_.uc_stack.ss_size = coroutine_stack_size_;
makecontext(&coroutine_ucontext_, &MainFunction, 0);
std::cout << "coroutine " << coroutine_id_ << " constructed" << std::endl;
}
Coroutine::~Coroutine() {
--COROUTINE_COUNTS;
if(coroutine_stack_ptr_) {
assert(coroutine_state_ == INITIALIZED || coroutine_state_ == FINISHED || coroutine_state_ == EXCEPTIONAL);
CoroutineStackAllocator::Dealloc(coroutine_stack_ptr_, coroutine_stack_size_);
} else {
assert(!coroutine_callback_func_);
assert(coroutine_state_ == RUNNING);
Coroutine* cur = SCHEDULER_COROUTINE.get();
if(cur == this) {
SetExecutingCoroutine(nullptr);
}
}
std::cout << "coroutine " << coroutine_id_ << " descructed" << std::endl;
}
void Coroutine::setCallbackFunction(std::function<void()> func) {
assert(coroutine_stack_ptr_);
assert(coroutine_state_ == INITIALIZED || coroutine_state_ == EXCEPTIONAL || coroutine_state_ == FINISHED);
coroutine_callback_func_ = func;
assert(!getcontext(&coroutine_ucontext_));
coroutine_ucontext_.uc_link = nullptr;
coroutine_ucontext_.uc_stack.ss_sp = coroutine_stack_ptr_;
coroutine_ucontext_.uc_stack.ss_size = coroutine_stack_size_;
makecontext(&coroutine_ucontext_, &MainFunction, 0);
coroutine_state_ = INITIALIZED;
}
void Coroutine::acquireExecution() {
SetExecutingCoroutine(this);
coroutine_state_ = RUNNING;
std::cout << "coroutine " << coroutine_id_ << " acquireExecution" << std::endl;
assert(!swapcontext(&SCHEDULER_COROUTINE->coroutine_ucontext_, &coroutine_ucontext_));
}
void Coroutine::yieldExecution() {
SetExecutingCoroutine(SCHEDULER_COROUTINE.get());
assert(!swapcontext(&coroutine_ucontext_, &SCHEDULER_COROUTINE->coroutine_ucontext_));
}
void Coroutine::SetExecutingCoroutine(Coroutine* coroutine) {
EXECUTING_COROUTINE = coroutine;
}
Coroutine::ptr Coroutine::GetExecutingCoroutine() {
if(EXECUTING_COROUTINE) {
return EXECUTING_COROUTINE->shared_from_this();
}
Coroutine::ptr scheduler_coroutine(new Coroutine);
assert(EXECUTING_COROUTINE == scheduler_coroutine.get());
SCHEDULER_COROUTINE = scheduler_coroutine;
return EXECUTING_COROUTINE->shared_from_this();
}
void Coroutine::ConvertToPaused() {
Coroutine::ptr cur = GetExecutingCoroutine();
assert(cur->coroutine_state_ == RUNNING);
cur->coroutine_state_ = PAUSED;
cur->yieldExecution();
}
void Coroutine::ConvertToExecutable() {
Coroutine::ptr cur = GetExecutingCoroutine();
assert(cur->coroutine_state_ == RUNNING);
cur->coroutine_state_ = EXECUTABLE;
cur->yieldExecution();
}
size_t Coroutine::GetTotalCoroutineCount() {
return COROUTINE_COUNTS;
}
size_t Coroutine::GetExecutingCoroutineID() {
if(EXECUTING_COROUTINE) {
return EXECUTING_COROUTINE->coroutine_id_;
}
return 0;
}
Coroutine::STATE Coroutine::GetExecutingCoroutineState() {
if(EXECUTING_COROUTINE) {
return EXECUTING_COROUTINE->coroutine_state_;
}
return EXCEPTIONAL;
}
void Coroutine::MainFunction() {
Coroutine::ptr cur = GetExecutingCoroutine();
assert(cur);
try
{
cur->coroutine_callback_func_();
cur->coroutine_callback_func_ = nullptr;
cur->coroutine_state_ = FINISHED;
} catch(const std::exception& e) {
cur->coroutine_state_ = EXCEPTIONAL;
std::cerr << e.what() << '\n';
} catch(...) {
cur->coroutine_state_ = EXCEPTIONAL;
std::cout << "something else happened" << std::endl;
}
std::cout << "coroutine " << cur->coroutine_id_ << " finished" << std::endl;
auto raw_ptr = cur.get();
cur.reset();
raw_ptr->yieldExecution();
}
测试
#include "coroutine.h"
#include <thread>
#include <chrono>
void test() {
std::cout << "function test begin" << std::endl;
std::cout << "yieldExecution" << std::endl;
Coroutine::GetExecutingCoroutine()->yieldExecution();
std::cout << "ConvertToPaused " << std::endl;
Coroutine::ConvertToPaused();
std::cout << "ConvertToExecutable" << std::endl;
Coroutine::ConvertToExecutable();
std::cout << "function test end" << std::endl;
}
int main() {
std::cout << "begin test" << std::endl;
Coroutine::GetExecutingCoroutine();
Coroutine::ptr coroutine(new Coroutine(test));
std::cout << "coroutine acquireExecution for first time" << std::endl;
coroutine->acquireExecution();
std::cout << "coroutine acquireExecution for second time" << std::endl;
coroutine->acquireExecution();
std::cout << "coroutine acquireExecution for third time" << std::endl;
coroutine->acquireExecution();
std::cout << "coroutine acquireExecution for four time" << std::endl;
coroutine->acquireExecution();
std::cout << "end test" << std::endl;
return 0;
}
测试结果:
总结
本文实现了基于C++的协程的封装,介绍了协程及其优点和缺点,并给出了源代码和测试代码。