Leveldb LRUCache 解析

LRU原理

LRU 是一种经典的缓存淘汰策略,其原理以及实现可以查看我之前的博客LRU Cache 解析及实现。本文主要解析Leveldb LRU Cache。

Leveldb 实现

Leveldb 实现的LRUCache 使用自己实现的简单hashtable存储键值对,循环双链表记录每个元素的访问时间,为了提升多线程环境下的读写性能,Leveldb 内部使用LRUCache 数组对外提供服务。

Cache Entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);
// next_hash 代表在hash桶中下一个元素的位置
LRUHandle* next_hash;
// 双链表中下个元素的位置
LRUHandle* next;
// 双链表中上个元素的位置
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
// 使用次数
uint32_t refs;
// key 的hash值用于在LRUCache数组以及hash表中定位
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key

Slice key() const {
// For cheaper lookups, we allow a temporary Handle object
// to store a pointer to a key in "value".
if (next == this) {
return *(reinterpret_cast<Slice*>(value));
} else {
return Slice(key_data, key_length);
}
}
};

简单hash表

Leveldb LRU Cache 实现了线程安全的拉链式的hashtable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
~HandleTable() { delete[] list_; }
// 查找,直接调用FindPointer方法
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
// 插入,使用FindPointer方法找到插入位置,在hashtable 对位位置插入,当hashtable 中的元素个数大于桶的个数时候触发hashtable 的扩容
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == NULL ? NULL : old->next_hash);
*ptr = h;
if (old == NULL) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
// 删除hashtable中的元素,同样使用FindPointer方法定位到在hashtable 中的位置
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != NULL) {
*ptr = result->next_hash;
--elems_;
}
return result;
}

private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_;

// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
// 找到hash 元素的地址
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
//找到hash 桶的位置,由于length_是2的指数次幂,所以使用 按位与替换mod操作进行加速
LRUHandle** ptr = &list_[hash & (length_ - 1)];
// 遍历hash桶中的单链表直到找到相应节点或单链表结束
while (*ptr != NULL &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
// hash 表扩容
void Resize() {
// 每次扩容都是翻倍,保证hash桶的个数是2的指数次幂
uint32_t new_length = 4;
while (new_length < elems_) {
new_length *= 2;
}
// 创建一个新的hashtable
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
// 逐个桶迁移到新的hashtable
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != NULL) {
LRUHandle* next = h->next_hash;
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
assert(elems_ == count);
// 释放掉老的hashtable
delete[] list_;
// 替换为新的hashtable
list_ = new_list;
length_ = new_length;
}
};

单个LRUCache实现

LRUCache 类声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class LRUCache {
public:
LRUCache();
~LRUCache();

// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) { capacity_ = capacity; }

// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
void Prune();
size_t TotalCharge() const {
MutexLock l(&mutex_);
return usage_;
}

private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
void Unref(LRUHandle* e);

// Initialized before use.
// LRUCache 存储的元素个数,超过此个数触发淘汰机制
size_t capacity_;

// mutex_ protects the following state.
mutable port::Mutex mutex_;
size_t usage_;

// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
// 按照访问时间顺序存储的双向循环链表
LRUHandle lru_;
// 自己实现的拉链式的hash table
HandleTable table_;
};

LRUCache 插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Cache::Handle* LRUCache::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
MutexLock l(&mutex_);
// 构造一个新的LRUHandle 对象
LRUHandle* e = reinterpret_cast<LRUHandle*>(
malloc(sizeof(LRUHandle)-1 + key.size()));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->refs = 2; // One from LRUCache, one for the returned handle
memcpy(e->key_data, key.data(), key.size());
// 插入到双向链表表头的位置
LRU_Append(e);
usage_ += charge;

LRUHandle* old = table_.Insert(e);
// 替换掉LRUCache中重复的元素
if (old != NULL) {
LRU_Remove(old);
Unref(old);
}
// 触发LRU 淘汰
while (usage_ > capacity_ && lru_.next != &lru_) {
// 淘汰双向链表最后的元素
LRUHandle* old = lru_.next;
// 删除双向链表中的元素
LRU_Remove(old);
// 删除hashtable中的元素
table_.Remove(old->key(), old->hash);
Unref(old);
}
// 返回刚刚插入的Handle对象,此对象引用计数为2
return reinterpret_cast<Cache::Handle*>(e);
}

LRUCache 删除

1
2
3
4
5
6
7
8
9
10
void LRUCache::Erase(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
// 删除hashtable中对应的元素,
// 如果该元素存在在hashtable中则在双向链表中进行删除
LRUHandle* e = table_.Remove(key, hash);
if (e != NULL) {
LRU_Remove(e);
Unref(e);
}
}

LRUCache Prune

删除LRUCache中ref为1 的元素,最初插入的元素的引用为2,外部消费者对使用结束的元素ref减一。LRUCache中ref 为1 的handle表示此元素只在LRUCache中出现,外部并不使用。此方法的作用就是删除这样的元素。

1
2
3
4
5
6
7
8
9
10
11
12
void LRUCache::Prune() {
MutexLock l(&mutex_);
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
if (e->refs == 1) {
table_.Remove(e->key(), e->hash);
LRU_Remove(e);
Unref(e);
}
e = next;
}
}

LRU_Remove

删除双向链表中一个节点

1
2
3
4
void LRUCache::LRU_Remove(LRUHandle* e) {
e->next->prev = e->prev;
e->prev->next = e->next;
}

LRU_Append

插入到双向链表表头位置

1
2
3
4
5
6
7
void LRUCache::LRU_Append(LRUHandle* e) {
// Make "e" newest entry by inserting just before lru_
e->next = &lru_;
e->prev = lru_.prev;
e->prev->next = e;
e->next->prev = e;
}

ShardedLRUCache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static const int kNumShardBits = 4;
// 16个 lru cache,目的在多线程的环境下,每个线程的访问都会锁住缓冲区
// 如果缓冲区比较大的话, 多线程情况下的性能比较差,
// 所以使用hash 的方式将一块cache 缓冲区 划分为多个小块的缓冲区
static const int kNumShards = 1 << kNumShardBits;
class ShardedLRUCache : public Cache {
private:
// LRUCache 数组,每个LRUCache对象使用自己的锁
LRUCache shard_[kNumShards];
// 这个锁只保护id
port::Mutex id_mutex_;
uint64_t last_id_;

static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
// Shard 函数,根据key计算的hash 值定位LRUCache数组的位置
static uint32_t Shard(uint32_t hash) {
// 不需要使用mod运算,此数值一定是在0~kNumShards这个区间
return hash >> (32 - kNumShardBits);
}

public:
explicit ShardedLRUCache(size_t capacity)
: last_id_(0) {
// 向上取整,为每个LRUCache 分配容量
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
virtual ~ShardedLRUCache() { }
// 插入时,先使用Shard方法定位到相应的LRUCache
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
virtual void Release(Handle* handle) {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shard_[Shard(h->hash)].Release(handle);
}
virtual void Erase(const Slice& key) {
const uint32_t hash = HashSlice(key);
shard_[Shard(hash)].Erase(key, hash);
}
virtual void* Value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
virtual uint64_t NewId() {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
virtual void Prune() {
for (int s = 0; s < kNumShards; s++) {
shard_[s].Prune();
}
}
virtual size_t TotalCharge() const {
size_t total = 0;
for (int s = 0; s < kNumShards; s++) {
total += shard_[s].TotalCharge();
}
return total;
}
};

总结

Leveldb 实现的LRUCahe 基于hashtable以及双向链表实现,为了提升性能以及保证跨平台特性,Leveldb 实现了一个线程安全的拉链式hashtable,此hashtable 的缺点是当触发扩容操作的时候需要将老hashtable 的全部元素复制到新的hashtable 中,这会期间会一直占用锁,此时多线程环境下的读写会阻塞。这一点Memcached 中实现的hashtable是有一个线程专门负责扩展,每次只扩展一个元素。
Leveldb 为了提升多线程环境下的读写性能,将固定容量的Cache分摊到多个LRUCache,每个LRUCache保证自己的线程安全,这就降低了多线程环境下锁的竞争。这种做法与分段锁的思想类似。

Donate
0%