Skip to content

Folly 异步编程框架

约 1028 个字 27 行代码 1 张图片 预计阅读时间 5 分钟

Folly 的异步编程模型围绕 Futures、Promises 和 EventBase 展开。该系统提供了一种强大的方式来编写非阻塞代码,高效处理并发。

  1. 非阻塞调用:storageClient->getProps() 是非阻塞的。它启动异步操作并立即返回一个 Future。调用 getProps 的线程可以继续执行其他任务,而不用等待网络 I/O 和存储层处理完成。
  2. Future 作为占位符:folly::Future 是一个占位符,代表类型为 T 的操作最终的结果。
  3. Promise (隐藏的):在 StorageClient::getProps 内部,当异步操作(如网络请求发送)被发起后,它会与一个 folly::Promise 相关联。当存储层的响应到达时,某个组件会负责调用这个 Promise 的 setValue(response) 或 setException(error) 方法。这会使对应的 Future 完成。
  4. 链式回调 (.via().thenValue()):
    - .via(executor) 控制回调在哪个线程执行。
    - .thenValue(callback) 注册一个当 Future 成功完成时要执行的函数。如果 Future 失败(即 Promise 被 setException),.thenValue 的回调将被跳过,Folly 会寻找链中的错误处理器(如 .thenError())。
    - 回调可以返回一个普通值(会被包装成新的 Future)或另一个 Future(Folly 会自动“扁平化”它,即 Future> 变成 Future)。
  5. 数据流和控制流分离:代码的结构清晰地分离了“发起请求”和“处理结果”。处理结果的逻辑(handlePropResp)被封装在回调中,只有当数据实际可用时才执行。
  6. 错误处理 (隐式):如果 storageClient->getProps() 内部发生错误,或者 Promise 被 setException,那么 getProps 返回的 Future 将会失败。.thenValue 的回调不会执行。如果调用者在 getProps 返回的 Future 上链接了 .thenError(),那么错误处理回调会被触发。如果 handlePropResp 内部抛出异常,那么由 .thenValue 返回的 Future> 将会失败,并携带这个异常。

nebula 中的示例

graphd 节点通过 rpc 向 stroaged 节点请求服务,整个过程为了不阻碍其他服务过程,都是异步化的。
stroaged 节点中的 executor 中存在如下方法:

  • getFuture: 返回 promise 对应的 Future
  • onFinished: 主要是对 promise 进行 setvalue 操作
C++
folly::Future<RESP> getFuture() {
  return promise_.getFuture();
}

this->promise_.setValue(std::move(this->resp_));

graphd 对应端存在相应的 executor,负责向 storaged 节点发送请求并返回 Future,实现异步执行。、
Future 的执行器是在 request context 中设置的。

C++
folly::Future<std::vector<Value>> StorageAccessExecutor::getProps(
    const std::vector<Value> &vids, const std::vector<VertexProp> *vertexPropPtr) {
  nebula::DataSet vertices({kVid});
  vertices.rows.reserve(vids.size());
  for (auto &vid : vids) {
    vertices.emplace_back(Row({vid}));
  }
  StorageClient *storageClient = qctx_->getStorageClient();
  StorageClient::CommonRequestParam param(qctx_->rctx()->session()->space().id,
                                          qctx_->rctx()->session()->id(),
                                          qctx_->plan()->id(),
                                          qctx_->plan()->isProfileEnabled());
  return DCHECK_NOTNULL(storageClient)
      ->getProps(
          param, std::move(vertices), vertexPropPtr, nullptr, nullptr, false, {}, -1, nullptr)
      .via(runner())
      .thenValue([this](PropRpcResponse &&resp) {
        memory::MemoryCheckGuard guard;
        addStats(resp);
        return handlePropResp(std::move(resp));
      });
}

详细流程

  1. storageClient->getProps(...): 这是关键的异步调用。
  • 这个方法并不直接返回属性数据。相反,它立即返回一个 folly::Future
  • 这个 Future 代表一个“承诺”——在未来的某个时刻,当存储层处理完请求并返回响应后,这个 Future 将会包含 PropRpcResponse 类型的结果(或者一个异常,如果出错了)。
  1. .via(runner()): 这是 Folly Future 的一个链式操作。
  • runner() 应该返回一个 folly::Executor* 实例。Executor 定义了后续的回调(即 .thenValue 中的 lambda)将在哪个线程或线程池上执行。
  • 线程切换:RPC 响应可能在一个 I/O 线程(例如网络库的线程)上到达。.via(runner()) 可以将后续的处理逻辑切换到一个专门的工作线程池(由 runner() 提供),避免阻塞 I/O 线程。
  • 上下文管理:确保回调在具有正确上下文(如特定的 EventBase 或与当前请求关联的执行环境)的线程上执行。
  1. .thenValue(this { ... }): 这是另一个链式操作,用于注册一个成功回调。
  • 当 storageClient->getProps(...) 返回的 Future 成功完成时(即存储层成功返回了 PropRpcResponse),这个 lambda 函数将被执行。
  • 回调的返回值:
    • handlePropResp(std::move(resp)) 被调用。这个函数本身返回 std::vector
    • 由于 .thenValue 的回调返回了一个 std::vector,所以整个 storageClient->getProps(...).via(...).thenValue(...) 表达式的结果是一个新的 folly::Future>。
    • Folly 框架会自动将 handlePropResp 的返回值包装进一个新的 Future。
  1. 函数返回值: getProps 函数最终返回的就是由 .thenValue 产生的 folly::Future>。调用者将得到这个 Future,并可以进一步链接其他回调或等待其完成。