RocksDB 架构

先写入 WAL,再写入 MemTable。MemTable 达到一定的大小就会变为 Immutable MemTable,后台异步以 SSTable 的形式 flush 到磁盘中。
MemTable 中还包含了一个 Bloom Filter,用于快速判断一个元素是否不存在。
Column Family
RocksDB 中列族是一个逻辑上的分组,列族之间使用不同的 LSM Tree,但是共享 WAL。
Version

Version 会指向不同的 SST,并且 Version 和 SSTable 都是以引用计数的方式维护,一旦引用计数变为 0,就会进行回收。
Write Batch
参考文献:
上面这篇文章讲的挺清楚的,RocksDB 默认是不开启并行写的,因为跳表结构并不适合并发写入,WAL 如果要并发写也需要一定同步机制维护其写入顺序。所以,RocksDB 采用的是分批写入的方式,以保证其吞吐量。Write Batch Group 会有一个 leader(当前的第一个 writer),然后后面的 writer 将成为 follower,构成一条双向链表(通过 link_older,link_newer)相连。这个 group 会有个大小上限,超过之后就会截断。截断后的新 writer 将会变为新的 leader,构成新的 group。
RocksDB 使用 atomic::<Writer *> 的形式组织链表,可以通过 CAS 以 latch-free 的方式添加 writer:
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
这边正好学习了一下 std::atomic 的用法,比起其他语言,c++ 的原子对象的 load 操作还可以选择不同的参数:
std::memory_order_relaxed:仅保证原子性,不强制内存顺序。std::memory_order_acquire:确保后续读操作不会重排到此操作之前。std::memory_order_seq_cst(默认):顺序一致性,提供最强的内存屏障。
尝试写了一个链表插入练习:
#include <atomic>
#include <iostream>
#include <thread>
struct ListNode {
int val;
std::atomic<ListNode *> next;
};
void insertListNode(std::atomic<ListNode *> &tail, ListNode **root, int val) {
ListNode *newNode = new ListNode;
newNode->val = val;
newNode->next = nullptr;
ListNode *oldTail = tail.load(std::memory_order_relaxed);
while (true) {
if (tail.compare_exchange_weak(oldTail, newNode)) {
break;
}
}
if (oldTail == nullptr) {
*root = newNode;
} else {
oldTail->next = newNode;
}
}
void printList(ListNode *root) {
if (root == nullptr) {
std::cout << "List is empty" << std::endl;
return;
}
ListNode *p = root;
while (p) {
std::cout << p->val << " ";
p = p->next.load(std::memory_order_relaxed);
}
std::cout << std::endl;
}
int main() {
std::atomic<ListNode *> tail = nullptr;
ListNode *root = nullptr;
for (int i = 0; i < 3; i++) {
std::thread t([&, i]() {
while (1) {
insertListNode(tail, &root, i);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
});
t.detach();
}
while (1) {
printList(root);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
一种 Lazy Initialize 的方式:
std::optional<std::mutex> mtx;
mtx.emplace();