Extendible Hash Table

可扩展哈希,初始哈希表中只有一个directory和一个bucket,每个bucket可以容纳两条数据

Hash Table Page

Hash Table Header Page

  • 存放所有的directory pages,但是这个header page容纳的directory pages是固定的
/**
* Header page format:
* ---------------------------------------------------
* | DirectoryPageIds(2048) | MaxDepth (4) | Free(2044)
* ---------------------------------------------------
*/

// Size of the header page's fixed metadata (just the 4-byte max_depth_ field).
static constexpr uint64_t HTABLE_HEADER_PAGE_METADATA_SIZE = sizeof(uint32_t);
// Upper bound on max_depth_: the directory-page-id array has 2^9 = 512 slots.
static constexpr uint64_t HTABLE_HEADER_MAX_DEPTH = 9;
// Physical capacity of directory_page_ids_ (512 entries * 4 bytes = 2048 bytes).
static constexpr uint64_t HTABLE_HEADER_ARRAY_SIZE = 1 << HTABLE_HEADER_MAX_DEPTH;
  • 按照MSB方式取max_depth_位作为directory_page_ids_的索引
void ExtendibleHTableHeaderPage::Init(uint32_t max_depth) {
// throw NotImplementedException("ExtendibleHTableHeaderPage is not implemented");
BUSTUB_ASSERT(max_depth <= 32, "max depth need <= 9");
max_depth_ = max_depth;
std::fill(directory_page_ids_, directory_page_ids_ + MaxSize(), INVALID_PAGE_ID);
}

/**
 * @brief Map a 32-bit hash to a directory slot using its max_depth_ most
 *        significant bits (MSB addressing).
 */
auto ExtendibleHTableHeaderPage::HashToDirectoryIndex(uint32_t hash) const -> uint32_t {
  // Depth 0 means a single slot; also avoids the undefined 32-bit shift below.
  if (max_depth_ == 0) {
    return 0;
  }
  const uint32_t shift = 32 - max_depth_;
  return hash >> shift;
}

/**
 * @brief Read the directory page id stored at the given slot.
 *        INVALID_PAGE_ID marks a slot with no directory allocated yet.
 */
auto ExtendibleHTableHeaderPage::GetDirectoryPageId(uint32_t directory_idx) const -> uint32_t {
  const auto stored_id = directory_page_ids_[directory_idx];
  return stored_id;
}

/**
 * @brief Record the page id of the directory owning the given slot.
 */
void ExtendibleHTableHeaderPage::SetDirectoryPageId(uint32_t directory_idx, page_id_t directory_page_id) {
  auto &slot = directory_page_ids_[directory_idx];
  slot = directory_page_id;
}

/**
 * @brief Number of usable directory slots: 2^max_depth_, capped at the
 *        physical array size HTABLE_HEADER_ARRAY_SIZE.
 */
auto ExtendibleHTableHeaderPage::MaxSize() const -> uint32_t {
  const uint32_t logical_size = 1 << max_depth_;
  if (logical_size < HTABLE_HEADER_ARRAY_SIZE) {
    return logical_size;
  }
  return HTABLE_HEADER_ARRAY_SIZE;
}

Hash Table Directory Page

  • max_depth_:最大hash的位数
  • global_depth_:directory page 当前用作 hash 索引的位数
  • local_depths_:directory page 中各 bucket 对应的 local_depth_
  • bucket_page_ids_:存放所有bucket的页id
  • 按照LSB方式取global_depth_进行bucket的索引
/**
* Directory page format:
* --------------------------------------------------------------------------------------
* | MaxDepth (4) | GlobalDepth (4) | LocalDepths (512) | BucketPageIds(2048) | Free(1528)
* --------------------------------------------------------------------------------------
*/
/**
 * @brief Initialize an empty directory: global depth 0, every slot
 *        unallocated with local depth 0.
 */
void ExtendibleHTableDirectoryPage::Init(uint32_t max_depth) {
  max_depth_ = max_depth;
  global_depth_ = 0;
  // Reset the full physical arrays, not just the logical (depth-0) portion.
  for (uint32_t slot = 0; slot < HTABLE_DIRECTORY_ARRAY_SIZE; ++slot) {
    bucket_page_ids_[slot] = INVALID_PAGE_ID;
    local_depths_[slot] = 0;
  }
}

/**
 * @brief Map a hash to a bucket slot using its global_depth_ least
 *        significant bits (LSB addressing).
 */
auto ExtendibleHTableDirectoryPage::HashToBucketIndex(uint32_t hash) const -> uint32_t {
  return hash & GetGlobalDepthMask();
}

/**
 * @brief Compute the index of a bucket's split image (the sibling slot it
 *        splits into or merges with) by flipping the highest local-depth bit.
 *
 * @param bucket_idx directory slot to find the sibling of
 * @return uint32_t sibling slot index; bucket_idx itself when local depth is 0
 */
auto ExtendibleHTableDirectoryPage::GetSplitImageIndex(uint32_t bucket_idx) const -> uint32_t {
  const uint32_t local_depth = local_depths_[bucket_idx];
  // BUG FIX: a depth-0 bucket has no split image, and `1 << (0 - 1)` is an
  // undefined shift. Return the index unchanged as a safe identity.
  if (local_depth == 0) {
    return bucket_idx;
  }
  return bucket_idx ^ (1 << (local_depth - 1));
}

/**
 * @brief Double the directory (global depth + 1), mirroring the existing
 *        slots into the new upper half so every bucket stays reachable.
 *        No-op once the configured max depth is reached.
 */
void ExtendibleHTableDirectoryPage::IncrGlobalDepth() {
  if (global_depth_ >= max_depth_) {
    return;
  }
  const uint32_t old_size = 1 << global_depth_;
  // New upper half is a copy of the lower half: slot i and slot i + old_size
  // now both point at the same bucket with the same local depth.
  for (uint32_t i = 0; i < old_size; ++i) {
    bucket_page_ids_[old_size + i] = bucket_page_ids_[i];
    local_depths_[old_size + i] = local_depths_[i];
  }
  ++global_depth_;
}

/**
 * @brief Halve the directory (global depth - 1). No-op at depth 0.
 *        The upper-half slots simply become unused; they are rewritten
 *        on the next growth.
 */
void ExtendibleHTableDirectoryPage::DecrGlobalDepth() {
  if (global_depth_ == 0) {
    return;
  }
  --global_depth_;
}

/**
 * @brief The directory may halve only when every in-use slot's local depth
 *        is strictly below the global depth.
 *
 * @return true if all local depths are < global_depth_
 */
auto ExtendibleHTableDirectoryPage::CanShrink() -> bool {
  for (uint32_t i = 0; i < Size(); ++i) {
    if (local_depths_[i] >= global_depth_) {
      return false;
    }
  }
  return true;
}

Hash Table Bucket Page

  • 容纳键值对
/**
* Bucket page format:
* ----------------------------------------------------------------------------
* | METADATA | KEY(1) + VALUE(1) | KEY(2) + VALUE(2) | ... | KEY(n) + VALUE(n)
* ----------------------------------------------------------------------------
*
* Metadata format (size in byte, 8 bytes in total):
* --------------------------------
* | CurrentSize (4) | MaxSize (4)
* --------------------------------
*/

Operation

  • 在 header page 中寻找对应的 directory page,然后在 directory page 中寻找对应的 bucket page,最后在 bucket page 中查找 key 对应的 value
/**
 * @brief Point lookup: header -> directory -> bucket, releasing each read
 *        latch as soon as the next level's page id has been extracted.
 *
 * @param key key to look up
 * @param result output vector; the found value is appended on success
 * @param transaction unused
 * @return true iff the key exists
 */
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::GetValue(const K &key, std::vector<V> *result, Transaction *transaction) const
    -> bool {
  if (header_page_id_ == INVALID_PAGE_ID) {
    return false;
  }
  const uint32_t hash = Hash(key);

  // Level 1: header page -> directory page id.
  ReadPageGuard header_guard = bpm_->FetchPageRead(header_page_id_);
  auto header_page = header_guard.As<ExtendibleHTableHeaderPage>();
  const page_id_t dir_page_id = header_page->GetDirectoryPageId(header_page->HashToDirectoryIndex(hash));
  header_guard.Drop();
  if (dir_page_id == INVALID_PAGE_ID) {
    return false;
  }

  // Level 2: directory page -> bucket page id.
  ReadPageGuard dir_guard = bpm_->FetchPageRead(dir_page_id);
  auto dir_page = dir_guard.As<ExtendibleHTableDirectoryPage>();
  const page_id_t bucket_page_id = dir_page->GetBucketPageId(dir_page->HashToBucketIndex(hash));
  dir_guard.Drop();
  if (bucket_page_id == INVALID_PAGE_ID) {
    return false;
  }

  // Level 3: scan the bucket for the key.
  ReadPageGuard bucket_guard = bpm_->FetchPageRead(bucket_page_id);
  auto bucket_page = bucket_guard.As<ExtendibleHTableBucketPage<K, V, KC>>();
  V found;
  if (!bucket_page->Lookup(key, found, cmp_)) {
    return false;
  }
  result->push_back(found);
  return true;
}

Insert(递归)

  • header page中寻找对应的directory page,如果没有先进行directory page的创建
  • directory page中寻找对应的bucket page,没有就创建bucket page
  • 对应的bucket page中插入有两种情况
    • 如果bucket page还有剩余空间进行插入,直接插入<key, value>
    • 如果bucket page已经满了,增加global_depth_和对应bucket的local_depth_;对现有bucket进行分裂;重新进行插入
  • 注意:获取到DirectoryPage的锁后才将HeaderPage的锁释放,否则可能造成bucket中元素已经被挪走,Search依旧访问该位置上的元素,造成Segment Fault
/*
 * Insert a key/value pair (no duplicate keys allowed).
 *
 * Latch-coupling note: the directory page's write latch is acquired BEFORE
 * the header latch is dropped. Releasing the header first could let a
 * concurrent split move entries while a reader still targets the old bucket.
 */
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::Insert(const K &key, const V &value, Transaction *transaction) -> bool {
  if (header_page_id_ == INVALID_PAGE_ID) {
    return false;
  }
  // Fetch the header page with a write latch (we may create a directory).
  WritePageGuard header_guard = bpm_->FetchPageWrite(header_page_id_);
  auto header = header_guard.AsMut<ExtendibleHTableHeaderPage>();

  // Get the directory page id; if the slot is empty, create a new directory page.
  uint32_t directory_index = header->HashToDirectoryIndex(Hash(key));
  page_id_t directory_page_id = header->GetDirectoryPageId(directory_index);
  if (directory_page_id == INVALID_PAGE_ID) {
    // Create a new directory page and register it in the header.
    BasicPageGuard new_directory_guard = bpm_->NewPageGuarded(&directory_page_id);
    auto new_directory = new_directory_guard.AsMut<ExtendibleHTableDirectoryPage>();
    new_directory->Init(directory_max_depth_);
    header->SetDirectoryPageId(directory_index, directory_page_id);
  }
  // Latch the directory first, THEN drop the header latch (see note above).
  WritePageGuard directory_guard = bpm_->FetchPageWrite(directory_page_id);
  auto directory = directory_guard.AsMut<ExtendibleHTableDirectoryPage>();
  header_guard.Drop();
  uint32_t bucket_index = directory->HashToBucketIndex(Hash(key));
  page_id_t bucket_page_id = directory->GetBucketPageId(bucket_index);
  if (bucket_page_id == INVALID_PAGE_ID) {
    // Create a new bucket page for this slot with local depth 0.
    BasicPageGuard new_bucket_guard = bpm_->NewPageGuarded(&bucket_page_id);
    auto new_bucket = new_bucket_guard.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
    new_bucket->Init(bucket_max_size_);
    directory->SetBucketPageId(bucket_index, bucket_page_id);
    directory->SetLocalDepth(bucket_index, 0);
  }
  // Keep the directory latched while inserting: a split below mutates it.
  WritePageGuard bucket_guard = bpm_->FetchPageWrite(bucket_page_id);
  auto bucket = bucket_guard.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();

  // Duplicate keys are rejected.
  V val;
  if (bucket->Lookup(key, val, cmp_)) {
    return false;
  }
  // Bucket full: grow depths, split, and retry the insert from the top.
  if (bucket->IsFull()) {
    // If local depth has caught up with global depth, the directory must
    // double first (bounded by max depth — give up when already there).
    if (directory->GetLocalDepth(bucket_index) >= directory->GetGlobalDepth()) {
      if (directory->GetGlobalDepth() >= directory->GetMaxDepth()) {
        return false;
      }
      directory->IncrGlobalDepth();
    }
    // Increase local depth for the bucket, then split it.
    directory->IncrLocalDepth(bucket_index);
    if (!SplitBucket(directory, bucket, bucket_index)) {
      return false;
    }
    // Release page latches before recursing; the retry re-resolves the
    // (possibly re-pointed) target bucket from scratch.
    directory_guard.Drop();
    bucket_guard.Drop();
    // Recurse: the split may need to cascade if the target is still full.
    return Insert(key, value, transaction);
  }
  // Bucket has room: plain insert.
  return bucket->Insert(key, value, cmp_);
}

split

  • 对bucket_idx对应的bucket进行split_bucket的创建
  • 设置directory对应的split_bucket
  • 对bucket中的<key,value>进行重新分配
/**
 * @brief Split the bucket at bucket_idx: allocate its split-image bucket,
 *        repoint every aliasing directory slot, then redistribute entries.
 *        Caller must already hold the directory and bucket write latches and
 *        must have incremented the bucket's local depth.
 *
 * @return false if a new page could not be allocated or the slot is invalid
 */
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::SplitBucket(ExtendibleHTableDirectoryPage *directory,
                                                    ExtendibleHTableBucketPage<K, V, KC> *bucket, uint32_t bucket_idx)
    -> bool {
  // 1. Create a new bucket page (the split image).
  page_id_t split_page_id;
  BasicPageGuard split_guard = bpm_->NewPageGuarded(&split_page_id);
  if (split_page_id == INVALID_PAGE_ID) {
    return false;
  }
  auto split_bucket = split_guard.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
  // BUG FIX: initialize with the table's configured capacity. The bare
  // Init() fell back to the page-default max size, inconsistent with the
  // Init(bucket_max_size_) used for every bucket created in Insert.
  split_bucket->Init(bucket_max_size_);
  uint32_t split_idx = directory->GetSplitImageIndex(bucket_idx);
  uint32_t local_depth = directory->GetLocalDepth(bucket_idx);
  directory->SetBucketPageId(split_idx, split_page_id);
  directory->SetLocalDepth(split_idx, local_depth);

  // 2. Repoint all aliasing slots. Slots that map to the same bucket are
  // spaced 2^local_depth apart. (When bucket_idx < idx_diff the unsigned
  // subtraction wraps to a huge value that converts to a negative int, so
  // the descending loops are skipped — intended behavior on this target.)
  uint32_t idx_diff = (1 << local_depth);
  for (int i = bucket_idx - idx_diff; i >= 0; i -= idx_diff) {
    directory->SetLocalDepth(i, local_depth);
  }
  for (int i = bucket_idx + idx_diff; i < static_cast<int>(directory->Size()); i += idx_diff) {
    directory->SetLocalDepth(i, local_depth);
  }
  for (int i = split_idx - idx_diff; i >= 0; i -= idx_diff) {
    directory->SetBucketPageId(i, split_page_id);
    directory->SetLocalDepth(i, local_depth);
  }
  for (int i = split_idx + idx_diff; i < static_cast<int>(directory->Size()); i += idx_diff) {
    directory->SetBucketPageId(i, split_page_id);
    directory->SetLocalDepth(i, local_depth);
  }

  // 3. Redistribute the entries of the old bucket between it and the split image.
  page_id_t bucket_page_id = directory->GetBucketPageId(bucket_idx);
  if (bucket_page_id == INVALID_PAGE_ID) {
    return false;
  }
  // Snapshot all entries, then clear the bucket (Clear only resets size,
  // it does not wipe the stored bytes).
  int size = bucket->Size();
  std::list<std::pair<K, V>> entries;
  for (int i = 0; i < size; i++) {
    entries.push_back(bucket->EntryAt(i));
  }
  bucket->Clear();

  // Re-hash each entry; with the deeper local depth it must land in either
  // the original bucket or the split image.
  for (const auto &entry : entries) {
    uint32_t target_idx = directory->HashToBucketIndex(Hash(entry.first));
    page_id_t target_page_id = directory->GetBucketPageId(target_idx);
    BUSTUB_ASSERT(target_page_id == bucket_page_id || target_page_id == split_page_id, "target_page_id error");
    if (target_page_id == bucket_page_id) {
      bucket->Insert(entry.first, entry.second, cmp_);
    } else {
      split_bucket->Insert(entry.first, entry.second, cmp_);
    }
  }
  return true;
}

Remove

  • header page中寻找对应的directory page
  • directory page中寻找对应的bucket page
  • bucket page中remove该<key, value>
  • 尝试对现有的bucket进行合并
  • 如果此时directory page中所有的local_depth_均小于global_depth_,将global_depth_减小1
/*****************************************************************************
* REMOVE
*****************************************************************************/
/**
 * @brief Remove a key. Resolves header -> directory -> bucket with latch
 *        coupling, deletes the entry, then tries to merge empty buckets and
 *        shrink the directory.
 *
 * @return true iff the key existed and was removed
 */
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::Remove(const K &key, Transaction *transaction) -> bool {
  if (header_page_id_ == INVALID_PAGE_ID) {
    return false;
  }
  WritePageGuard header_guard = bpm_->FetchPageWrite(header_page_id_);
  auto header = header_guard.AsMut<ExtendibleHTableHeaderPage>();
  // Resolve the directory page id from the hash's MSBs.
  uint32_t directory_index = header->HashToDirectoryIndex(Hash(key));
  page_id_t directory_page_id = header->GetDirectoryPageId(directory_index);

  if (directory_page_id == INVALID_PAGE_ID) {
    header_guard.Drop();
    return false;
  }
  // Latch the directory before releasing the header (latch coupling).
  WritePageGuard directory_guard = bpm_->FetchPageWrite(directory_page_id);
  auto directory = directory_guard.AsMut<ExtendibleHTableDirectoryPage>();
  header_guard.Drop();
  uint32_t bucket_index = directory->HashToBucketIndex(Hash(key));
  page_id_t bucket_page_id = directory->GetBucketPageId(bucket_index);
  // BUG FIX: the slot may hold INVALID_PAGE_ID (no bucket allocated yet);
  // fetching it would be an invalid page access. GetValue already guards
  // this case — mirror that check here.
  if (bucket_page_id == INVALID_PAGE_ID) {
    return false;
  }

  // Fetch the bucket page.
  WritePageGuard bucket_guard = bpm_->FetchPageWrite(bucket_page_id);
  auto bucket = bucket_guard.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();

  // Remove the key-value pair from the bucket.
  if (!bucket->Remove(key, cmp_)) {
    return false;
  }
  // Try to merge now-empty buckets, then shrink the directory while every
  // local depth is below the global depth.
  MergeBucket(directory, bucket, bucket_index);
  while (directory->CanShrink()) {
    if (directory->GetGlobalDepth() == 0) {
      break;
    }
    directory->DecrGlobalDepth();
  }
  return true;
}

Merge(递归)

  • Buckets can only be merged with their split image if their split image has the same local depth.
  • Only empty buckets can be merged(当且仅当split_bucket和bucket都不为空时,才能跳出merge循环)
  • Merge the split bucket into the bucket
  • Decrease the local depth of this bucket
  • Update the pointers to the split page
/**
 * @brief Repeatedly merge the bucket with its split image while possible.
 *        Caller holds the directory and bucket write latches.
 *
 * Merge conditions (checked each iteration):
 *  - local depth > 0 (a depth-0 bucket has no split image);
 *  - both siblings share the same local depth;
 *  - at least one of the two buckets is empty.
 */
template <typename K, typename V, typename KC>
auto DiskExtendibleHashTable<K, V, KC>::MergeBucket(ExtendibleHTableDirectoryPage *directory,
                                                    ExtendibleHTableBucketPage<K, V, KC> *bucket, uint32_t bucket_idx)
    -> void {
  // Iterative form of the recursive merge: each pass may enable another.
  while (true) {
    if (directory->GetLocalDepth(bucket_idx) == 0) {
      return;
    }
    uint32_t split_idx = directory->GetSplitImageIndex(bucket_idx);
    page_id_t split_page_id = directory->GetBucketPageId(split_idx);
    // 1. Buckets can only be merged with their split image if their split
    //    image has the same local depth.
    if (directory->GetLocalDepth(bucket_idx) != directory->GetLocalDepth(split_idx)) {
      return;
    }
    // 2. Only empty buckets can be merged.
    WritePageGuard split_guard = bpm_->FetchPageWrite(split_page_id);
    auto split_bucket = split_guard.AsMut<ExtendibleHTableBucketPage<K, V, KC>>();
    // Stop only when BOTH buckets are non-empty; if either side is empty a
    // merge is still profitable.
    if (!split_bucket->IsEmpty() && !bucket->IsEmpty()) {
      return;
    }
    // 3. Merge the split bucket's entries into this bucket, then empty it.
    int size = split_bucket->Size();
    for (int i = 0; i < size; ++i) {
      std::pair<K, V> entry = split_bucket->EntryAt(i);
      bucket->Insert(entry.first, entry.second, cmp_);
    }
    split_bucket->Clear();
    split_guard.Drop();
    // 4. Update the directory: the merged pair now acts at one level shallower.
    page_id_t bucket_page_id = directory->GetBucketPageId(bucket_idx);
    directory->DecrLocalDepth(bucket_idx);
    uint32_t local_depth = directory->GetLocalDepth(bucket_idx);

    // 5. Repoint every directory slot that aliased either sibling (slots are
    //    spaced 2^local_depth apart) at the surviving bucket page.
    uint32_t idx_diff = 1 << local_depth;
    for (int i = bucket_idx - idx_diff; i >= 0; i -= idx_diff) {
      directory->SetBucketPageId(i, bucket_page_id);
      directory->SetLocalDepth(i, local_depth);
    }
    for (int i = bucket_idx + idx_diff; i < static_cast<int>(directory->Size()); i += idx_diff) {
      directory->SetBucketPageId(i, bucket_page_id);
      directory->SetLocalDepth(i, local_depth);
    }
  }
}