Storage Implementation of Vector Type

When a vertex with a VECTOR type property is added, the AddVerticesProcessor executor handles the request. It packages the vertex data and metadata into Raft WAL log entries, which are then submitted to the Raft Part.

The actual write to RocksDB happens in the Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait, bool needLock) method. The process can be summarized with the following pseudo-code:

C++
auto batch = engine_->startBatchWrite();
while (iter->valid()) {
    // ...
    switch (log[sizeof(int64_t)]) {
      case OP_MULTI_PUT: {
        // Here, 'kvs' contains both keys and values, and potentially the column family name.
        auto kvs = decodeMultiValues(log);
        // We iterate through key-value pairs.
        for (size_t i = 0; i < kvs.size(); i += 2) {
          // The column family name 'cfName' should be determined here before the put.
          auto code = batch->put(kvs[i], kvs[i + 1], cfName);
        }
        break;
      }
    }
    ++(*iter);
}
engine_->commitBatchWrite(
      std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, wait);

To minimize changes and keep the existing Raft log processing workflow intact, we propose adding new log types such as OP_MULTI_PUT_VECTOR and OP_MULTI_REMOVE_VECTOR. Vector data can then be handled by adding new switch cases in commitLogs and a cfName parameter to the doPut method. Fully supporting the new VECTOR type requires changes along the entire data flow:

  1. AddVerticesProcessor: Get vector columns from the schema and store them in the "vector" column family of RocksDB.
  2. BaseProcessor::doPut: Update the doPut method to handle operations targeting the "vector" column family.
  3. KVStore::asyncMultiPut: Extend the asyncMultiPut method to support writes to the "vector" column family.
  4. Part Methods:
    - The asyncMultiPut method must be updated to support the "vector" column family.
    - The commitLogs method must also be adapted to correctly process log entries destined for the "vector" column family.
  5. encodeMultiValues: Modify this function to correctly encode the "vector" column family name into the log entry.
  6. KVEngine Interface: The KVEngine interface needs to be enhanced to support operations on the "vector" column family.
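
The routing idea behind the steps above can be sketched in isolation. This is a minimal illustration rather than the NebulaGraph implementation: `LogOp`, `DecodedLog`, `RecordingBatch`, and `applyLog` are hypothetical names, the op codes are placeholders, and the sketch assumes that non-vector writes go to RocksDB's "default" column family.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical op codes; the real values belong to the Raft log encoder.
enum class LogOp : uint8_t { kMultiPut, kMultiPutVector };

// One decoded log entry: an op code plus a flat [k0, v0, k1, v1, ...] list.
struct DecodedLog {
  LogOp op;
  std::vector<std::string> kvs;
};

// Stand-in for a write batch: records (column family, key, value) triples.
struct RecordingBatch {
  std::vector<std::tuple<std::string, std::string, std::string>> puts;

  void put(const std::string& key, const std::string& value, const std::string& cfName) {
    puts.emplace_back(cfName, key, value);
  }
};

// Chooses the column family from the op code, then replays the key-value pairs.
// Returns false when the flat list has an odd length (a key without a value).
inline bool applyLog(const DecodedLog& log, RecordingBatch& batch) {
  if (log.kvs.size() % 2 != 0) {
    return false;
  }
  // Assumption: vector ops target "vector", everything else targets "default".
  const std::string cfName = (log.op == LogOp::kMultiPutVector) ? "vector" : "default";
  for (std::size_t i = 0; i + 1 < log.kvs.size(); i += 2) {
    batch.put(log.kvs[i], log.kvs[i + 1], cfName);
  }
  return true;
}
```

Deriving the column family from the op code keeps the existing OP_MULTI_PUT path untouched. Encoding the column family name into the log entry, as step 5 suggests, is the alternative when more than one extra column family is expected.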

Addition of Interface for Vector Type

  • Add a new interface for WriteBatch to support operations on specific column families.
C++
class WriteBatch {
public:
  virtual nebula::cpp2::ErrorCode put(folly::StringPiece key,
                                      folly::StringPiece value,
                                      const std::string& cfName) = 0;
  virtual nebula::cpp2::ErrorCode remove(folly::StringPiece key, const std::string& cfName) = 0;
  virtual nebula::cpp2::ErrorCode removeRange(folly::StringPiece start,
                                              folly::StringPiece end,
                                              const std::string& cfName) = 0;
};
  • TODO (temporary, to be updated): Add a new interface for KVEngine to support operations on specific column families.
C++
class KVEngine {
public:
  virtual nebula::cpp2::ErrorCode get(const std::string& key,
                                      std::string* value,
                                      const std::string& cfName,
                                      const void* snapshot = nullptr) = 0;
};
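
To make the intended semantics of the cfName parameter concrete, here is a toy in-memory engine with the same method shapes. It is only a sketch under assumptions: `InMemoryEngine` is a hypothetical class, `std::string_view` stands in for `folly::StringPiece`, the local `ErrorCode` enum stands in for `nebula::cpp2::ErrorCode`, and `removeRange` is assumed to cover the half-open range [start, end).

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>

// Stand-in for nebula::cpp2::ErrorCode; only the two values used here.
enum class ErrorCode { SUCCEEDED, E_KEY_NOT_FOUND };

// Toy engine: one ordered map per column family, created on first write.
class InMemoryEngine {
 public:
  ErrorCode put(std::string_view key, std::string_view value, const std::string& cfName) {
    families_[cfName][std::string(key)] = std::string(value);
    return ErrorCode::SUCCEEDED;
  }

  ErrorCode get(const std::string& key, std::string* value, const std::string& cfName) const {
    auto cf = families_.find(cfName);
    if (cf == families_.end()) {
      return ErrorCode::E_KEY_NOT_FOUND;
    }
    auto it = cf->second.find(key);
    if (it == cf->second.end()) {
      return ErrorCode::E_KEY_NOT_FOUND;
    }
    *value = it->second;
    return ErrorCode::SUCCEEDED;
  }

  // Removing a missing key is treated as success.
  ErrorCode remove(std::string_view key, const std::string& cfName) {
    auto cf = families_.find(cfName);
    if (cf != families_.end()) {
      cf->second.erase(std::string(key));
    }
    return ErrorCode::SUCCEEDED;
  }

  // Removes every key in [start, end) within a single column family.
  ErrorCode removeRange(std::string_view start, std::string_view end, const std::string& cfName) {
    auto cf = families_.find(cfName);
    if (cf == families_.end() || start >= end) {
      return ErrorCode::SUCCEEDED;
    }
    auto& kv = cf->second;
    kv.erase(kv.lower_bound(std::string(start)), kv.lower_bound(std::string(end)));
    return ErrorCode::SUCCEEDED;
  }

 private:
  std::map<std::string, std::map<std::string, std::string>> families_;
};
```

The point of the sketch is isolation: the same key can exist in "default" and "vector" with different values, and a range delete in one family leaves the other untouched.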

Vector Encoding

Key

  • NebulaKeyUtils::vectorTagKey

    type(1) + partId(3) + vertexId(_) + tagId(4) + propId(4)

  • NebulaKeyUtils::vectorVertexKey

    type(1) + partId(3) + vertexId(_) + tagId(4) + propId(4)

  • NebulaKeyUtils::vectorEdgeKey

    type(1) + partId(3) + srcId(_) + edgeType(4) + edgeRank(8) + dstId(_) + propId(4) + placeHolder(1)
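
The tag-key layout above can be illustrated with a small builder. This is a sketch under stated assumptions, not the NebulaKeyUtils code: the type tag value `kVectorTagType` is hypothetical, `vectorTagKey` and `appendRaw` are illustrative helpers, the vertex ID is assumed to be padded with zero bytes to a fixed length `vIdLen`, and fixed-width integers are written in host byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical type tag for vector tag keys; the real constant is not given here.
constexpr uint8_t kVectorTagType = 0x0A;

// Appends the raw bytes of a fixed-width integer in host byte order.
template <typename T>
void appendRaw(std::string& out, T v) {
  char buf[sizeof(T)];
  std::memcpy(buf, &v, sizeof(T));
  out.append(buf, sizeof(T));
}

// Layout: type(1) + partId(3) + vertexId(vIdLen) + tagId(4) + propId(4).
std::string vectorTagKey(std::size_t vIdLen, uint32_t partId, const std::string& vId,
                         int32_t tagId, int32_t propId) {
  std::string key;
  key.reserve(4 + vIdLen + 8);
  key.push_back(static_cast<char>(kVectorTagType));
  // partId occupies three bytes, least significant byte first.
  for (int shift = 0; shift < 24; shift += 8) {
    key.push_back(static_cast<char>((partId >> shift) & 0xFF));
  }
  // Truncate or zero-pad the vertex ID to exactly vIdLen bytes.
  const std::size_t used = vId.size() < vIdLen ? vId.size() : vIdLen;
  key.append(vId, 0, used);
  key.append(vIdLen - used, '\0');
  appendRaw(key, tagId);
  appendRaw(key, propId);
  return key;
}
```

Because propId comes last, all vector properties of one vertex and tag share the same prefix of length 4 + vIdLen + 4, so they can be scanned together with a single prefix iteration.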

Write

StatusOr BaseProcessor::encodeRowVal ->
WriteResult RowWriterV2::setValue(const std::string& name, const Value& val) ->
WriteResult RowWriterV2::setValue(ssize_t index, const Value& val) ->
WriteResult RowWriterV2::write(ssize_t index, const Vector& vector)

Read

RowReaderWrapper() ->
static StatusOr readValue ->
RowReaderWrapper::getValueByName

KNN Search

Syntax Design

vector_distance(vector1: src1, vector2: src2, metric: L2)
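
For reference, the L2 metric named in the syntax above can be sketched as a plain Euclidean distance. The helper `l2Distance` is hypothetical. Whether vector_distance returns the square root or the squared distance, and how it reports a dimension mismatch, are assumptions made for this sketch.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <optional>
#include <vector>

// Euclidean (L2) distance between two vectors of equal dimension.
// Returns std::nullopt when the dimensions differ (an assumed error policy).
std::optional<double> l2Distance(const std::vector<float>& a, const std::vector<float>& b) {
  if (a.size() != b.size()) {
    return std::nullopt;
  }
  double sum = 0.0;
  for (std::size_t i = 0; i < a.size(); ++i) {
    // Accumulate in double to limit rounding error on long vectors.
    const double d = static_cast<double>(a[i]) - static_cast<double>(b[i]);
    sum += d * d;
  }
  return std::sqrt(sum);
}
```

A KNN search that only ranks candidates could skip the square root, since it does not change the ordering.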