一条 nGQL 语句的前世今生¶
约 810 个字 204 行代码 3 张图片 预计阅读时间 7 分钟
- 用户在 console 或者通过 sdk 输入 Query 语句到 Graph Service,返回给用户 Future,然后开始执行回调,即下面的整个流程。
- Graph Service 将 Query、Session、Storage 等打包成 Request Context,随后将 Request Context 打包成 Query Instance 随后开始执行 parse、validate、optimize、execute 整个流程。
- 在 validate 结束后,首先先获取 statement 对应的 ast context,然后再转换成 plan node tree。
- 在 Graph Service 获取到 query 并启动整个异步执行流程后,返回 Future 给 client。 - 然后将 Request Context 传入 Scheduler,用 Scheduler 的内存池按照算子树依次执行每个物理节点计划。
- 每个物理节点计划在 graphd 中存在一个 executor,实际上里面调用了 storaged 的 processor。
- 每个 processor 内部从 RocksDB 中获取或者存储 KV 对,实际上是通过 raft-wal 进行集群间的同步,然后用结果去设置 promise,graphd 的 future 接收到后继续向上传递结果,直到返回给 client 的结果。
Client¶
client 通过 tcp 来连接到 graphd,通过 libevent 来与 graphd 进行 SQL 语句和查询结果的传输。
- 先构造 host address(ip address, port)
- 通过配置参数初始化连接池
- 通过连接池与 graphd 建立 Session,最后通过 Session 来进行 query 的执行以及返回结果。
basic example¶
Go
const (
address = "127.0.0.1"
// The default port of NebulaGraph 2.x is 9669.
// 3699 is only for testing.
port = 3699
username = "root"
password = "nebula"
useHTTP2 = false
)
func main() {
hostAddress := nebula.HostAddress{Host: address, Port: port}
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2
testPoolConfig.HandshakeKey = "3.0.0"
// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
// Close all connections in the pool
defer pool.Close()
// Create session
session, err := pool.GetSession(username, password)
// Release session and return connection back to connection pool
defer session.Release()
{
// Prepare the query
createSchema := "CREATE SPACE IF NOT EXISTS basic_example_space(vid_type=FIXED_STRING(20)); " +
"USE basic_example_space;" +
"CREATE TAG IF NOT EXISTS person(name string, age int);" +
"CREATE EDGE IF NOT EXISTS like(likeness double)"
// Execute a query
resultSet, err := session.Execute(createSchema)
}
// Drop space
{
query := "DROP SPACE IF EXISTS basic_example_space"
// Send query
resultSet, err := session.Execute(query)
}
}
Graphd¶
graphd 大体上分为 parse,validate,planner,optimize,execute 这几个阶段。所有的 statement、execplan 以及最后的 resultset 包括原始的 Query 语句,都存储在 RequestContext 中。
- 构建 RequestContext 并把 Query 语句传入,根据 RequestContext 构建 Query Instance。开启所有的异步流程后,返回用户一个 Future。
- 在 Query Instance 中执行 parse,将语句解析成 sentence(statement)。
- 在 validate 中通过
validateimpl
方法进行 check,校验是否满足 schema 以及是否数据未超范围。
C++
Status InsertVerticesValidator::validateImpl() {
spaceId_ = vctx_->whichSpace().id;
NG_RETURN_IF_ERROR(check());
NG_RETURN_IF_ERROR(prepareVertices());
return Status::OK();
}
- 通过 validator 的
toplan
方法将 sentence 变成 execution plan tree。
C++
Status InsertVerticesValidator::toPlan() {
auto doNode = InsertVertices::make(qctx_,
nullptr,
spaceId_,
std::move(vertices_),
std::move(tagPropNames_),
ifNotExists_,
ignoreExistedIndex_);
root_ = doNode;
tail_ = root_;
return Status::OK();
}
- 将执行计划树交给 Optimizer 进行优化,nebula 现在的优化是 Rule-based optimization,每个执行计划需要遍历所有的 rules,得到最优的执行计划。
- 然后将整个计划交给 Scheduler 进行执行,在 Scheduler 中按照计划树的父子关系,从下之上依此执行计划,所有的计划都是用 Future 异步执行。parent plan 节点需要所有的 children plan 节点都执行完后才开始执行。
C++
folly::Future<Status> AsyncMsgNotifyBasedScheduler::scheduleExecutor(
std::vector<folly::Future<Status>>&& futures, Executor* exe, folly::Executor* runner) const {
switch (exe->node()->kind()) {
// ...
case PlanNode::Kind::kArgument: {
return runExecutor(std::move(futures), exe, runner);
}
default: {
if (exe->depends().empty()) {
return runLeafExecutor(exe, runner);
} else {
return runExecutor(std::move(futures), exe, runner);
}
}
}
}
folly::Future<Status> AsyncMsgNotifyBasedScheduler::runExecutor(
std::vector<folly::Future<Status>>&& futures, Executor* exe, folly::Executor* runner) const {
return folly::collect(futures).via(runner).thenValue(
[exe, this](auto&& t) mutable -> folly::Future<Status> {
NG_RETURN_IF_ERROR(checkStatus(std::move(t)));
// Execute in current thread.
return execute(exe);
});
}
folly::Future<Status> AsyncMsgNotifyBasedScheduler::runLeafExecutor(Executor* exe,
folly::Executor* runner) const {
return std::move(execute(exe)).via(runner);
}
- 在 runExecutor 方法中会调用 graphd 中的 executor,进行物理计划的执行。里面涉及到存储层的内容时会通过 RPC 向 Storaged 请求服务。
Storaged¶
Storaged 接收到 graphd 发送的 executor 请求,启动对应的 Processor,执行process
方法,实际上执行doProcess
方法。
- 在执行完逻辑后,将所有的更改写入集群,最后在回调中执行
handleAsync
方法,实际上是执行onFinished
方法,onFinished
方法中设置 promise 的 value 为 RESP,RESP 中包含查询的结果,一层层返回直到返回给客户端。
C++
template <typename RESP>
void BaseProcessor<RESP>::doPut(GraphSpaceID spaceId,
PartitionID partId,
std::vector<kvstore::KV>&& data) {
this->env_->kvstore_->asyncMultiPut(
spaceId, partId, std::move(data), [spaceId, partId, this](nebula::cpp2::ErrorCode code) {
handleAsync(spaceId, partId, code);
});
}
template <typename RESP>
void BaseProcessor<RESP>::handleAsync(GraphSpaceID spaceId,
PartitionID partId,
nebula::cpp2::ErrorCode code) {
VLOG(3) << "partId:" << partId << ", code: " << static_cast<int32_t>(code);
bool finished = false;
{
std::lock_guard<std::mutex> lg(this->lock_);
handleErrorCode(code, spaceId, partId);
this->callingNum_--;
if (this->callingNum_ == 0) {
finished = true;
}
}
if (finished) {
this->onFinished();
}
}
virtual void onFinished() {
memory::MemoryCheckOffGuard guard;
if (counters_) {
stats::StatsManager::addValue(counters_->numCalls_);
if (!this->codes_.empty()) {
stats::StatsManager::addValue(counters_->numErrors_);
}
}
this->result_.latency_in_us_ref() = this->duration_.elapsedInUSec();
if (!profileDetail_.empty()) {
this->result_.latency_detail_us_ref() = std::move(profileDetail_);
}
this->result_.failed_parts_ref() = this->codes_;
this->resp_.result_ref() = std::move(this->result_);
this->promise_.setValue(std::move(this->resp_));
if (counters_) {
stats::StatsManager::addValue(counters_->latency_, this->duration_.elapsedInUSec());
}
delete this;
}
- 写入更改是 doPut 操作,该操作实际上是对 raft 提交日志,驱动 raft 状态机将该操作同步到集群中。
- 所有操作都是 write wal first。
- sync logs to followers
C++
void Part::asyncMultiPut(const std::vector<KV>& keyValues, KVCallback cb) {
std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);
appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}
// 经过 Part::appendAsync -> RaftPart::appendLogAsync -> RaftPart::appendLogsInternal
void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
TermID currTerm = 0;
LogID prevLogId = 0;
TermID prevLogTerm = 0;
LogID committed = 0;
LogID lastId = 0;
nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
std::lock_guard<std::mutex> g(raftLock_);
res = canAppendLogs(termId);
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
currTerm = term_;
prevLogId = lastLogId_;
prevLogTerm = lastLogTerm_;
committed = committedLogId_;
// Step 1: Write WAL
{
SCOPED_TIMER(
[](uint64_t execTime) { stats::StatsManager::addValue(kAppendWalLatencyUs, execTime); });
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
}
}
lastId = wal_->lastLogId();
VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
<< "] to WAL";
} while (false);
if (!checkAppendLogResult(res)) {
iter.commit(res);
return;
}
// Step 2: Replicate to followers
auto* eb = ioThreadPool_->getEventBase();
replicateLogs(eb, std::move(iter), currTerm, lastId, committed, prevLogTerm, prevLogId);
return;
}