Memcached hashtable解析

介绍

Memcached中实现了高性能的hashtable。其解决hash冲突的方法采用拉链法。当hashtable 中存储的item个数大于容器大小的1.5倍的时候通知线程进行hashtable 扩容,为了保证在扩容期间的读写性能,扩容线程默认每次只迁移一个bucket。设置一个变量标识当前的迁移进度,在进行读写操作时根据此变量确定是去 old_hashtable 还是 primary_hashtable 进行操作。

代码解析

主要接口

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*数组的大小都是2指数次幂,这样的好处是可以将 index % hashsize 
变为 index & hashmask*/
#define hashsize(n) ((ub4)1<<(n))
#define hashmask(n) (hashsize(n)-1)
// 初始化函数的作用是为主表分配空间
void assoc_init(const int hashtable_init) {
if (hashtable_init) {
hashpower = hashtable_init;
}
primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
if (! primary_hashtable) {
fprintf(stderr, "Failed to init hashtable.\n");
exit(EXIT_FAILURE);
}
STATS_LOCK();
stats_state.hash_power_level = hashpower;
stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
STATS_UNLOCK();
}

查找

hash 查找的逻辑是优先使用hash 预算定位到bucket,然后循环bucket 链表找到指定的key。需要理解的地方在于查找时可能存在hashtable 正在进行扩展,所以需要确定是在old_hashtable还是 primary_hashtable 进行查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
item *it;
unsigned int oldbucket;
/* expanding 标识扩展是否完成。
expand_bucket表示当前扩展的进度,使用位与操作查找bucket位置 */
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it = old_hashtable[oldbucket];
} else {
it = primary_hashtable[hv & hashmask(hashpower)];
}

item *ret = NULL;
int depth = 0;
// 循环单链表查找指定key
while (it) {
if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
ret = it;
break;
}
it = it->h_next;
++depth;
}
MEMCACHED_ASSOC_FIND(key, nkey, depth);
return ret;
}

插入

插入的主要逻辑是找到指定桶的位置,将当前插入的节点设置为桶中位置的链表头结点位置,并且重新设置桶中元素的value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int assoc_insert(item *it, const uint32_t hv) {
unsigned int oldbucket;

// assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */

if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}

MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
return 1;
}

删除

删除接口的主要逻辑是使用_hashitem_before 函数找到要删除item前一个item指针位置,然后将此指针的位置直接指向被删除item 的下一个item 位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
item **before = _hashitem_before(key, nkey, hv);

if (*before) {
item *nxt;
/* The DTrace probe cannot be triggered as the last instruction
* due to possible tail-optimization by the compiler
*/
MEMCACHED_ASSOC_DELETE(key, nkey);
nxt = (*before)->h_next;
(*before)->h_next = 0; /* probably pointless, but whatever. */
*before = nxt;
return;
}
/* Note: we never actually get here. the callers don't delete things
they can't find. */
assert(*before != 0);
}

其他辅助函数

_hashitem_before

函数的作用是查找给定item的前一个节点的指针,在delete 接口中调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static item** _hashitem_before (const char *key, const size_t nkey, const uint32_t hv) {
item **pos;
unsigned int oldbucket;
// 同理是确定是在old_hashtable 还是在primary_hashtable
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
pos = &old_hashtable[oldbucket];
} else {
pos = &primary_hashtable[hv & hashmask(hashpower)];
}
// 从头结点的位置开始顺序遍历单链表中的节点
while (*pos && ((nkey != (*pos)->nkey) || memcmp(key, ITEM_key(*pos), nkey))) {
pos = &(*pos)->h_next;
}
return pos;
}

assoc_expand

函数的作用是执行hash表的扩容,执行的过程是将当前primary_hashtable 指定为old_hashtable, 为primary_hashtable 分配内存,primary_hashtable的大小是old_hashtable 的两倍,将标识是否在扩展的bool型变量 expanding 设置为true。将标识扩展进度的变量expand_bucket设置为0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
old_hashtable = primary_hashtable;

primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = true;
expand_bucket = 0;
STATS_LOCK();
stats_state.hash_power_level = hashpower;
stats_state.hash_bytes += hashsize(hashpower) * sizeof(void *);
stats_state.hash_is_expanding = true;
STATS_UNLOCK();
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}

assoc_start_expand

函数的作用判断是否进行扩展,进行扩展的临界条件是hashtable 中item 个数大于hash 桶数的1.5倍。满足此临界条件时通知扩展线程进行扩展

1
2
3
4
5
6
7
8
9
10
void assoc_start_expand(uint64_t curr_items) {
if (started_expanding)
return;

if (curr_items > (hashsize(hashpower) * 3) / 2 &&
hashpower < HASHPOWER_MAX) {
started_expanding = true;
pthread_cond_signal(&maintenance_cond);
}
}

start_assoc_maintenance_thread

函数的作用是创建hash 扩展线程,可以根据用户指定的参数设置每次扩展多少个bucket。如果不指定此参数的话,默认每次只扩展一个bucket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int start_assoc_maintenance_thread() {
int ret;
char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
if (env != NULL) {
hash_bulk_move = atoi(env);
if (hash_bulk_move == 0) {
hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
}
}
pthread_mutex_init(&maintenance_lock, NULL);
if ((ret = pthread_create(&maintenance_tid, NULL,
assoc_maintenance_thread, NULL)) != 0) {
fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
return -1;
}
return 0;
}

assoc_maintenance_thread

函数的作用是执行实际的bucket 扩展。具体解释见注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
static void *assoc_maintenance_thread(void *arg) {

mutex_lock(&maintenance_lock);
while (do_run_maintenance_thread) {
int ii = 0;

/* There is only one expansion thread, so no need to global lock. */
// 循环每次扩展的全部bucket
for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
item *it, *next;
unsigned int bucket;
void *item_lock = NULL;

/* bucket = hv & hashmask(hashpower) =>the bucket of hash table
* is the lowest N bits of the hv, and the bucket of item_locks is
* also the lowest M bits of hv, and N is greater than M.
* So we can process expanding with only one item_lock. cool! */
/* expand_bucket需要锁保护,由于处于同一个bucket
中的特性是这些item 的hv 的低N位是完全相同,对应的
item_lock 的位置靠hv 的低M位确定,由于item_lock
数组大小小于桶数组的大小,所以有 M < N ,也就是说处
于同一个桶中的item拥有相同item_lock,所以在遍历桶中
所有的item 的时候不需要在额外获取item_lock。这里的
设计非常精妙~ */
if ((item_lock = item_trylock(expand_bucket))) {
/* 遍历bucket 中全部item,插入到
primary_hashtable 中相应bucket */
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;
bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}
// old_hashtable 中bucket 内容设置为空
old_hashtable[expand_bucket] = NULL;
// 维护当前扩展的进度
expand_bucket++;
/* 如果扩展已经全部完成则设置expanding为
false ,释放old_hashtable 的内存*/
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
free(old_hashtable);
STATS_LOCK();
stats_state.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
stats_state.hash_is_expanding = false;
STATS_UNLOCK();
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}

} else {
usleep(10*1000);
}
// 释放资源
if (item_lock) {
item_trylock_unlock(item_lock);
item_lock = NULL;
}
}
// 如果不在进行扩展,则设置条件变量,等待被触发扩展
if (!expanding) {
/* We are done expanding.. just wait for next invocation */
started_expanding = false;
pthread_cond_wait(&maintenance_cond, &maintenance_lock);
/* assoc_expand() swaps out the hash table entirely, so we need
* all threads to not hold any references related to the hash
* table while this happens.
* This is instead of a more complex, possibly slower algorithm to
* allow dynamic hash table expansion without causing significant
* wait times.
*/
pause_threads(PAUSE_ALL_THREADS);
assoc_expand();
pause_threads(RESUME_ALL_THREADS);
}
}
return NULL;
}

线程安全

memcached 使用分段锁实现hashtable 线程安全,分段锁避免了hashtable 中全部的item公用一个锁,公用一个锁的会降低hashtable 的读写性能。下面部分代码是memcached 初始化分段锁数组的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
if (nthreads < 3) {
power = 10;
} else if (nthreads < 4) {
power = 11;
} else if (nthreads < 5) {
power = 12;
} else if (nthreads <= 10) {
power = 13;
} else if (nthreads <= 20) {
power = 14;
} else {
/* 32k buckets. just under the hashpower default. */
power = 15;
}
/* 保证分段锁的数目小于hashtable 桶的个数,这样设计的好处之一
是在扩展的时候针对一个桶中的所有item 对应的是同一个
item_lock*/
if (power >= hashpower) {
fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
exit(1);
}

item_lock_count = hashsize(power);
item_lock_hashpower = power;
// 分配分段锁数组
item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
if (! item_locks) {
perror("Can't allocate item locks");
exit(1);
}

在对hashtable 进行多线程读写时,首先需要根据hash 算法计算出hv 值,然后根据hv 获取item_lock,获取到item_lock 之后再进行读写操作。这也从侧面解释了为什么memcached在扩展时默认每次只扩展一个bucket,因为在进行扩展的时候需要占有item_lock,每次执行扩展的bucket 数多会影响读写性能。

总结

memcached 的hashtable是典型的拉链式hashtable,实现代码短小易读,使用一个线程进行hashtable的扩展以保证不会出现item增多导致哈希冲突激增降低读写性能的现象,除此之外使用分段锁来保证多线程的读写安全,相比全局锁也可以提升读写性能。memcached hashsize设置为2的整数次幂的设计非常精妙,首先这样可以将查找hash bucket索引的取余操作转化为对(hashsize-1)取按位与操作,在加上分段锁的数目大小小于hashsize 的设置可以保证一个bucket 中所有的item 对应于同一个分段锁,进而保证在扩展bucket中全部内容时只需要获取一次分段锁!

Donate
0%