作者:吕宗胜
除了LRU管理线程,我们还需要看看Memcached的LRU爬虫机制的实现:
/*** ITEM CRAWLER THREAD ***/
/* Append a crawler pseudo-item to the end of the LRU queue selected by
 * it->slabs_clsid, so that it becomes the new tail of that queue. */
static void crawler_link_q(item *it) {
    item **headp;
    item **tailp;
    item *old_tail;

    /* Only an enabled crawler (it_flags == 1) carrying no data is linked. */
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);

    headp = &heads[it->slabs_clsid];
    tailp = &tails[it->slabs_clsid];

    /* The queue must be non-empty, consistent, and must not already end
     * with this crawler. */
    assert(*tailp != 0);
    assert(it != *tailp);
    assert((*headp && *tailp) || (*headp == 0 && *tailp == 0));

    old_tail = *tailp;
    it->next = 0;
    it->prev = old_tail;
    if (old_tail) {
        assert(old_tail->next == 0);
        old_tail->next = it;
    }

    *tailp = it;
    if (*headp == 0) {
        *headp = it;
    }
}
/* Detach a crawler pseudo-item from the LRU queue selected by
 * it->slabs_clsid, fixing up the queue head/tail and both neighbours.
 * The crawler's own next/prev pointers are left untouched. */
static void crawler_unlink_q(item *it) {
    item **headp = &heads[it->slabs_clsid];
    item **tailp = &tails[it->slabs_clsid];
    item *after = it->next;
    item *before = it->prev;

    if (*headp == it) {
        assert(before == 0);
        *headp = after;
    }
    if (*tailp == it) {
        assert(after == 0);
        *tailp = before;
    }

    /* A node must never point at itself. */
    assert(after != it);
    assert(before != it);

    if (after) {
        after->prev = before;
    }
    if (before) {
        before->next = after;
    }
}
/*
 * Entry point of the LRU crawler background thread.
 *
 * The thread takes lru_crawler_lock on start and only gives it up while
 * blocked in pthread_cond_wait() (and on exit). Each time it is signalled it
 * keeps sweeping every enabled crawler (it_flags == 1) until crawler_count
 * drops to zero, then clears stats.lru_crawler_running and waits again.
 * The outer loop ends once do_run_lru_crawler_thread is cleared.
 *
 * For each enabled crawler, one step is taken per pass:
 *   - advance the crawler with crawler_crawl_q() under lru_locks[i];
 *   - if that yields no item, or the crawler's non-zero 'remaining' budget
 *     is used up, disable and unlink the crawler and record its end time;
 *   - otherwise try to lock and pin the item and pass it to
 *     item_crawler_evaluate().
 *
 * arg is unused. Always returns NULL.
 */
static void *item_crawler_thread(void *arg) {
    int i;
    /* Number of evaluated items left before the next throttling sleep. */
    int crawls_persleep = settings.crawls_persleep;

    pthread_mutex_lock(&lru_crawler_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "Starting LRU crawler background thread\n");

    while (do_run_lru_crawler_thread) {
        /* Sleep until a crawl is requested or a stop is signalled. */
        pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);

        while (crawler_count) {
            item *search = NULL;
            void *hold_lock = NULL;

            for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
                if (crawlers[i].it_flags != 1) {
                    continue;
                }
                pthread_mutex_lock(&lru_locks[i]);
                search = crawler_crawl_q((item *)&crawlers[i]);
                /* Per the original note: this condition means the crawler
                 * has reached the head of the list, or its budget of items
                 * to crawl has run out. A 'remaining' of 0 is never
                 * decremented, i.e. it means "no limit". */
                if (search == NULL ||
                    (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
                    if (settings.verbose > 2)
                        fprintf(stderr, "Nothing left to crawl for %d\n", i);
                    crawlers[i].it_flags = 0;
                    crawler_count--;
                    /* Remove the crawler item from the LRU queue. */
                    crawler_unlink_q((item *)&crawlers[i]);
                    pthread_mutex_unlock(&lru_locks[i]);
                    pthread_mutex_lock(&lru_crawler_stats_lock);
                    crawlerstats[CLEAR_LRU(i)].end_time = current_time;
                    crawlerstats[CLEAR_LRU(i)].run_complete = true;
                    pthread_mutex_unlock(&lru_crawler_stats_lock);
                    continue;
                }
                uint32_t hv = hash(ITEM_key(search), search->nkey);
                /* Attempt to hash item lock the "search" item. If locked, no
                 * other callers can incr the refcount
                 */
                if ((hold_lock = item_trylock(hv)) == NULL) {
                    pthread_mutex_unlock(&lru_locks[i]);
                    continue;
                }
                /* Now see if the item is refcount locked: any result other
                 * than 2 is treated as "in use elsewhere" and the item is
                 * skipped for this pass. */
                if (refcount_incr(&search->refcount) != 2) {
                    refcount_decr(&search->refcount);
                    if (hold_lock)
                        item_trylock_unlock(hold_lock);
                    pthread_mutex_unlock(&lru_locks[i]);
                    continue;
                }
                /* Frees the item or decrements the refcount. */
                /* Interface for this could improve: do the free/decr here
                 * instead? */
                pthread_mutex_lock(&lru_crawler_stats_lock);
                item_crawler_evaluate(search, hv, i);
                pthread_mutex_unlock(&lru_crawler_stats_lock);
                if (hold_lock)
                    item_trylock_unlock(hold_lock);
                pthread_mutex_unlock(&lru_locks[i]);

                /* Throttle: count down once per evaluated item and sleep
                 * when the counter hits zero. Previously the counter was
                 * never decremented, so with a positive
                 * settings.crawls_persleep the sleep never happened. The
                 * counter is clamped at zero so it cannot overflow when
                 * sleeping is disabled. Note that lru_crawler_lock is still
                 * held during the sleep. */
                if (crawls_persleep > 0)
                    crawls_persleep--;
                if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
                    usleep(settings.lru_crawler_sleep);
                    crawls_persleep = settings.crawls_persleep;
                }
            }
        }

        if (settings.verbose > 2)
            fprintf(stderr, "LRU crawler thread sleeping\n");
        STATS_LOCK();
        stats.lru_crawler_running = false;
        STATS_UNLOCK();
    }

    pthread_mutex_unlock(&lru_crawler_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "LRU crawler thread stopping\n");
    return NULL;
}
/* Handle of the crawler thread: filled in by pthread_create() in
 * start_item_crawler_thread() and joined in stop_item_crawler_thread(). */
static pthread_t item_crawler_tid;
/*
 * Ask the crawler thread to exit and wait for it to finish.
 *
 * Clears do_run_lru_crawler_thread and signals lru_crawler_cond under
 * lru_crawler_lock, then joins item_crawler_tid.
 *
 * Returns 0 on success (settings.lru_crawler is then set to false), or -1
 * if pthread_join() fails.
 */
int stop_item_crawler_thread(void) {
    int join_err;

    pthread_mutex_lock(&lru_crawler_lock);
    do_run_lru_crawler_thread = 0;
    pthread_cond_signal(&lru_crawler_cond);
    pthread_mutex_unlock(&lru_crawler_lock);

    join_err = pthread_join(item_crawler_tid, NULL);
    if (join_err != 0) {
        fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(join_err));
        return -1;
    }

    settings.lru_crawler = false;
    return 0;
}
/*
 * Create the LRU crawler background thread.
 *
 * Returns -1 without doing anything if settings.lru_crawler is already set.
 * Otherwise, under lru_crawler_lock, sets do_run_lru_crawler_thread and
 * settings.lru_crawler and spawns item_crawler_thread().
 *
 * Returns 0 on success, -1 if the thread could not be created. On failure
 * both flags are rolled back, so a later start attempt is not rejected and
 * stop_item_crawler_thread() is not led to join a thread that never existed.
 */
int start_item_crawler_thread(void) {
    int ret;

    if (settings.lru_crawler)
        return -1;

    pthread_mutex_lock(&lru_crawler_lock);
    do_run_lru_crawler_thread = 1;
    settings.lru_crawler = true;
    if ((ret = pthread_create(&item_crawler_tid, NULL,
        item_crawler_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create LRU crawler thread: %s\n",
            strerror(ret));
        /* Undo the state changes made above: no thread is running. */
        settings.lru_crawler = false;
        do_run_lru_crawler_thread = 0;
        pthread_mutex_unlock(&lru_crawler_lock);
        return -1;
    }
    pthread_mutex_unlock(&lru_crawler_lock);
    return 0;
}
/*
 * Arm the crawlers for the HOT, WARM and COLD LRU queues of slab class 'id'.
 *
 * For each of the three queues that currently has a tail, the matching
 * crawlers[] entry is reset, enabled (it_flags = 1), linked in as the new
 * queue tail and counted in crawler_count. Each queue is handled under its
 * own lru_locks[] mutex.
 *
 * 'remaining' is the per-crawler budget of items to visit; it is passed in
 * so the LRU maintainer thread can scrub the whole LRU every time.
 *
 * If at least one crawler was armed, the global start statistics are updated
 * and crawlerstats[id] is reset with a fresh start_time.
 *
 * Returns the number of crawlers armed (0 to 3). This function does not
 * signal the crawler thread; the callers in this file do that while holding
 * lru_crawler_lock.
 *
 * NOTE(review): nothing here checks whether crawlers[sid] is already linked;
 * presumably callers rely on lru_crawler_lock to rule that out -- confirm.
 */
static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
    int i;
    uint32_t sid;
    uint32_t tocrawl[3];
    int starts = 0;

    tocrawl[0] = id | HOT_LRU;
    tocrawl[1] = id | WARM_LRU;
    tocrawl[2] = id | COLD_LRU;

    for (i = 0; i < 3; i++) {
        sid = tocrawl[i];
        pthread_mutex_lock(&lru_locks[sid]);
        /* An empty queue has nothing to crawl. */
        if (tails[sid] != NULL) {
            if (settings.verbose > 2)
                /* sid is unsigned: print it with %u rather than %d. */
                fprintf(stderr, "Kicking LRU crawler off for LRU %u\n",
                    (unsigned int)sid);
            crawlers[sid].nbytes = 0;
            crawlers[sid].nkey = 0;
            crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
            crawlers[sid].next = 0;
            crawlers[sid].prev = 0;
            crawlers[sid].time = 0;
            crawlers[sid].remaining = remaining;
            crawlers[sid].slabs_clsid = sid;
            crawler_link_q((item *)&crawlers[sid]);
            crawler_count++;
            starts++;
        }
        pthread_mutex_unlock(&lru_locks[sid]);
    }

    if (starts) {
        STATS_LOCK();
        stats.lru_crawler_running = true;
        stats.lru_crawler_starts++;
        STATS_UNLOCK();
        pthread_mutex_lock(&lru_crawler_stats_lock);
        memset(&crawlerstats[id], 0, sizeof(crawlerstats_t));
        crawlerstats[id].start_time = current_time;
        pthread_mutex_unlock(&lru_crawler_stats_lock);
    }
    return starts;
}
/*
 * Non-blocking request to crawl slab class 'id' with the given 'remaining'
 * budget. If lru_crawler_lock cannot be taken immediately, nothing is
 * started. Otherwise the crawlers are armed via do_lru_crawler_start() and,
 * if any were armed, the crawler thread is signalled.
 *
 * Returns the number of crawlers started, or 0 if the lock was busy.
 */
static int lru_crawler_start(uint32_t id, uint32_t remaining) {
    int started = 0;

    if (pthread_mutex_trylock(&lru_crawler_lock) == 0) {
        started = do_lru_crawler_start(id, remaining);
        if (started) {
            pthread_cond_signal(&lru_crawler_cond);
        }
        pthread_mutex_unlock(&lru_crawler_lock);
    }
    return started;
}
/*
 * Start a crawl over the slab classes named in 'slabs', which is either the
 * literal "all" or a comma-separated list of class ids. The list is
 * tokenized in place with strtok_r(), so the caller's buffer is modified.
 *
 * Returns:
 *   CRAWLER_RUNNING    - lru_crawler_lock could not be taken immediately;
 *   CRAWLER_BADCLASS   - a list entry failed to parse or is out of range;
 *   CRAWLER_OK         - at least one crawler was started and the crawler
 *                        thread was signalled;
 *   CRAWLER_NOTSTARTED - no crawler was started.
 *
 * FIXME: Split this into two functions: one to kick a crawler for a sid, and
 * one to parse the string. LRU maintainer code is generating a string to set
 * up a sid.
 * Also only clear the crawlerstats once per sid.
 */
enum crawler_result_type lru_crawler_crawl(char *slabs) {
    uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES];
    char *saveptr = NULL;
    uint32_t sid = 0;
    int starts = 0;

    if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
        return CRAWLER_RUNNING;
    }

    /* FIXME: I added this while debugging. Don't think it's needed? */
    memset(tocrawl, 0, sizeof(tocrawl));

    if (strcmp(slabs, "all") == 0) {
        /* Mark every class; elements are single bytes, so this sets each
         * entry to 1. */
        memset(tocrawl, 1, sizeof(tocrawl));
    } else {
        char *tok = strtok_r(slabs, ",", &saveptr);

        while (tok != NULL) {
            if (!safe_strtoul(tok, &sid) || sid < POWER_SMALLEST
                || sid >= MAX_NUMBER_OF_SLAB_CLASSES-1) {
                pthread_mutex_unlock(&lru_crawler_lock);
                return CRAWLER_BADCLASS;
            }
            tocrawl[sid] = 1;
            tok = strtok_r(NULL, ",", &saveptr);
        }
    }

    for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        if (tocrawl[sid]) {
            starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
        }
    }

    /* Wake the crawler thread only if there is work for it. */
    if (starts) {
        pthread_cond_signal(&lru_crawler_cond);
    }
    pthread_mutex_unlock(&lru_crawler_lock);

    return starts ? CRAWLER_OK : CRAWLER_NOTSTARTED;
}
/* Pause the crawler by taking lru_crawler_lock; blocks until the lock is
 * available. The crawler thread holds that lock whenever it is not blocked
 * in pthread_cond_wait(), so while the caller holds it the crawler can't
 * wake up or move. Must be paired with lru_crawler_resume(). */
void lru_crawler_pause(void) {
pthread_mutex_lock(&lru_crawler_lock);
}
/* Release lru_crawler_lock taken by lru_crawler_pause(), letting the
 * crawler thread proceed again. */
void lru_crawler_resume(void) {
pthread_mutex_unlock(&lru_crawler_lock);
}
/*
 * One-time initialization of the crawler's shared state: zeroes
 * crawlerstats and initializes lru_crawler_cond and lru_crawler_lock.
 * Later calls are no-ops once lru_crawler_initialized has been set.
 *
 * Returns 0 on success (or if already initialized), -1 if either the
 * condition variable or the mutex could not be initialized. On failure
 * nothing is left half-initialized, so the call may be retried.
 */
int init_lru_crawler(void) {
    if (lru_crawler_initialized == 0) {
        memset(&crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES);
        if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
            fprintf(stderr, "Can't initialize lru crawler condition\n");
            return -1;
        }
        /* pthread_mutex_init() can fail too; don't mark the crawler as
         * initialized with an unusable lock. */
        if (pthread_mutex_init(&lru_crawler_lock, NULL) != 0) {
            fprintf(stderr, "Can't initialize lru crawler lock\n");
            pthread_cond_destroy(&lru_crawler_cond);
            return -1;
        }
        lru_crawler_initialized = 1;
    }
    return 0;
}
本文来自网易实践者社区,经作者吕宗胜授权发布