Coroutine of C++20

C++的协程是:

  1. 对称的。一个协程暂停后,可返回 caller 或恢复任意协程。
  2. 语言级特性。编译器知道你在使用协程。然而不比库强到哪里去。
  3. 无栈(Stackless) 的。没有独立运行时栈,无惧爆栈,调度成本低。
  • 一个协程在被命令「暂停」时,会保证将数据和当前运行位置保存在堆内存(以便恢复现场),然后转移运行权。

实现协程间切换

co_await Transfer{};

struct promise {
// ...
std::coroutine_handle<promise> other;
};

std::coroutine_handle<> Transfer::await_suspend(std::coroutine_handle<promise> me)
{
return me.promise().other ? me.promise().other : me;
}

缺点

  1. 除非编译器优化,每个协程都需要通过operator new来分配 frame:
    • 动态内存分配可能引发性能问题;
    • 在嵌入式或异构(例如 GPU)环境下,缺乏动态内存分配能力,难以工作。
  2. 除非编译器优化,协程的可定制点太多,需要大量间接调用/跳转(而不是内联),同样引起性能问题。
    • 目前,编译器通常难以内联协程;
    • HALO 优化理论:P0981R0
  3. 动态分配间接调用的存在,导致协程暂时无法成为异步框架的最优方法。
  4. Debug 的体验风评不佳。

三个关键字

  • co_await暂停协程
  • co_yield暂停协程同时返回一个值
  • co_return结束协程并返回一个值

co_await

  1. expr转化为awaitable对象
  2. 获得该awaitable对象
  3. 调用awaiter.await_ready(),如果是false,需要挂起当前协程;恢复caller/resumer
  4. 调用awaiter.await_resume(),该结果是co_await expr的结果
  • If the coroutine was suspended in the co_await expression, and is later resumed, the resume point is immediately before the call to awaiter.await_resume().

co_yield

co_await promise.yield_value(expr)

关联三个对象

Promise Object

  • 协程内部对象,将结果或异常提交到该结构中

Coroutine Handle

  • 协程外部操纵对象,用于恢复协程或者销毁协程

Coroutine State

  • 将数据和当前运行位置保存在堆内存

协程运行流程

开始运行

  • 使用operator new分配新的coroutine state对象

  • 将所有参数拷贝到coroutine state

  • 调用promise type的构造函数

  • 获取返回对象( )并将结果保存在局部变量中。当协程第一次挂起时,该调用的结果将返回给调用者。在此步骤之前(包括此步骤)引发的任何异常都会传播回调用者,而不是放置在 Promise 中。

  • 调用promise.get_return_object()同时保存结果在本地变量。当协程第一次挂起,结果会被传递给caller。任何发生的异常会传递给caller

  • 调用promise.initial_suspend()然后co_await结果。典型的Promise type会返回一个std::suspend_always或者std::suspend_never

  • co_await promis.initial_suspend()恢复,开始执行协程中的结构

到达暂停点

  • 如有必要,在隐式转换为协程的返回类型之后,将先前获得的返回对象返回给调用者/恢复者。

  • 如果是co_return expr

    • 调用promise.return_void()
    • 如果expr是非void类型,调用promise.return_value(expr)
    • 按照创建时的相反顺序销毁所有具有自动存储持续时间的变量。
    • 调用promise.final_suspend()然后co_await结果
  • promise.final_suspend()返回awaiterable对象

例子

#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <thread>

class AddOne {
public:
// 注意构造函数本身不阻塞主线程
AddOne(int x, std::function<void(int)> result_ready_cb)
: _thread{[=, result_ready_cb = std::move(result_ready_cb)]() mutable {
// 假装阻塞了 5s 才得到结果
std::this_thread::sleep_for(std::chrono::seconds(5));
result_ready_cb(x + 1);
}} {}

~AddOne() {
// 如果 AddOne 析构时线程还没有完成,我们 detach 这个线程
if (_thread.joinable()) _thread.detach();
}

// 但是 wait_for_result 有可能阻塞主线程
void wait_for_result() { _thread.join(); }

private:
std::thread _thread;
};

class AddOneAwaitable {
public:
AddOneAwaitable(int x) : _x(x) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) {
AddOne add_one(_x, [=, this](int result) {
// 保存结果,用在 await_resume 中
_result = result;
// 在回调函数中,我们恢复当前协程
handle.resume();
});
}
int await_resume() const {
// 返回结果
return _result;
}

private:
int _x;
int _result;
};

class task {
public:
class promise_type {
public:
// 获得协程对象
task get_return_object() {
// 把协程句柄交给协程对象的构造函数
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}

// 没有必要在初始化时暂停,所以返回 std::suspend_never
std::suspend_never initial_suspend() { return {}; }

// 我们还需要获取协程的相关信息,因此让协程在结束时暂停,见 task::done 函数
std::suspend_always final_suspend() noexcept { return {}; }

// 异常处理,我们目前不关心它,留空
void unhandled_exception() {}

// 和 co_return 有关,暂且按下不表
void return_void() {}
};

// 保存一下协程句柄
task(std::coroutine_handle<promise_type> handle) : _handle(handle) {}

// 协程对象的调用者可以通过该函数获取协程是否执行完成
bool done() {
// 需要注意,协程句柄的 done() 函数要求协程在结束时暂停,
// 也就是说,承诺类型的 final_suspend() 要返回 std::suspend_always
return _handle.done();
}

private:
std::coroutine_handle<promise_type> _handle;
};

// 返回值 task 将会自动生成,可以充当返回值的类型 T 一定要有 T::promise_type
task add_one_coroutine(int x, std::function<void(int)> result_handle) {
// co_await 一个 AddOneAwaitable,
// 协程会在此处暂停,然后将控制交还给 add_one_coroutine 的调用者。
// 整个表达式的返回值是 AddOneAwaitable::await_resume() 的返回值。
int result = co_await AddOneAwaitable(x);
// 协程句柄的 resume() 被 AddOneAwaitable 注册给 AddOne 的回调函数调用,协程函数恢复执行,可以处理数据了
result_handle(result);
}

int main(int argc, char* argv[]) {
int value = 1;
int result1 = 0;
int result2 = 0;
std::function<void(int)> result_handle1 = [&](int _result) mutable {
result1 = _result + 1;
};
std::function<void(int)> result_handle2 = [&](int _result) mutable {
result2 = _result * _result;
};

// 把线程模型改成协程模型:
task task1 = add_one_coroutine(value, result_handle1);
task task2 = add_one_coroutine(value, result_handle2);

// 由于上面两个任务都会阻塞暂停协程,所以 main 函数还可以继续做其他事情
std::cout << "hello world" << std::endl;

// main 需要做的事情已经做完了,等待协程完成
while (true) {
if (task1.done() && task2.done()) {
// 处理结果
std::cout << result1 << result2 << std::endl;
break;
}
else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return 0;
}
  1. 进入 main 函数
  2. main 函数调用 add_one_coroutine() 函数
  3. 创建一个 task::promise_type 类型的承诺对象,假设为 promise1
  4. 调用 promise1.get_return_object(),构造 task 类型的协程对象,获得 task1
  5. promise1.initial_suspend() 被调用,由于返回 std::suspend_never,因此协程继续执行
  6. 进入函数 add_one_coroutine() 内部,执行 co_await AddOneAwaitable(x)
  7. 构造 AddOneAwaitable 对象,假设为 awaitable1
  8. 调用 awaitable1.await_ready(),由于我们返回 false,因此协程立即暂停
  9. 调用 awaitable1.await_suspend(),之后协程将控制交还给 main 函数
  10. main 函数再次调用 add_one_coroutine() 函数
  11. 同 3~9,协程对象 task2 也做一样的事情,最后将控制交还给 main 函数
  12. main 函数继续做自己的事情
  13. AddOne 阻塞结束,调用 task1 的协程句柄的 resume() 函数,task1 协程恢复。需要注意的是,这里 task1 会被转移到 AddOne 创建的线程中恢复执行,这一点我们后面再说
  14. 调用 awaitable1.await_resume(),并将该函数的返回值作为 co_await AddOneAwaitable(x) 的返回值
  15. 继续执行 add_one_coroutine 剩下的内容
  16. 函数执行结束,调用 promise1.return_void(),该函数与 co_return 有关,我们后面再讲
  17. 调用 promise1.final_suspend(),协程执行结束
  18. 等待 task2 的阻塞结束,然后类似 12~16 步骤完成 task2 协程