作者:吕宗胜
下面我们来根据Memcached的关键源码来看看其LRU策略。
item *do_item_alloc(char *key, const size_t nkey, const int flags,
const rel_time_t exptime, const int nbytes,
const uint32_t cur_hv) {
int i;
uint8_t nsuffix;
item *it = NULL;
char suffix[40];
unsigned int total_chunks;
/*计算需要分配的空间*/
/*nkey+1,这里的1主要是Key的分隔符*/
size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
if (settings.use_cas) {
ntotal += sizeof(uint64_t);
}
/*该方法返回要保存该数据的SlabClass的ID*/
unsigned int id = slabs_clsid(ntotal);
if (id == 0)
return 0;
/* 在进行item内存获取的时候,Memcached会进行LRU的回收。 */
for (i = 0; i < 5; i++) {
/* 先试着回收内存 */
/* lru_maintainer_thread,该值由LUR管理控制,在默认分配的时候,该值是false */
if (!settings.lru_maintainer_thread) {
lru_pull_tail(id, COLD_LRU, 0, false, cur_hv);
}
it = slabs_alloc(ntotal, id, &total_chunks);
if (settings.expirezero_does_not_evict)
total_chunks -= noexp_lru_size(id);
if (it == NULL) {
if (settings.lru_maintainer_thread) {
lru_pull_tail(id, HOT_LRU, total_chunks, false, cur_hv);
lru_pull_tail(id, WARM_LRU, total_chunks, false, cur_hv);
lru_pull_tail(id, COLD_LRU, total_chunks, true, cur_hv);
} else {
lru_pull_tail(id, COLD_LRU, 0, true, cur_hv);
}
} else {
break;
}
}
if (i > 0) {
pthread_mutex_lock(&lru_locks[id]);
/* 直接回收 */
itemstats[id].direct_reclaims += i;
pthread_mutex_unlock(&lru_locks[id]);
}
if (it == NULL) {
pthread_mutex_lock(&lru_locks[id]);
itemstats[id].outofmemory++;
pthread_mutex_unlock(&lru_locks[id]);
return NULL;
}
assert(it->slabs_clsid == 0);
//assert(it != heads[id]);
/* Refcount is seeded to 1 by slabs_alloc() */
it->next = it->prev = it->h_next = 0;
/* Items are initially loaded into the HOT_LRU. This is '0' but I want at
* least a note here. Compiler (hopefully?) optimizes this out.
*/
if (settings.lru_maintainer_thread) {
//Item默认进入的LRU链表的规则
if (exptime == 0 && settings.expirezero_does_not_evict) {
id |= NOEXP_LRU;
} else {
id |= HOT_LRU;
}
} else {
/* There is only COLD in compat-mode */
id |= COLD_LRU;
}
it->slabs_clsid = id;
DEBUG_REFCNT(it, '*');
it->it_flags = settings.use_cas ? ITEM_CAS : 0;
it->nkey = nkey;
it->nbytes = nbytes;
memcpy(ITEM_key(it), key, nkey);
it->exptime = exptime;
memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
it->nsuffix = nsuffix;
return it;
}
下面的代码是LRU链表中对各个ITEM的操作。
/* item的回收,把item加入到slots空闲链表 */
void item_free(item *it) {
size_t ntotal = ITEM_ntotal(it);
unsigned int clsid;
assert((it->it_flags & ITEM_LINKED) == 0);
assert(it != heads[it->slabs_clsid]);
assert(it != tails[it->slabs_clsid]);
assert(it->refcount == 0);
clsid = ITEM_clsid(it);
DEBUG_REFCNT(it, 'F');
slabs_free(it, ntotal, clsid);
}
/*把Item放到LRU的头部*/
static void do_item_link_q(item *it) { /* item is the new head */
item **head, **tail;
assert((it->it_flags & ITEM_SLABBED) == 0);
head = &heads[it->slabs_clsid];
tail = &tails[it->slabs_clsid];
assert(it != *head);
assert((*head && *tail) || (*head == 0 && *tail == 0));
it->prev = 0;
it->next = *head;
if (it->next) it->next->prev = it;
*head = it;
if (*tail == 0) *tail = it;
sizes[it->slabs_clsid]++;
return;
}
/* 加锁的do_item_link_q*/
static void item_link_q(item *it) {
pthread_mutex_lock(&lru_locks[it->slabs_clsid]);
do_item_link_q(it);
pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
}
/*把Item从LRU队列中去掉*/
static void do_item_unlink_q(item *it) {
item **head, **tail;
head = &heads[it->slabs_clsid];
tail = &tails[it->slabs_clsid];
if (*head == it) {
assert(it->prev == 0);
*head = it->next;
}
if (*tail == it) {
assert(it->next == 0);
*tail = it->prev;
}
assert(it->next != it);
assert(it->prev != it);
if (it->next) it->next->prev = it->prev;
if (it->prev) it->prev->next = it->next;
sizes[it->slabs_clsid]--;
return;
}
/* 加锁的do_item_unlink_q */
static void item_unlink_q(item *it) {
pthread_mutex_lock(&lru_locks[it->slabs_clsid]);
do_item_unlink_q(it);
pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
}
/* 这个方法非常重要,是memcached中加入item的方法 */
int do_item_link(item *it, const uint32_t hv) {
MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
STATS_UNLOCK();
/* Allocate a new CAS ID on link. */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
assoc_insert(it, hv);
item_link_q(it);
refcount_incr(&it->refcount);
return 1;
}
/* 这个方法非常重要,是memcached中剔除item的方法 */
void do_item_unlink(item *it, const uint32_t hv) {
MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
if ((it->it_flags & ITEM_LINKED) != 0) {
it->it_flags &= ~ITEM_LINKED;
STATS_LOCK();
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
STATS_UNLOCK();
assoc_delete(ITEM_key(it), it->nkey, hv);
item_unlink_q(it);
do_item_remove(it);
}
}
/* FIXME: Is it necessary to keep this copy/pasted code? */
void do_item_unlink_nolock(item *it, const uint32_t hv) {
MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
if ((it->it_flags & ITEM_LINKED) != 0) {
it->it_flags &= ~ITEM_LINKED;
STATS_LOCK();
stats.curr_bytes -= ITEM_ntotal(it);
stats.curr_items -= 1;
STATS_UNLOCK();
assoc_delete(ITEM_key(it), it->nkey, hv);
do_item_unlink_q(it);
do_item_remove(it);
}
}
/* 回收Item,只有item->refcount==1的对象才可以被回收*/
void do_item_remove(item *it) {
MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
assert((it->it_flags & ITEM_SLABBED) == 0);
assert(it->refcount > 0);
if (refcount_decr(&it->refcount) == 0) {
item_free(it);
}
}
/*刷新了item的时间,没有加锁*/
void do_item_update_nolock(item *it) {
MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
assert((it->it_flags & ITEM_SLABBED) == 0);
if ((it->it_flags & ITEM_LINKED) != 0) {
do_item_unlink_q(it);
it->time = current_time;
do_item_link_q(it);
}
}
}
/* 加锁的do_item_update */
void do_item_update(item *it) {
MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
assert((it->it_flags & ITEM_SLABBED) == 0);
if ((it->it_flags & ITEM_LINKED) != 0) {
it->time = current_time;
if (!settings.lru_maintainer_thread) {
item_unlink_q(it);
item_link_q(it);
}
}
}
}
int do_item_replace(item *it, item *new_it, const uint32_t hv) {
MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
ITEM_key(new_it), new_it->nkey, new_it->nbytes);
assert((it->it_flags & ITEM_SLABBED) == 0);
do_item_unlink(it, hv);
return do_item_link(new_it, hv);
}
下面是LRU管理线程的工作,主要在各个LRU之间进行数据的移动和清理。
/* Will crawl all slab classes a minimum of once per hour */
#define MAX_MAINTCRAWL_WAIT 60 * 60
/* 这个方法定义了LRU爬虫的触发机制 */
static void lru_maintainer_crawler_check(void) {
int i;
static rel_time_t last_crawls[MAX_NUMBER_OF_SLAB_CLASSES];
static rel_time_t next_crawl_wait[MAX_NUMBER_OF_SLAB_CLASSES];
for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
crawlerstats_t *s = &crawlerstats[i];
/* We've not successfully kicked off a crawl yet. */
if (last_crawls[i] == 0) {
/* 这一步往LRU尾部加入爬虫的Item,并设置可以进行爬虫的执行 */
if (lru_crawler_start(i, 0) > 0) {
last_crawls[i] = current_time;
}
}
pthread_mutex_lock(&lru_crawler_stats_lock);
/* 爬虫执行完成 */
if (s->run_complete) {
int x;
/* Should we crawl again? */
uint64_t possible_reclaims = s->seen - s->noexp;
uint64_t available_reclaims = 0;
/* Need to think we can free at least 1% of the items before
* crawling. */
/* FIXME: Configurable? */
uint64_t low_watermark = (s->seen / 100) + 1;
rel_time_t since_run = current_time - s->end_time;
/* Don't bother if the payoff is too low. */
if (settings.verbose > 1)
fprintf(stderr, "maint crawler: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n",
(unsigned long long)low_watermark, (unsigned long long)possible_reclaims,
(unsigned int)since_run);
for (x = 0; x < 60; x++) {
if (since_run < (x * 60) + 60)
break;
available_reclaims += s->histo[x];
}
if (available_reclaims > low_watermark) {
last_crawls[i] = 0;
if (next_crawl_wait[i] > 60)
next_crawl_wait[i] -= 60;
} else if (since_run > 5 && since_run > next_crawl_wait[i]) {
last_crawls[i] = 0;
if (next_crawl_wait[i] < MAX_MAINTCRAWL_WAIT)
next_crawl_wait[i] += 60;
}
if (settings.verbose > 1)
fprintf(stderr, "maint crawler: available reclaims: %llu, next_crawl: %u\n", (unsigned long long)available_reclaims, next_crawl_wait[i]);
}
pthread_mutex_unlock(&lru_crawler_stats_lock);
}
}
static pthread_t lru_maintainer_tid;
#define MAX_LRU_MAINTAINER_SLEEP 1000000
#define MIN_LRU_MAINTAINER_SLEEP 1000
static void *lru_maintainer_thread(void *arg) {
int i;
useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP;
rel_time_t last_crawler_check = 0;
pthread_mutex_lock(&lru_maintainer_lock);
if (settings.verbose > 2)
fprintf(stderr, "Starting LRU maintainer background thread\n");
while (do_run_lru_maintainer_thread) {
int did_moves = 0;
pthread_mutex_unlock(&lru_maintainer_lock);
usleep(to_sleep);
pthread_mutex_lock(&lru_maintainer_lock);
STATS_LOCK();
stats.lru_maintainer_juggles++;
STATS_UNLOCK();
/* We were asked to immediately wake up and poke a particular slab
* class due to a low watermark being hit */
if (lru_maintainer_check_clsid != 0) {
did_moves = lru_maintainer_juggle(lru_maintainer_check_clsid);
lru_maintainer_check_clsid = 0;
} else {
for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
did_moves += lru_maintainer_juggle(i);
}
}
if (did_moves == 0) {
if (to_sleep < MAX_LRU_MAINTAINER_SLEEP)
to_sleep += 1000;
} else {
to_sleep /= 2;
if (to_sleep < MIN_LRU_MAINTAINER_SLEEP)
to_sleep = MIN_LRU_MAINTAINER_SLEEP;
}
/* Once per second at most */
if (settings.lru_crawler && last_crawler_check != current_time) {
/* 爬虫线程的检查,满足条件会促发爬虫线程的执行 */
lru_maintainer_crawler_check();
last_crawler_check = current_time;
}
}
pthread_mutex_unlock(&lru_maintainer_lock);
if (settings.verbose > 2)
fprintf(stderr, "LRU maintainer thread stopping\n");
return NULL;
}
int stop_lru_maintainer_thread(void) {
int ret;
pthread_mutex_lock(&lru_maintainer_lock);
/* LRU thread is a sleep loop, will die on its own */
do_run_lru_maintainer_thread = 0;
pthread_mutex_unlock(&lru_maintainer_lock);
if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
return -1;
}
/*lru_maintainer_thread控制了lru_pull_tail的策略*/
settings.lru_maintainer_thread = false;
return 0;
}
int start_lru_maintainer_thread(void) {
int ret;
pthread_mutex_lock(&lru_maintainer_lock);
do_run_lru_maintainer_thread = 1;
settings.lru_maintainer_thread = true;
if ((ret = pthread_create(&lru_maintainer_tid, NULL,
lru_maintainer_thread, NULL)) != 0) {
fprintf(stderr, "Can't create LRU maintainer thread: %s\n",
strerror(ret));
pthread_mutex_unlock(&lru_maintainer_lock);
return -1;
}
pthread_mutex_unlock(&lru_maintainer_lock);
return 0;
}
/* If we hold this lock, crawler can't wake up or move */
void lru_maintainer_pause(void) {
pthread_mutex_lock(&lru_maintainer_lock);
}
void lru_maintainer_resume(void) {
pthread_mutex_unlock(&lru_maintainer_lock);
}
int init_lru_maintainer(void) {
if (lru_maintainer_initialized == 0) {
pthread_mutex_init(&lru_maintainer_lock, NULL);
lru_maintainer_initialized = 1;
}
return 0;
}
本文来自网易实践者社区,经作者吕宗胜授权发布