Carbon's Blog

以梦为马 不负韶华


  • 首页

  • 标签

  • 分类

  • 归档

  • 搜索

GCC __buildin_except 解析

发表于 2018-05-10 | 更新于: 2018-05-10 | 分类于 Backend Development

概念

摘自gcc的官方文档OtherBuildinFunction.

—Built-in Function: long builtin_expect (long exp, long c)
You may use
builtin_expect to provide the compiler with branch prediction information. In general, you should prefer to use actual profile feedback for this (-fprofile-arcs), as programmers are notoriously bad at predicting how their programs actually perform. However, there are applications in which this data is hard to collect.

The return value is the value of exp, which should be an integral expression. The value of c must be a compile-time constant. The semantics of the built-in are that it is expected that exp == c. For example:

if (__builtin_expect (x, 0))
  foo ();

would indicate that we do not expect to call foo, since we expect x to be zero. Since you are limited to integral expressions for exp, you should use constructions such as

if (__builtin_expect (ptr != NULL, 1))
  error ();

when testing pointer or floating-point values.
我们可以使用 __buildin_except 向编译器提供分支预测信息,从而帮助编译器进行代码优化。

引入原因

CPU 流水线技术可以提高CPU执行效率,但是程序中的跳转指令会打乱CPU流水线。因此,跳转次数少的程序拥有更高的执行效率。

示例

使用__buildin_except 定义LIKELY和UNLIKELY宏,分别代表bool型变量或表达式有很大可能性为真或者很大可能性为假。以下是测试代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 两个感叹号的作用是将所有的非零值转化为1
#define LIKELY(x) __builtin_expect(!!(x), 1)
#define UNLIKELY(x) __builtin_expect(!!(x), 0)

int likely(int x)
{
if(LIKELY(x))
{
x = 5;
}
else
{
x = 6;
}

return x;
}

int unlikely(int x)
{
if(UNLIKELY(x))
{
x = 5;
}
else
{
x = 6;
}

return x;
}

int normal(int x)
{
if(x)
{
x = 5;
}
else
{
x = 6;
}

return x;
}

编译出.o文件,并使用objdump查看汇编代码

1
2
gcc -O2 -fprofile-arcs -c test_builtin_except.cpp
objdump -d test_builtin_except.o

结果以及解释如下图
avatar

总结

Linux 内核中大量使用LIKELY UNLIKELY宏提升程序运行效率,在C/C++工程可以引入此宏提供分支预测提示编译器进行代码优化。比如在工程中经常会存在处理程序错误的分支,但是出错分支又是不经常进入,这种场景下可以使用__build_except进行代码优化。

Leveldb LRUCache 解析

发表于 2018-05-09 | 更新于: 2018-09-04 | 分类于 Backend Development

LRU原理

LRU 是一种经典的缓存淘汰策略,其原理以及实现可以查看我之前的博客LRU Cache 解析及实现。本文主要解析Leveldb LRU Cache。

Leveldb 实现

Leveldb 实现的LRUCache 使用自己实现的简单hashtable存储键值对,循环双链表记录每个元素的访问时间,为了提升多线程环境下的读写性能,Leveldb 内部使用LRUCache 数组对外提供服务。

Cache Entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);
// next_hash 代表在hash桶中下一个元素的位置
LRUHandle* next_hash;
// 双链表中下个元素的位置
LRUHandle* next;
// 双链表中上个元素的位置
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
// 使用次数
uint32_t refs;
// key 的hash值用于在LRUCache数组以及hash表中定位
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key

Slice key() const {
// For cheaper lookups, we allow a temporary Handle object
// to store a pointer to a key in "value".
if (next == this) {
return *(reinterpret_cast<Slice*>(value));
} else {
return Slice(key_data, key_length);
}
}
};

简单hash表

Leveldb LRU Cache 实现了线程安全的拉链式的hashtable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
~HandleTable() { delete[] list_; }
// 查找,直接调用FindPointer方法
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
// 插入,使用FindPointer方法找到插入位置,在hashtable 对位位置插入,当hashtable 中的元素个数大于桶的个数时候触发hashtable 的扩容
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == NULL ? NULL : old->next_hash);
*ptr = h;
if (old == NULL) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
// 删除hashtable中的元素,同样使用FindPointer方法定位到在hashtable 中的位置
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != NULL) {
*ptr = result->next_hash;
--elems_;
}
return result;
}

private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_;

// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
// 找到hash 元素的地址
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
//找到hash 桶的位置,由于length_是2的指数次幂,所以使用 按位与替换mod操作进行加速
LRUHandle** ptr = &list_[hash & (length_ - 1)];
// 遍历hash桶中的单链表直到找到相应节点或单链表结束
while (*ptr != NULL &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
// hash 表扩容
void Resize() {
// 每次扩容都是翻倍,保证hash桶的个数是2的指数次幂
uint32_t new_length = 4;
while (new_length < elems_) {
new_length *= 2;
}
// 创建一个新的hashtable
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
// 逐个桶迁移到新的hashtable
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != NULL) {
LRUHandle* next = h->next_hash;
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
assert(elems_ == count);
// 释放掉老的hashtable
delete[] list_;
// 替换为新的hashtable
list_ = new_list;
length_ = new_length;
}
};

单个LRUCache实现

LRUCache 类声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class LRUCache {
public:
LRUCache();
~LRUCache();

// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) { capacity_ = capacity; }

// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
void Prune();
size_t TotalCharge() const {
MutexLock l(&mutex_);
return usage_;
}

private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
void Unref(LRUHandle* e);

// Initialized before use.
// LRUCache 存储的元素个数,超过此个数触发淘汰机制
size_t capacity_;

// mutex_ protects the following state.
mutable port::Mutex mutex_;
size_t usage_;

// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
// 按照访问时间顺序存储的双向循环链表
LRUHandle lru_;
// 自己实现的拉链式的hash table
HandleTable table_;
};

LRUCache 插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Cache::Handle* LRUCache::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
MutexLock l(&mutex_);
// 构造一个新的LRUHandle 对象
LRUHandle* e = reinterpret_cast<LRUHandle*>(
malloc(sizeof(LRUHandle)-1 + key.size()));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->refs = 2; // One from LRUCache, one for the returned handle
memcpy(e->key_data, key.data(), key.size());
// 插入到双向链表表头的位置
LRU_Append(e);
usage_ += charge;

LRUHandle* old = table_.Insert(e);
// 替换掉LRUCache中重复的元素
if (old != NULL) {
LRU_Remove(old);
Unref(old);
}
// 触发LRU 淘汰
while (usage_ > capacity_ && lru_.next != &lru_) {
// 淘汰双向链表最后的元素
LRUHandle* old = lru_.next;
// 删除双向链表中的元素
LRU_Remove(old);
// 删除hashtable中的元素
table_.Remove(old->key(), old->hash);
Unref(old);
}
// 返回刚刚插入的Handle对象,此对象引用计数为2
return reinterpret_cast<Cache::Handle*>(e);
}

LRUCache 删除

1
2
3
4
5
6
7
8
9
10
void LRUCache::Erase(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
// 删除hashtable中对应的元素,
// 如果该元素存在在hashtable中则在双向链表中进行删除
LRUHandle* e = table_.Remove(key, hash);
if (e != NULL) {
LRU_Remove(e);
Unref(e);
}
}

LRUCache Prune

删除LRUCache中ref为1 的元素,最初插入的元素的引用为2,外部消费者对使用结束的元素ref减一。LRUCache中ref 为1 的handle表示此元素只在LRUCache中出现,外部并不使用。此方法的作用就是删除这样的元素。

1
2
3
4
5
6
7
8
9
10
11
12
void LRUCache::Prune() {
MutexLock l(&mutex_);
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
if (e->refs == 1) {
table_.Remove(e->key(), e->hash);
LRU_Remove(e);
Unref(e);
}
e = next;
}
}

LRU_Remove

删除双向链表中一个节点

1
2
3
4
void LRUCache::LRU_Remove(LRUHandle* e) {
e->next->prev = e->prev;
e->prev->next = e->next;
}

LRU_Append

插入到双向链表表头位置

1
2
3
4
5
6
7
void LRUCache::LRU_Append(LRUHandle* e) {
// Make "e" newest entry by inserting just before lru_
e->next = &lru_;
e->prev = lru_.prev;
e->prev->next = e;
e->next->prev = e;
}

ShardedLRUCache

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static const int kNumShardBits = 4;
// 16个 lru cache,目的在多线程的环境下,每个线程的访问都会锁住缓冲区
// 如果缓冲区比较大的话, 多线程情况下的性能比较差,
// 所以使用hash 的方式将一块cache 缓冲区 划分为多个小块的缓冲区
static const int kNumShards = 1 << kNumShardBits;
class ShardedLRUCache : public Cache {
private:
// LRUCache 数组,每个LRUCache对象使用自己的锁
LRUCache shard_[kNumShards];
// 这个锁只保护id
port::Mutex id_mutex_;
uint64_t last_id_;

static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
// Shard 函数,根据key计算的hash 值定位LRUCache数组的位置
static uint32_t Shard(uint32_t hash) {
// 不需要使用mod运算,此数值一定是在0~kNumShards这个区间
return hash >> (32 - kNumShardBits);
}

public:
explicit ShardedLRUCache(size_t capacity)
: last_id_(0) {
// 向上取整,为每个LRUCache 分配容量
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
virtual ~ShardedLRUCache() { }
// 插入时,先使用Shard方法定位到相应的LRUCache
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
virtual void Release(Handle* handle) {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shard_[Shard(h->hash)].Release(handle);
}
virtual void Erase(const Slice& key) {
const uint32_t hash = HashSlice(key);
shard_[Shard(hash)].Erase(key, hash);
}
virtual void* Value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
virtual uint64_t NewId() {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
virtual void Prune() {
for (int s = 0; s < kNumShards; s++) {
shard_[s].Prune();
}
}
virtual size_t TotalCharge() const {
size_t total = 0;
for (int s = 0; s < kNumShards; s++) {
total += shard_[s].TotalCharge();
}
return total;
}
};

总结

Leveldb 实现的LRUCahe 基于hashtable以及双向链表实现,为了提升性能以及保证跨平台特性,Leveldb 实现了一个线程安全的拉链式hashtable,此hashtable 的缺点是当触发扩容操作的时候需要将老hashtable 的全部元素复制到新的hashtable 中,这会期间会一直占用锁,此时多线程环境下的读写会阻塞。这一点Memcached 中实现的hashtable是有一个线程专门负责扩展,每次只扩展一个元素。
Leveldb 为了提升多线程环境下的读写性能,将固定容量的Cache分摊到多个LRUCache,每个LRUCache保证自己的线程安全,这就降低了多线程环境下锁的竞争。这种做法与分段锁的思想类似。

Linux shell 颜色输出

发表于 2018-05-09 | 更新于: 2018-05-09 | 分类于 Backend Development

原理

可以利用ANSI escape code 实现linux终端输出颜色文本。下面是几个ANSI escape code 对应的颜色。

1
2
3
4
5
6
7
8
Black        0;30     Dark Gray     1;30
Red 0;31 Light Red 1;31
Green 0;32 Light Green 1;32
Brown/Orange 0;33 Yellow 1;33
Blue 0;34 Light Blue 1;34
Purple 0;35 Light Purple 1;35
Cyan 0;36 Light Cyan 1;36
Light Gray 0;37 White 1;37

示例

以下是一个输出ANSI Rainbow 的示例。

1
2
3
4
5
#!/bin/bash
for (( i = 30; i < 38; i++ ))
do
echo -e "\033[0;"$i"m Normal: (0;$i); \033[1;"$i"m Light: (1;$i)";
done

输出结果如下
avatar
如果想设置固定区间的颜色,可以将’\033[0m’放在对应区间的结尾。如下示例

1
2
3
4
5
6
#!/bin/bash
NC='\033[0m' # No Color
for (( i = 30; i < 38; i++ ))
do
echo -e "I \033[0;${i}mlove${NC} Linux"
done

输出结果如下
avatar

Leveldb Arena解析

发表于 2018-05-03 | 更新于: 2018-05-03 | 分类于 Backend Development

简介

Arena 是leveldb 实现的简单的内存池,以最小4096bytes 为单位申请block, 使用指针记录当前block 中空余内存起始位置以及当前block剩余空间。将所有的block 放到blocks_ 数组中。Arena 提供了分配内存以及分配对齐的内存的两种接口,没有释放内存的接口,当Arena 的生命周期结束时,由Arena 的析构函数统一释放内存。Arena 的主要结构如下图所示:
leveldb arena

Leveldb 代码实现

leveldb arena 实现在util/arena.h 以及 util/arena.cc

接口以及成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Arena {
public:
Arena();
~Arena();

// Return a pointer to a newly allocated memory block of "bytes" bytes.
// 分配指定大小的内存,并返回分配内存的首地址
char* Allocate(size_t bytes);
// 分配指定大小并且对齐的内存
// Allocate memory with the normal alignment guarantees provided by malloc
char* AllocateAligned(size_t bytes);

// Returns an estimate of the total memory usage of data allocated
// by the arena.
// 统计使用了多少内存
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}

private:
// 分配内存,根据情况决定是否使用新的block
char* AllocateFallback(size_t bytes);
// 分配一个新的block
char* AllocateNewBlock(size_t block_bytes);

// Allocation state
// 指向一个block中未被使用的内存首地址
char* alloc_ptr_;
// 表示一个block还剩余多少空间
size_t alloc_bytes_remaining_;

// Array of new[] allocated memory blocks
// 内存池数组
std::vector<char*> blocks_;

// Total memory usage of the arena.
// 统计内存使用,原子变量
port::AtomicPointer memory_usage_;

// No copying allowed
// 将拷贝构造和赋值构造设置为私有
Arena(const Arena&);
void operator=(const Arena&);
};

分配指定bytes 的内存

首先根据alloc_bytes_remaining_判断是否需要分配一个新的block ,如果不需要的话,则重新设置alloc_ptr_ 以及 alloc_bytes_remaining_,否则调用AllocFallback方法分配新的block,并且判断新的block 是否可以被复用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
// 判断是否有可被复用的block
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}

分配指定bytes 对齐的内存

与上一个接口不同的地方在于,将传入的bytes 化为可对齐内存的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
char* Arena::AllocateAligned(size_t bytes) {
// 获取当前系统指针大小
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
// 指针大小必须是2的整数次幂,2的整数次幂的二进制表示中
// 有且只有1位是1
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
// 判断bytes是不是align 的整数倍,由于align是2的
// 整数次幂,所以对align的取模运算可以转化为
// 对(align - 1)进行按位与
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
// 为了对齐内存需要新增的大小
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
// needed 表示分配对齐的内存所需的大小
// 后面的逻辑同前
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
}
assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
return result;
}

AllocateFallback

函数的作用是分配一个新的block。不同的是,如果bytes 小于blocksize 的四分之一,则此新分配的block 可以被继续复用。否则的话直接分配新的block, Arena 中可被复用的block 保持不变。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
char* Arena::AllocateFallback(size_t bytes) {
// 分配的bytes较小,此新block可以被复用
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
}

// We waste the remaining space in the current block.
// 此新分配的block可以被复用,重新设置
// alloc_ptr_ 以及 alloc_bytes_remaining_
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;

char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}

AllocateNewBlock

直接使用new分配符分配指定大小的内存,并且更新memory_usage_

1
2
3
4
5
6
7
8
9
10
char* Arena::AllocateNewBlock(size_t block_bytes) {
// 分配空间
char* result = new char[block_bytes];
// 新的block 加入内存池
blocks_.push_back(result);
// 更新memory_usage_
memory_usage_.NoBarrier_Store(
reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
return result;
}

析构

逐个释放内存池中的block

1
2
3
4
5
Arena::~Arena() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
}
}

总结

Leveldb Arena 实现简单,Arena 的设计与leveldb 特定的应用场景相关,比如一个memtable 使用一个Arena对象统一获取内存,当memtable对象声明周期结束时,由Arena 统一释放内存,不需要消费者每次new一片内存就要自己delete 掉。

Leveldb BloomFilter 解析

发表于 2018-04-28 | 更新于: 2018-04-28 | 分类于 Backend Development

BloomFilter 原理

布隆过滤器由巴顿布隆于1970年提出,由一个很长的bit数组以及一系列hash函数组成。bloomfilter可以用于检索一个元素是否出现在一个集合中,bloomfilter的优点是相比hash表拥有极大的空间效率,缺点是会出现一定的错误概率(False positive,一个不在集合中的元素被误认为处于集合中)。
bloomfilter 的原理是,当一个元素在加入集合时,通过k个hash 函数映射到bit数组的k个位置,并将相应的位置置1。在查找一个元素是否存在于集合中时,使用相同的k个hash 函数查看bit数组中的位置是否为全为1,如果出现一个0,则该key 一定不存在集合中,否则有可能出现在集合中。

优点

Bloomfilter 可以使用较少的空间存储大量数据的全集,并且存储的不是每个元素的原本的数值,只是设置对应的bit位,一定程度上实现了加密的效果。除空间效率之外,bloomfilter的时间效率都是常数级别O(k),其中k 代表使用的hash 函数个数,由于k个hash 函数式相互独立的性质,在进行k个hash 计算时可以并行计算进一步加速插入查找。

缺点

Bloomfilter 的缺点是会出现False positive 的误判率,而且随着元素的增加,误算率也随之增加。因此尽可能降低误判率需要一些额外工作。

BloomFilter参数选择

以下均参考wikipedia bloomfilter 中关于误差率的计算章节。直接说结论,当k(hash 函数个数),m(bit数组大小),n(插入元素个数)满足下式的时候,可以保证最低的误差率。$$k=\cfrac{m}{n}ln2$$下图(摘自wikipedia) 表示在最优hash函数个数的情况下,不同m,n之间误差率关系。

从上图可以看到当存储10亿个元素时使用4GB的存储空间可以保证不到1e-06的错误率。可以看到bloomfilter在实现高空间利用率的同时可以保证较低误差率。

Leveldb 实现

leveldb bloomfilter 实现在util/bloom.cc

成员变量以及构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class BloomFilterPolicy : public FilterPolicy {
private:
// 平均每个key拥有的bit 数目
size_t bits_per_key_;
// hash function 的个数
size_t k_;

public:
explicit BloomFilterPolicy(int bits_per_key)
: bits_per_key_(bits_per_key) {
// We intentionally round down to reduce probing cost a little bit
// 按照上面的公式设定hash函数的个数
k_ = static_cast<size_t>(bits_per_key * 0.69); // 0.69 =~ ln(2)
if (k_ < 1) k_ = 1;
if (k_ > 30) k_ = 30;
}
};

CreateFilter

将传入的n个key 存储到bloomfilter 中,bloomfilter结果使用string存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const {
// Compute bloom filter size (in both bits and bytes)
// bloomfilter 需要多少bit
size_t bits = n * bits_per_key_;

// For small n, we can see a very high false positive rate. Fix it
// by enforcing a minimum bloom filter length.
if (bits < 64) bits = 64;
// 对齐,方便内存读写以及后续位置索引
size_t bytes = (bits + 7) / 8;
bits = bytes * 8;
// 在string 中分配空间
const size_t init_size = dst->size();
dst->resize(init_size + bytes, 0);
// string 的最后一个byte存储使用的hash 函数的个数
dst->push_back(static_cast<char>(k_)); // Remember # of probes in filter
// 获得string内部的char 型数组
char* array = &(*dst)[init_size];
// 逐个将每个key 写入bloom fliter
for (int i = 0; i < n; i++) {
// Use double-hashing to generate a sequence of hash values.
// See analysis in [Kirsch,Mitzenmacher 2006].
// leveldb 使用一个hash 函数,每次对hash值向右循环移位17个bit来模拟实现多个hash 函数的效果
uint32_t h = BloomHash(keys[i]);
// 每次向右循环移位17个bit
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k_; j++) {
// 在整个bit 数组的位置
const uint32_t bitpos = h % bits;
// 在char型数组的位置
array[bitpos/8] |= (1 << (bitpos % 8));
// 更新获得一个新的hash 数值
h += delta;
}
}
}

KeyMayMatch

判断一个 key 在bloomfilter中是否存在。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
virtual bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const {
const size_t len = bloom_filter.size();
if (len < 2) return false;

const char* array = bloom_filter.data();
// 最后一个byte数值代表使用了多少hash函数
// 除最后一个byte之外代表bit数组
const size_t bits = (len - 1) * 8;

// Use the encoded k so that we can read filters generated by
// bloom filters created using different parameters.
const size_t k = array[len-1];
if (k > 30) {
// Reserved for potentially new encodings for short bloom filters.
// Consider it a match.
return true;
}
// 使用相同的方法模拟多个hash函数计算的hash值
uint32_t h = BloomHash(key);
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
for (size_t j = 0; j < k; j++) {
const uint32_t bitpos = h % bits;
// 找到一个bit位置不匹配,提前返回false
// 在bit数组中位置的索引和设置时的方法一致
if ((array[bitpos/8] & (1 << (bitpos % 8))) == 0) return false;
// 更新获得下一个hash value
h += delta;
}
// 全部匹配return true
return true;
}

BloomFilter 应用场景

由于其高效的空间效率,bloomfilter 可以应用于以下场景:

  • 爬虫系统记录以经爬取过的url
  • 垃圾邮件过滤
  • p2p 网络中查找资源操作: 使用一个bloomfilter 保存拥有此资源的网络通路
  • 广播信息时检查某个ip是否发包
  • 字典纠错:将所有单词存储到bloomfilter中,如果不存在则认为是一个错误拼写
  • CDN 代理缓存: 每个cache 服务器上使用bloomfilter 存储兄弟cache 服务器上是否有缓存关键字,如果没有则可以避免一次查找

总结

Bloomfilter 是一种设计巧妙的数据结构,由于其良好的空间效率,可以用于判断一个元素是否包含于海量元素集合的场景。
Leveldb 的实现的bloomfilter 可以灵活配置hash 函数的个数,使用一个hash 函数模拟任意多个hash 函数的场景,并将使用hash函数的个数存储到bloomfilter编码结果中。除此之外,leveldb bloomfilter 实现bit数组个数,hash函数个数以及存储元素个数的最优配置,保证最低的误差率。

Leveldb skiplist 实现以及解析

发表于 2018-04-27 | 更新于: 2018-04-27 | 分类于 Backend Development

skiplist 原理介绍

skiplist 由William Pugh 在论文Skip Lists: A Probabilistic Alternative to Balanced Trees 中提出的一种数据结构,skiplist 是一种随机化存储的多层线性链表结构,插入,查找,删除的都是对数级别的时间复杂度。skiplist 和平衡树有相同的时间复杂度,但相比平衡树,skip实现起来更简单。

下图是wikipedia 上一个一个高度为4的skiplist

avatar
从垂直角度看,skiplist 的第0层以单链表的形式按照从小到大的顺序存储全部数据,越高层的链表的节点数越少,这样的特点实现了skiplist 在定位某个位置时,通过在高层较少的节点中查找就可以确定需要定位的位置处于哪个区间,从高层到低层不断缩小查找区间。以上图为例,比如我们需要在skiplist中查找2,查找过程如下,首先在最高层确定到2只可能处于1->NULL 这个区间,然后在第三层查找确定 2 只可能处于 1->4 这个区间,继续在第二层查找确定2 只可能处于1-3 这区间,最后在最底层1->3 这个区间查找可以确定2 是否存在于skiplist之中。
下图是wikipedia上提供的表示skiplist插入过程的一张gif,此图形象的说明了skiplist 定位以及插入节点的过程。
skiplist insert
从水平角度来看,skiplist实现在链表开始的时候设置名为head 的哨兵节点,每一层链表的结束为止全部指向NULL。

leveldb 实现

leveldb 实现的skiplist位于db/skiplist.h。

skiplist Node 类型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
// Node 存储的内容
Key const key;

// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
// 获取当前节点在指定level的下一个节点
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
// 将当前节点在指定level的下一个节点设置为x
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
}
// 无内存屏障版本set。关于leveldb 内存屏障在新一篇博客介绍
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}

private:
// Array of length equal to the node height. next_[0] is lowest level link.
// 当前节点的下一个节点数组
port::AtomicPointer next_[1];
};

skiplist 类成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private:
// 使用枚举类型定义skiplist 最高高度
enum { kMaxHeight = 12 };

// Immutable after construction
// 用户定制的比较器
Comparator const compare_;
// leveldb 实现的简单的内存分配器
Arena* const arena_; // Arena used for allocations of nodes
// skiplist 的前置哨兵节点
Node* const head_;

// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
// 记录当前skiplist使用的最高高度
port::AtomicPointer max_height_; // Height of the entire list

skiplist 插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
// 声明prev节点,代表插入位置的前一个节点
Node* prev[kMaxHeight];
// 使用FindGreaterOrEqual函数找到第一个大于等于插入key的位置
Node* x = FindGreaterOrEqual(key, prev);

// Our data structure does not allow duplicate insertion
assert(x == NULL || !Equal(key, x->key));
// 使用随机数获取该节点的插入高度
int height = RandomHeight();
if (height > GetMaxHeight()) {
// 大于当前skiplist 最高高度的话,将多出的来的高度的prev 设置为哨兵节点
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (NULL), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since NULL sorts after all
// keys. In the latter case the reader will use the new node.
// 跟新max_height_ max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
// 创建要插入的节点对象
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
// 首先将x的next 指向prev 的下一个节点
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
// 将prev 指向x
prev[i]->SetNext(i, x);
}
}

skiplist 查找

1
2
3
4
5
6
7
8
9
10
11
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
// 找到大于等于当前key的第一个node,然后判断node 的key
// 和传入的key 是否相等
Node* x = FindGreaterOrEqual(key, NULL);
if (x != NULL && Equal(key, x->key)) {
return true;
} else {
return false;
}
}

FindGreaterOrEqual

函数的作用是找到第一个大于或等于指定的key 的node,以及该node的前一个node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const {
Node* x = head_;
// level 从0 开始编码
int level = GetMaxHeight() - 1;
while (true) {
// 定位到当前level的下一个节点
Node* next = x->Next(level);
// key 没有在当前区间
if (KeyIsAfterNode(key, next)) {
// Keep searching in this list
x = next;
} else {
// key 在当前区间,在低level 继续查找,
// 在查找的同时设置prev 节点
if (prev != NULL) prev[level] = x;
// 在最低level找到相应位置
if (level == 0) {
return next;
} else {
// Switch to next list
level--;
}
}
}
}

RandomHeight

利用随机数实现每次有4分之一的概率增长高度。

1
2
3
4
5
6
7
8
9
10
11
12
template<typename Key, class Comparator>
int SkipList<Key,Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}

FindLessThan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
\// 在当前level 查找
Node* next = x->Next(level);
// if 分支为true 的时候表示需要查找的位置在当前区间
if (next == NULL || compare_(next->key, key) >= 0) {
// 在最后一层停止查找
if (level == 0) {
return x;
} else {
// Switch to next list
level--;
}
} else {
// 在当前level 就找到了比key 小的节点
x = next;
}
}
}

总结

skiplist最底层单链表有序存储全部元素,利用多层有序链表的结构实现加速索引的功能,处于越高level 节点的链表越稀疏查找速度越快,在不断向下查找的过程中不断缩小查找空间。
总的来说,skiplist 是一种设计巧妙的数据结构,leveldb 的实现可读性高,容易理解。

C++ 拼接长字符串

发表于 2018-04-26 | 更新于: 2018-04-28 | 分类于 Backend Development

C++ 拼接长字符串

c++ string 类型提供 opearator+= 以及 append 方法进行字符串拼接,本文探讨c++拼接长字符串执行效率最高的方法。以下是四种实现方式。

实现方式

operator +=

使用 string 类提供重载 += 方法拼接字符串。示例:

1
2
3
4
5
6
7
8
9
10
11
// length 参数代表拼接的字符串长度
void composeLongstringWithOperator(const unsigned int length,std::string& long_string)
{
for (size_t i = 0; i < length / 9; i++)
{
char str[10];
// randStr 方法构造长度为9的随机字符串
long_string += (randStr(str,9));
}

}

append

使用 string 类提供的append 方法拼接字符串。示例:

1
2
3
4
5
6
7
8
void composeLongstringWithAppend(const unsigned int length,std::string& long_string)
{
for (size_t i = 0; i < length / 9; i++)
{
char str[10];
long_string.append(randStr(str,9));
}
}

reserve && operator +=

在拼接字符串之前为string 对象提前分配空间,然后使用 += 方法进行拼接,示例:

1
2
3
4
5
6
7
8
9
void composeLongstringWithReserveAndOperator(const unsigned int length,std::string& long_string)
{
long_string.reserve(length);
for (size_t i = 0; i < length / 9; i++)
{
char str[10];
long_string += (randStr(str,9));
}
}

reserve && append

在拼接字符串之前为string 对象提前分配空间,然后使用 append 方法进行拼接,示例:

1
2
3
4
5
6
7
8
void composeLongstringWithAppend(const unsigned int length,std::string& long_string)
{
for (size_t i = 0; i < length / 9; i++)
{
char str[10];
long_string.append(randStr(str,9));
}
}

性能测试

测试方法

进行10000次长字符串拼接,统计每种方式下耗时,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <iostream>
#include <string>
#include <ctime>
#include <chrono>
char* randStr(char* str,const int len)
{
int i;
for(i = 0; i < len; ++i)
{
str[i] = 'A' + rand() % 26;
}
str[++i] = '\0';
return str;
}

int main(int argc, char* argv[])
{
(void) argc;
// 第一个参数代表生成的字符串的长度
const unsigned int length = atoi(argv[1]);
// 第二个参数代表使用哪种方法进行拼接
const unsigned int type = atoi(argv[2]);

srand(time(NULL));
auto start = std::chrono::high_resolution_clock::now();
switch(type)
{
case 1:
std::cout << "composeLongstringWithReserveAndAppend";
for(int i = 0; i < 10000; i++)
{
std::string long_string;
composeLongstringWithReserveAndAppend(length,long_string);
}
break;
case 2:
std::cout << "composeLongstringWithReserveAndOperator";
for(int i = 0; i < 10000; i++)
{
std::string long_string;
composeLongstringWithReserveAndOperator(length,long_string);
}
break;
case 3:
std::cout << "composeLongstringWithAppend";
for(int i = 0; i < 10000; i++)
{
std::string long_string;
composeLongstringWithAppend(length,long_string);
}
break;
case 4:
std::cout << "composeLongstringWithOperator";
for(int i = 0; i < 10000; i++)
{
std::string long_string;
composeLongstringWithOperator(length,long_string);
}
break;
default:
return 0;
}
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
std::cout << " cost " << 1000 * diff.count() << " ms\n";

return 0;
}

编译

1
g++ -std=c++11 -O3 compose_long_string.cpp -o compose_long_string

性能表现

长字符串长度为1000000,每种方法进行10000次拼接,四种方法的耗时如下:

method cost (ms)
reserve && append 117304
reserve && operator 122998
append 125682
operator 129071

结论

  1. 针对较短字符串,使用reserve提前分配空间对性能提升意义不大,当字符串的长度很长是,使用reserve方法提前分配空间可以带来比较大的性能提升。
  2. operator+= 和 append 方法在进行字符串拼接时性能表现几乎一致。原因是stl 实现的operator+= 方式实际是直接调用了append 方法。
  3. 综上,拼接长字符串时最优方式是 reserve && append。

读书短评

发表于 2018-04-19 | 更新于: 2018-07-06 | 分类于 Reading

2018-04-19

《人类简史》

现代社会建立在人类的重重假设,回到非洲大草原摘个野果子吃就很快乐~
从物种繁衍的角度来看,小麦,家畜是比人要更成功的。但是物种繁衍的成功不等同于个体的快乐。有人可能会说小麦,家畜的一生都很短暂,但相对浩瀚宇宙来说,人的一生也是沧海一粟。都是匆匆过客,如果能更多卸下捆绑获得真正的快乐才是生活的真谛。

《活着》

所有的困难都会过去,活着本身就是最大的意义。

《围城》

很多现象都可以用围城思想来解释,里面的人想出去,外边的人想进去

《明朝那些事》

除了朱元璋打天下,朱棣篡位,剩下大部分篇幅都在讲官场政治,当年明月本是公务员,写官场政治这些也非常熟悉,所以后面那几本叫《明朝官场那些事》可能更合适。

《解忧杂货店》

高产如东野,可能写悬疑探案类更费精力,所以来碗鸡汤喂喂。书中所有故事都说明了影射一个道理,向别人寻求建议的时候,自己心理已经有答案,只不过是希望别人加强一下自己的信念,如果建议和自己想的相反,你会找别的人再给你建议~

《白鹿原》

具有西北特色的低配百年孤独。读的过程中可以感受到西北作家的朴实的文风,稍微结合一点贵party的black历史就可以如此精彩!另外,读的过程中经常想吃羊肉泡馍~

《苏东坡传》

上学课本上说的伟大的文学家,书画家。而借用林语堂在前言中对苏东坡的描述: “苏东坡是一个无可救药的乐天派、一个伟大的人道主义者、一个百姓的朋友、一个大文豪、大书法家、创新的画家、造酒试验家、一个工程师、一个憎恨清教徒主义的人、一位瑜伽修行者佛教徒、巨儒政治家、一个皇帝的秘书、酒仙、厚道的法官、一位在政治上专唱反调的人。一个月夜徘徊者、一个诗人、一个小丑”,相比之下自己的生活单调了好多,所以应该在自己身上点更多天赋。不写了,中午吃东坡肉。

2018-04-20

《天龙八部》

乔峰,段誉,虚竹,慕容复所有主要角色无不苦难。众生皆苦,无欲则刚,修炼成佛的还是没几个。

《百年孤独》

人生来孤独,所以需要把雕刻好小金鱼后融掉重新再雕,需要吃土排解孤独。随便拉出个人来,对他说“我想你才是真正的孤独吧!”他都会感到你是他的知音~

《追风筝的人》

“为你,千千万万遍” 对主人的忠诚和对友谊的坚持要有多深才能说出这样的话。

《鹿鼎记》

“老子不干了” 在康熙皇帝和天地会之间脚踏两条船太累了!在这两个没法做决定的选择面前,干脆不选了,全都qtmd!

《黄金时代》

在混乱的年代,黄金时代年纪,有个人一起实践伟大革命友谊也很幸福吧!

2018-04-25

《万寿寺》

万寿寺的主要作用之一是为了老佛爷乘船去颐和园游玩的途中休息一下。之所以老佛爷成为了老佛爷是因为咸丰帝从她身上爬起来时那条射过精的疲软的jb,王小波称之为历史的脐带(hahaha)。所以他在万寿寺搞一些历史的脐带考,领导很不喜欢!

《了不起的盖茨比》

对黛西的爱情是盖茨比先生的一道绿光,绿光指引他前行,我猜在他临死之前,绿光也不曾熄灭。

《指数基金投资指南》

钉大的扫盲科普书籍,1 h 看完,后面继续有选择的抄E大和钉大的作业。

2018-05-02

《笑傲江湖》

金庸武侠里面政治色彩很重的一本,前面甚嚣尘土的左盟主还是被岳君子阴了。伪君子比真小人更可怕。

《神雕侠侣》

侠之大者为国为民,杨过在最后领悟此道理,也解开了杀父之仇的心结。在经过无数挫折之后终于和小龙女走到一起,只是苦了那些“一见杨过误终身”的少女们~

2018-05-03

《月亮与六便士》

“忘了是谁说过,为了让灵魂受益,每天应该做两件自己不喜欢的事情,说着这句话的是一个富有智慧的人,我把这句格言谨记在新,遵照行事,因此每天我醒来起床,每晚上床睡下” 毛姆这句话描述的境界是生活中所做的任何事都是自己发自肺腑开心的事,这比诗和远方的境界高多了。我承认自己不可能达到那种境界,只希望自己在低头捞便士的同时不要忘了抬头看一下明亮的月亮。

《1984》

“Big brother is watching you.”,原本影射苏联的一书放在今日天朝也并无太多违和。修宪之后,海康威视大涨就让我想到1984中的场景,依据相同的逻辑,中证传媒一定不会有大发展,军工企业虽然垃圾,但会有很大机会。不知道这书啥时候成为禁书。

《寻找无双》

王仙客在寻找无双的过程中困难重重,我们追寻的理想也同样如此吧,不过无所谓,重要的反而是追寻理想的过程吧!

《革命时期的爱情》

所有混乱的事情,如果是放在革命时期也就显得不那么难以解释。X海鹰作为先进青年在挽救堕落青年的王二的时候怎么可以动歪心思!

《爱你就像爱生命》

王小波李银河书信整理合集。李银河在书中表示说自己是回应了王小波热烈的感情,同时对他的爱也变得炽烈。最喜欢里面的两句活,“一想起你我这张丑脸上就泛起笑容”,“你好哇,李银河”。

2018-05-08

《蒋介石与现代中国》

讲述了委员长上位,完成中国形式上统一,抗日战争,国共内战,退守台湾五个阶段的大事。感受到那个动荡的时代波兰壮阔的历史,比历史课本中看到人物丰富立体。书中对抗日初期我党对抗日的态度,委员长对美玲的追求,退守台湾后两位大佬心照不宣共同薅美苏羊毛给我留下很深印象。最后留下在青岛花石楼里偶然看到的委员长一张诡异的微笑图。

《天才在左,疯子在右》

初读之后非常震撼,每个人的精神世界都是非常奇妙,即使是本人无法完全洞悉自己的全部想法。所谓精神病的概念,更多的是说那帮人的想法和社会主流价值观所要求的不一致。心理学是如此奇妙。

《三体》

印象最深的是第二部中黑暗丛林法则,这个原则放到现在的社会也不无道理。看书过程中不断为作者磅礴而又严谨的想象所感叹,科幻的同时还可以看到作者对人性的思考,很赞。

2018-05-15

《霍乱时期的爱情》

男主一生的主题就是对女主爱情的追求,在认识女主之后男主做的所有事情都是以赢得爱情为目标,和神雕侠侣里一见杨过毁终生的少女有些相似的感觉。书中描写了各种种类的爱情,读的同时还是感觉到和百年孤独类似的荒芜感,不同的是从这本书里还读到老爷子幽默的地方(比如女主从无限讨厌茄子,到无限热爱茄子,哈哈)。
看到豆瓣里很有意思的一个书评,“屌丝战胜高帅富的方法就是活得比高帅富久,所以屌丝们赶紧锻炼身体去吧”

2018-05-22

《谁在世界中心》

书中一直在强调地缘政治的重要性,大部分篇幅都在讲现在是海权时代,海洋强国才是真的强国。感到新奇的一点是如果继续全球变暖,北冰洋加速融化,俄罗斯会成为另外一个海洋大国,地缘优势会大大增加,那时候老毛子会更跳~

《精进》

作为非常讨厌鸡汤以及成功学的我,在采铜的这本书中看到了超多的干货,里面很多思想和刘未鹏博客中的观点不谋而合。总之就是信息量超大的一本书。最近逛知乎看到采铜已经离开知乎了,这也并不奇怪,知乎现在已经是段子手和装逼犯集中的地方了,没那么多大佬了。

2018-05-25

《送你一颗子弹》

刘瑜老师才是真正文艺女青年,整本书大部分是刘瑜老师在06-08年的生活见闻,其中充满了对生活,爱情,政治,自由的思考,文笔朴实犀利又不乏幽默,很赞。相比起来,生活中大部分自诩文艺的女青年还是差距太大了,建议她们多读读马克思,少看些张小娴吧。

2018-06-01

《撒哈拉的故事》

上高中还是本科的时候看过一遍。感觉到三毛作为一个文艺女青年也是够燥够任性,好好的在西班牙享受现代资本主义的灯红酒绿突然要跑到撒哈拉沙漠去。到撒哈拉之后自力更生经营自己的生活,见识了好多的奇奇怪怪的习俗同时又冒了好多险。总的来说,再次读的过程中不断感受到这位文艺女青年强大的气场。

2018-07-06

《小王子》

小王子在各个星球旅行,遇到形形色色各式各样不能理解的大人。这对应到现实生活中也正是如此,我们做的大部分事情可能都是没啥实际意义或者说已经脱离原本的想法,拥有孩子的天真以及直接了当实在难能可贵。

《未来简史》

待续

《盗墓笔记》

待续

Leveldb varint 解析

发表于 2018-04-17 | 更新于: 2018-04-17 | 分类于 Backend Development

varint 介绍

我们知道 uint32_t 类型占用4个byte,uint64_t 占用8个byte, 但是对于比较小的数字来说,使用uint32_t 或者uint64_t 存储会比较浪费,varint 的思想是根据数字所需大小使用unsigned char* 指针存储数据,节约内存。

leveldb 实现

leveldb 中的varint实现原理简单,每个byte 使用最高bit的 0/1值代表此整数值是否结束,用剩下的7个bit 存储实际的数值。知道最后一个byte 的最高bit 是0 表示整数结束。因此小于128的数据都可以用一个byte 来表示,大于128的,比如说300,使用varint 编码的话需要两个字节10101100 0000 0010

leveldb 32位int变长编码实现

正常情况下,32位int占用4个byte, varint 编码中每个byte 中的最高bit 用来表示该byte 是不是最后一个byte,所以针对大的数varint 编码可能需要5个byte才能表示。leveldb 实现中5个if 分支对应varint占用1到5个byte 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
static const int B = 128;
if (v < (1<<7)) {
*(ptr++) = v;
} else if (v < (1<<14)) {
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}

对应的varint 解码的思路是从低byte 到高byte遍历,直到找到最后一个表示编码结束的byte(判断条件是 byte & 128 == 1),代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 针对一个byte情况直接处理,大于一个byte 时,
// 使用 GetVarint32PtrFallback 处理
inline const char* GetVarint32Ptr(const char* p,
const char* limit,
uint32_t* value) {
if (p < limit) {
uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
if ((result & 128) == 0) {
*value = result;
return p + 1;
}
}
return GetVarint32PtrFallback(p, limit, value);
}

const char* GetVarint32PtrFallback(const char* p,
const char* limit,
uint32_t* value) {
uint32_t result = 0;
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
p++;
if (byte & 128) {
// More bytes are present
result |= ((byte & 127) << shift);
} else {
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
return NULL;
}

64位变长编码的实现

64位整形最多需要(10 * 8 - 10 > 64) 10个byte 来保存,类似于32位int的编码,需要写10个if-else 分支,针对64位整形的编码,leveldb 给出了更优雅的解决方案。

1
2
3
4
5
6
7
8
9
10
char* EncodeVarint64(char* dst, uint64_t v) {
static const int B = 128;
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
while (v >= B) {
*(ptr++) = (v & (B-1)) | B;
v >>= 7;
}
*(ptr++) = static_cast<unsigned char>(v);
return reinterpret_cast<char*>(ptr);
}

针对64位整形的解码,leveldb 实现同样是类似的逻辑。也是从低位byte 开始向高位byte遍历判断编码是否结束。代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) {
uint64_t result = 0;
for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) {
uint64_t byte = *(reinterpret_cast<const unsigned char*>(p));
p++;
if (byte & 128) {
// More bytes are present
result |= ((byte & 127) << shift);
} else {
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
return NULL;
}

总结

varint 的思想是针对32位或者64位整形类型来说存储的大部分数据都是多余的,因此使用每个byte的最高位来标识编码是否结束,使用一个char 型指针存储编码的结果,针对很多情况可以节约存储空间。

Memcached hashtable解析

发表于 2018-04-11 | 更新于: 2018-04-11 | 分类于 Backend Development

介绍

Memcached中实现了高性能的hashtable。其解决hash冲突的方法采用拉链法。当hashtable 中存储的item个数大于容器大小的1.5倍的时候通知线程进行hashtable 扩容,为了保证在扩容期间的读写性能,扩容线程默认每次只迁移一个bucket。设置一个变量标识当前的迁移进度,在进行读写操作时根据此变量确定是去 old_hashtable 还是 primary_hashtable 进行操作。

代码解析

主要接口

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*数组的大小都是2指数次幂,这样的好处是可以将 index % hashsize 
变为 index & hashmask*/
#define hashsize(n) ((ub4)1<<(n))
#define hashmask(n) (hashsize(n)-1)
// 初始化函数的作用是为主表分配空间
void assoc_init(const int hashtable_init) {
if (hashtable_init) {
hashpower = hashtable_init;
}
primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
}
STATS_LOCK();
stats_state.hash_power_level = hashpower;
stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
STATS_UNLOCK();
}

查找

hash 查找的逻辑是优先使用hash 预算定位到bucket,然后循环bucket 链表找到指定的key。需要理解的地方在于查找时可能存在hashtable 正在进行扩展,所以需要确定是在old_hashtable还是 primary_hashtable 进行查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;
/* expanding 标识扩展是否完成。
expand_bucket表示当前扩展的进度,使用位与操作查找bucket位置 */
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket];
} else {
it = primary_hashtable[hv & hashmask(hashpower)];
}

item *ret = NULL;
int depth = 0;
// 循环单链表查找指定key
while (it) {
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}

插入

插入的主要逻辑是找到指定桶的位置,将当前插入的节点设置为桶中位置的链表头结点位置,并且重新设置桶中元素的value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;

// assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}

MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
return 1;
}

删除

删除接口的主要逻辑是使用_hashitem_before 函数找到要删除item前一个item指针位置,然后将此指针的位置直接指向被删除item 的下一个item 位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
item **before = _hashitem_before(key, nkey, hv);

if (*before) {
item *nxt;
/* The DTrace probe cannot be triggered as the last instruction
* due to possible tail-optimization by the compiler
*/
MEMCACHED_ASSOC_DELETE(key, nkey);
nxt = (*before)->h_next;
(*before)->h_next = 0; /* probably pointless, but whatever. */
*before = nxt;
return;
}
/* Note: we never actually get here. the callers don't delete things
they can't find. */
assert(*before != 0);
}

其他辅助函数

_hashitem_before

函数的作用是查找给定item的前一个节点的指针,在delete 接口中调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
item **pos;
unsigned int oldbucket;
// 同理是确定是在old_hashtable 还是在primary_hashtable
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
pos = &old_hashtable[oldbucket];
} else {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}
// 从头结点的位置开始顺序遍历单链表中的节点
while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}

assoc_expand

函数的作用是执行hash表的扩容,执行的过程是将当前primary_hashtable 指定为old_hashtable, 为primary_hashtable 分配内存,primary_hashtable的大小是old_hashtable 的两倍,将标识是否在扩展的bool型变量 expanding 设置为true。将标识扩展进度的变量expand_bucket设置为0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
old_hashtable = primary_hashtable;

primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = true;
expand_bucket = 0;
STATS_LOCK();
stats_state.hash_power_level = hashpower;
stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
stats_state.hash_is_expanding = true;
STATS_UNLOCK();
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}

assoc_start_expand

函数的作用判断是否进行扩展,进行扩展的临界条件是hashtable 中item 个数大于hash 桶数的1.5倍。满足此临界条件时通知扩展线程进行扩展

1
2
3
4
5
6
7
8
9
10
void assoc_start_expand(uint64_t curr_items) {
if (started_expanding)
return;

if (curr_items > (hashsize(hashpower) * 3) / 2 &&
hashpower < HASHPOWER_MAX) {
started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}
}

start_assoc_maintenance_thread

函数的作用是创建hash 扩展线程,可以根据用户指定的参数设置每次扩展多少个bucket。如果不指定此参数的话,默认每次只扩展一个bucket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int start_assoc_maintenance_thread() {
int ret;
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
hash_bulk_move = atoi(env);
if (hash_bulk_move == 0) {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
return -1;
}
return 0;
}

assoc_maintenance_thread

函数的作用是执行实际的bucket 扩展。具体解释见注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
static void *assoc_maintenance_thread(void *arg) {

mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread) {
int ii = 0;

/* There is only one expansion thread, so no need to global lock. */
// 循环每次扩展的全部bucket
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
unsigned int bucket;
void *item_lock = NULL;

/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
/* expand_bucket需要锁保护,由于处于同一个bucket
中的特性是这些item 的hv 的低N位是完全相同,对应的
item_lock 的位置靠hv 的低M位确定,由于item_lock
数组大小小于桶数组的大小,所以有 M < N ,也就是说处
于同一个桶中的item拥有相同item_lock,所以在遍历桶中
所有的item 的时候不需要在额外获取item_lock。这里的
设计非常精妙~ */
if ((item_lock = item_trylock(expand_bucket))) {
/* 遍历bucket 中全部item,插入到
primary_hashtable 中相应bucket */
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
// old_hashtable 中bucket 内容设置为空
old_hashtable[expand_bucket] = NULL;
// 维护当前扩展的进度
expand_bucket++;
/* 如果扩展已经全部完成则设置expanding为
false ,释放old_hashtable 的内存*/
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
free(old_hashtable);
STATS_LOCK();
stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
stats_state.hash_is_expanding = false;
STATS_UNLOCK();
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}

} else {
usleep(10*1000);
}
// 释放资源
if (item_lock) {
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}
// 如果不在进行扩展,则设置条件变量,等待被触发扩展
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand();
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}

线程安全

memcached 使用分段锁实现hashtable 线程安全,分段锁避免了hashtable 中全部的item公用一个锁,公用一个锁的会降低hashtable 的读写性能。下面部分代码是memcached 初始化分段锁数组的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else if (nthreads <= 10) {
power = 13;
} else if (nthreads <= 20) {
power = 14;
} else {
/* 32k buckets. just under the hashpower default. */
power = 15;
}
/* 保证分段锁的数目小于hashtable 桶的个数,这样设计的好处之一
是在扩展的时候针对一个桶中的所有item 对应的是同一个
item_lock*/
if (power >= hashpower) {
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
exit(1);
}

item_lock_count = hashsize(power);
item_lock_hashpower = power;
// 分配分段锁数组
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror("Can't allocate item locks");
exit(1);
}

在对hashtable 进行多线程读写时,首先需要根据hash 算法计算出hv 值,然后根据hv 获取item_lock,获取到item_lock 之后再进行读写操作。这也从侧面解释了为什么memcached在扩展时默认每次只扩展一个bucket,因为在进行扩展的时候需要占有item_lock,每次执行扩展的bucket 数多会影响读写性能。

总结

memcached 的hashtable是典型的拉链式hashtable,实现代码短小易读,使用一个线程进行hashtable的扩展以保证不会出现item增多导致哈希冲突激增降低读写性能的现象,除此之外使用分段锁来保证多线程的读写安全,相比全局锁也可以提升读写性能。memcached hashsize设置为2的整数次幂的设计非常精妙,首先这样可以将查找hash bucket索引的取余操作转化为对(hashsize-1)取按位与操作,在加上分段锁的数目大小小于hashsize 的设置可以保证一个bucket 中所有的item 对应于同一个分段锁,进而保证在扩展bucket中全部内容时只需要获取一次分段锁!

1…345
carbo06@163.com

carbo06@163.com

About Linux C++ and routing algorithms.

45 日志
5 分类
18 标签
GitHub E-Mail
Friends
  • DingliYang
  • SongZhang
© 2018 carbo06@163.com
0%