RocksDB 架构
先写入 WAL
,再写入 MemTable
。MemTable
达到一定的大小就会变为 Immutable MemTable
,后台异步以 SSTable
的形式 flush 到磁盘中。
MemTable
中还包含了一个 Bloom Filter
,用于快速判断一个元素是否不存在。
Column Family
RocksDB 中列族是一个逻辑上的分组,列族之间使用不同的 LSM Tree,但是共享 WAL。
Version
Version
会指向不同的 SST,并且 Version
和 SSTable
都是以引用计数的方式维护,一旦引用计数变为 0,就会进行回收。
Write Batch
参考文献:
上面这篇文章讲的挺清楚的,RocksDB 默认是不开启并行写的,因为跳表结构并不适合并发写入,WAL 如果要并发写也需要一定同步机制维护其写入顺序。所以,RocksDB 采用的是分批写入的方式,以保证其吞吐量。Write Batch Group
会有一个 leader
(当前的第一个 writer
),然后后面的 writer
将成为 follower
,构成一条双向链表(通过 link_older
,link_newer
)相连。这个 group
会有个大小上限,超过之后就会截断。截断后的新 writer
将会变为新的 leader
,构成新的 group
。
RocksDB 使用 atomic::<Writer *>
的形式组织链表,可以通过 CAS
以 latch-free
的方式添加 writer
:
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
这边正好学习了一下 std::atomic
的用法,比起其他语言,c++ 的原子对象的 load
操作还可以选择不同的参数:
std::memory_order_relaxed
:仅保证原子性,不强制内存顺序。std::memory_order_acquire
:确保后续读操作不会重排到此操作之前。std::memory_order_seq_cst
(默认):顺序一致性,提供最强的内存屏障。
尝试写了一个链表插入练习:
#include <atomic>
#include <iostream>
#include <thread>
struct ListNode {
int val;
std::atomic<ListNode *> next;
};
void insertListNode(std::atomic<ListNode *> &tail, ListNode **root, int val) {
ListNode *newNode = new ListNode;
newNode->val = val;
newNode->next = nullptr;
ListNode *oldTail = tail.load(std::memory_order_relaxed);
while (true) {
if (tail.compare_exchange_weak(oldTail, newNode)) {
break;
}
}
if (oldTail == nullptr) {
*root = newNode;
} else {
oldTail->next = newNode;
}
}
void printList(ListNode *root) {
if (root == nullptr) {
std::cout << "List is empty" << std::endl;
return;
}
ListNode *p = root;
while (p) {
std::cout << p->val << " ";
p = p->next.load(std::memory_order_relaxed);
}
std::cout << std::endl;
}
int main() {
std::atomic<ListNode *> tail = nullptr;
ListNode *root = nullptr;
for (int i = 0; i < 3; i++) {
std::thread t([&, i]() {
while (1) {
insertListNode(tail, &root, i);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
});
t.detach();
}
while (1) {
printList(root);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
一种 Lazy Initialize 的方式:
std::optional<std::mutex> mtx;
mtx.emplace();