Skip to content

一条 nGQL 语句的前世今生

约 810 个字 204 行代码 3 张图片 预计阅读时间 7 分钟

  1. 用户在 console 或者通过 sdk 输入 Query 语句到 Graph Service,返回给用户 Future,然后开始执行回调,即下面的整个流程。
  2. Graph Service 将 Query、Session、Storage 等打包成 Request Context,随后将 Request Context 打包成 Query Instance 随后开始执行 parse、validate、optimize、execute 整个流程。
    - 在 validate 结束后,首先先获取 statement 对应的 ast context,然后再转换成 plan node tree。
    - 在 Graph Service 获取到 query 并启动整个异步执行流程后,返回 Future 给 client。
  3. 然后将 Request Context 传入 Scheduler,用 Scheduler 的内存池按照算子树依次执行每个物理节点计划。
  4. 每个物理节点计划在 graphd 中存在一个 executor,实际上里面调用了 storaged 的 processor。
  5. 每个 processor 内部从 RocksDB 中获取或者存储 KV 对,实际上是通过 raft-wal 进行集群间的同步,然后用结果去设置 promise,graphd 的 future 接收到后继续向上传递结果,直到返回给 client 的结果。

Client

client 通过 tcp 来连接到 graphd,通过 libevent 来与 graphd 进行 SQL 语句和查询结果的传输。

  • 先构造 host address(ip address, port)
  • 通过配置参数初始化连接池
  • 通过连接池与 graphd 建立 Session,最后通过 Session 来进行 query 的执行以及返回结果。

basic example

Go
const (
    address = "127.0.0.1"
    // The default port of NebulaGraph 2.x is 9669.
    // 3699 is only for testing.
    port     = 3699
    username = "root"
    password = "nebula"
    useHTTP2 = false
)


func main() {
    hostAddress := nebula.HostAddress{Host: address, Port: port}
    hostList := []nebula.HostAddress{hostAddress}
    // Create configs for connection pool using default values
    testPoolConfig := nebula.GetDefaultConf()
    testPoolConfig.UseHTTP2 = useHTTP2
    testPoolConfig.HandshakeKey = "3.0.0"

    // Initialize connection pool
    pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
    if err != nil {
        log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
    }
    // Close all connections in the pool
    defer pool.Close()

    // Create session
    session, err := pool.GetSession(username, password)

    // Release session and return connection back to connection pool
    defer session.Release()

    {
        // Prepare the query
        createSchema := "CREATE SPACE IF NOT EXISTS basic_example_space(vid_type=FIXED_STRING(20)); " +
            "USE basic_example_space;" +
            "CREATE TAG IF NOT EXISTS person(name string, age int);" +
            "CREATE EDGE IF NOT EXISTS like(likeness double)"

        // Execute a query
        resultSet, err := session.Execute(createSchema)

    }
    // Drop space
    {
        query := "DROP SPACE IF EXISTS basic_example_space"
        // Send query
        resultSet, err := session.Execute(query)
    }
}

Graphd

graphd 大体上分为 parse,validate,planner,optimize,execute 这几个阶段。所有的 statement、execplan 以及最后的 resultset 包括原始的 Query 语句,都存储在 RequestContext 中。

  • 构建 RequestContext 并把 Query 语句传入,根据 RequestContext 构建 Query Instance。开启所有的异步流程后,返回用户一个 Future。
  • 在 Query Instance 中执行 parse,将语句解析成 sentence(statement)。
  • 在 validate 中通过validateimpl方法进行 check,校验是否满足 schema 以及是否数据未超范围。
C++
Status InsertVerticesValidator::validateImpl() {
  spaceId_ = vctx_->whichSpace().id;
  NG_RETURN_IF_ERROR(check());
  NG_RETURN_IF_ERROR(prepareVertices());
  return Status::OK();
}
  • 通过 validator 的toplan方法将 sentence 变成 execution plan tree。
C++
Status InsertVerticesValidator::toPlan() {
  auto doNode = InsertVertices::make(qctx_,
                                    nullptr,
                                    spaceId_,
                                    std::move(vertices_),
                                    std::move(tagPropNames_),
                                    ifNotExists_,
                                    ignoreExistedIndex_);
  root_ = doNode;
  tail_ = root_;
  return Status::OK();
}
  • 将执行计划树交给 Optimizer 进行优化,nebula 现在的优化是 Rule-based optimization,每个执行计划需要遍历所有的 rules,得到最优的执行计划。
  • 然后将整个计划交给 Scheduler 进行执行,在 Scheduler 中按照计划树的父子关系,从下之上依此执行计划,所有的计划都是用 Future 异步执行。parent plan 节点需要所有的 children plan 节点都执行完后才开始执行。
C++
folly::Future<Status> AsyncMsgNotifyBasedScheduler::scheduleExecutor(
std::vector<folly::Future<Status>>&& futures, Executor* exe, folly::Executor* runner) const {
switch (exe->node()->kind()) {
  // ...
  case PlanNode::Kind::kArgument: {
    return runExecutor(std::move(futures), exe, runner);
  }
  default: {
    if (exe->depends().empty()) {
      return runLeafExecutor(exe, runner);
    } else {
      return runExecutor(std::move(futures), exe, runner);
    }
  }
}
}
folly::Future<Status> AsyncMsgNotifyBasedScheduler::runExecutor(
  std::vector<folly::Future<Status>>&& futures, Executor* exe, folly::Executor* runner) const {
return folly::collect(futures).via(runner).thenValue(
    [exe, this](auto&& t) mutable -> folly::Future<Status> {
      NG_RETURN_IF_ERROR(checkStatus(std::move(t)));
      // Execute in current thread.
      return execute(exe);
    });
}

folly::Future<Status> AsyncMsgNotifyBasedScheduler::runLeafExecutor(Executor* exe,
                                                                  folly::Executor* runner) const {
  return std::move(execute(exe)).via(runner);
}
  • 在 runExecutor 方法中会调用 graphd 中的 executor,进行物理计划的执行。里面涉及到存储层的内容时会通过 RPC 向 Storaged 请求服务。

Storaged

Storaged 接收到 graphd 发送的 executor 请求,启动对应的 Processor,执行process方法,实际上执行doProcess方法。

  • 在执行完逻辑后,将所有的更改写入集群,最后在回调中执行handleAsync方法,实际上是执行onFinished方法,onFinished方法中设置 promise 的 value 为 RESP,RESP 中包含查询的结果,一层层返回直到返回给客户端。
C++
template <typename RESP>
void BaseProcessor<RESP>::doPut(GraphSpaceID spaceId,
                                PartitionID partId,
                                std::vector<kvstore::KV>&& data) {
  this->env_->kvstore_->asyncMultiPut(
      spaceId, partId, std::move(data), [spaceId, partId, this](nebula::cpp2::ErrorCode code) {
        handleAsync(spaceId, partId, code);
      });
}

template <typename RESP>
void BaseProcessor<RESP>::handleAsync(GraphSpaceID spaceId,
                                    PartitionID partId,
                                    nebula::cpp2::ErrorCode code) {
  VLOG(3) << "partId:" << partId << ", code: " << static_cast<int32_t>(code);

  bool finished = false;
  {
    std::lock_guard<std::mutex> lg(this->lock_);
    handleErrorCode(code, spaceId, partId);
    this->callingNum_--;
    if (this->callingNum_ == 0) {
      finished = true;
    }
  }

  if (finished) {
    this->onFinished();
  }
}

virtual void onFinished() {
  memory::MemoryCheckOffGuard guard;
  if (counters_) {
    stats::StatsManager::addValue(counters_->numCalls_);
    if (!this->codes_.empty()) {
      stats::StatsManager::addValue(counters_->numErrors_);
    }
  }

  this->result_.latency_in_us_ref() = this->duration_.elapsedInUSec();
  if (!profileDetail_.empty()) {
    this->result_.latency_detail_us_ref() = std::move(profileDetail_);
  }
  this->result_.failed_parts_ref() = this->codes_;
  this->resp_.result_ref() = std::move(this->result_);
  this->promise_.setValue(std::move(this->resp_));

  if (counters_) {
    stats::StatsManager::addValue(counters_->latency_, this->duration_.elapsedInUSec());
  }

  delete this;
}
  • 写入更改是 doPut 操作,该操作实际上是对 raft 提交日志,驱动 raft 状态机将该操作同步到集群中。

  1. 所有操作都是 write wal first。
  2. sync logs to followers
C++
void Part::asyncMultiPut(const std::vector<KV>& keyValues, KVCallback cb) {
  std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);

  appendAsync(FLAGS_cluster_id, std::move(log))
      .thenValue(
          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}
// 经过 Part::appendAsync -> RaftPart::appendLogAsync -> RaftPart::appendLogsInternal
void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
  TermID currTerm = 0;
  LogID prevLogId = 0;
  TermID prevLogTerm = 0;
  LogID committed = 0;
  LogID lastId = 0;
  nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED;
  do {
    std::lock_guard<std::mutex> g(raftLock_);
    res = canAppendLogs(termId);
    if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
      break;
    }
    currTerm = term_;
    prevLogId = lastLogId_;
    prevLogTerm = lastLogTerm_;
    committed = committedLogId_;
    // Step 1: Write WAL
    {
      SCOPED_TIMER(
          [](uint64_t execTime) { stats::StatsManager::addValue(kAppendWalLatencyUs, execTime); });
      if (!wal_->appendLogs(iter)) {
        VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
        res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
        lastLogId_ = wal_->lastLogId();
        lastLogTerm_ = wal_->lastLogTerm();
        break;
      }
    }
    lastId = wal_->lastLogId();
    VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
            << "] to WAL";
  } while (false);

  if (!checkAppendLogResult(res)) {
    iter.commit(res);
    return;
  }
  // Step 2: Replicate to followers
  auto* eb = ioThreadPool_->getEventBase();
  replicateLogs(eb, std::move(iter), currTerm, lastId, committed, prevLogTerm, prevLogId);
  return;
}