经过上面各种各样的初始化之后,终于我们可以进入真正的业务流程了。
//你没有看错,又是死循环^_^。
for (;;) {
int interval = max_interval;
//线程已经启动,再更新node信息就需要加锁了
pthread_mutex_lock(&node_info_mutex);
int nodes = update_nodes();
pthread_mutex_unlock(&node_info_mutex);
if (nodes > 1) {
//更新process信息
update_processes();
//numad真正的核心操作,在这里会根据当前状态更新进程的内存和cpu分布。
interval = manage_loads();
if (interval < max_interval) {
//如果执行了一些内容更新,再更新一下node信息。
nodes = update_nodes();
}
}
//等待
sleep(interval);
//判断是否收到进程信号
if (got_sigterm | got_sigquit) {
//清理内核消息队列,删除pid文件,退出程序
shut_down_numad();
}
if (got_sighup) {
got_sighup = 0;
close_log_file();
open_log_file();
}
}
下面我们就来介绍一下numad中最关键的几个处理逻辑update_nodes,update_processes,manage_loads
以及-w参数需要的逻辑pick_numa_nodes
update_nodes
这个函数用于更新物理节点上的node信息。需要初始化一个包含以下信息的node数组:
typedef struct node_data {
uint64_t node_id; //node id
uint64_t MBs_total; //node内存总量
uint64_t MBs_free; //node剩余内存总量
uint64_t CPUs_total; //node全部可用CPU的总计算能力
uint64_t CPUs_free; //node可用CPU的剩余计算能力
uint64_t magnitude; // hack: MBs * CPUs
uint8_t *distance; //当前node与其他node的距离
id_list_p cpu_list_p; //当前node的所有可用CPU列表
} node_data_t, *node_data_p;
node_data_p node = NULL;
int min_node_CPUs_free_ix = -1; //剩余CPU最少的node id
int min_node_MBs_free_ix = -1; //剩余内存最少的node id
long min_node_CPUs_free = MAXINT; //node上剩余最少的CPU量
long min_node_MBs_free = MAXINT; //node上剩余最少的内存量
long max_node_CPUs_free = 0; //剩余CPU最多的node id
long max_node_MBs_free = 0; //剩余内存最多的node id
long avg_node_CPUs_free = 0; //平均每个node剩余的CPU
long avg_node_MBs_free = 0; //平均每个node剩余的内存
double stddev_node_CPUs_free = 0.0; //每个node剩余CPU的方差
double stddev_node_MBs_free = 0.0; //每个node剩余内存的方差
完成以上信息的初始化之后,我们就拿到了一个全局的资源列表。在这个列表中,可以看到每个numa node的资源使用情况,和numa nodes之间的资源使用离散情况。下面来看看numad中是怎样一步一步的完成这个列表的初始化的。
int update_nodes() {
char fname[FNAME_SIZE];
char buf[BIG_BUF_SIZE];
//获取当前时间戳,以1%s为单位(我也不知道为什么搞这么奇葩的一个单位。。。)
uint64_t time_stamp = get_time_stamp();
#define STATIC_NODE_INFO_DELAY (600 * ONE_HUNDRED)
//node_info_time_stamp初始化为0,在第一次启动的时候会强制刷新nodeinfo
if ((num_nodes == 0) || (node_info_time_stamp + STATIC_NODE_INFO_DELAY < time_stamp)) {
//更新node_info刷新时间戳
node_info_time_stamp = time_stamp;
//从/sys/devices/system/node/目录下获取node数量
struct dirent **namelist;
int num_files = scandir ("/sys/devices/system/node", &namelist, node_and_digits, NULL);
if (num_files < 1) {
numad_log(LOG_CRIT, "Could not get NUMA node info\n");
exit(EXIT_FAILURE);
}
//在运行过程中数量一般不会变。第一次运行时num_nodes为0,需要进入初始化。
int need_to_realloc = (num_files != num_nodes);
if (need_to_realloc) {
for (int ix = num_files; (ix < num_nodes); ix++) {
//如果node数减少了,需要释放对应的统计信息
free(node[ix].distance);
FREE_LIST(node[ix].cpu_list_p);
}
//重新申请存储node信息的内存空间
node = realloc(node, (num_files * sizeof(node_data_t)));
if (node == NULL) {
numad_log(LOG_CRIT, "node realloc failed\n");
exit(EXIT_FAILURE);
}
//初始化新的node结构
for (int ix = num_nodes; (ix < num_files); ix++) {
node[ix].distance = NULL;
node[ix].cpu_list_p = NULL;
}
//更新node数量
num_nodes = num_files;
}
sum_CPUs_total = 0;
//初始化下面两个list
CLEAR_CPU_LIST(all_cpus_list_p);
CLEAR_NODE_LIST(all_nodes_list_p);
//获取每个cpu core的超线程数量(一般为2)
threads_per_core = count_set_bits_in_hex_list_file("/sys/devices/system/cpu/cpu0/topology/thread_siblings");
if (threads_per_core < 1) {
numad_log(LOG_CRIT, "Could not count threads per core\n");
exit(EXIT_FAILURE);
}
for (int node_ix = 0; (node_ix < num_nodes); node_ix++) {
int node_id;
//获取node id
char *p = &namelist[node_ix]->d_name[4];
CONVERT_DIGITS_TO_NUM(p, node_id);
free(namelist[node_ix]);
//在全局变量node中添加当前node的信息
node[node_ix].node_id = node_id;
//将node id添加到all_nodes_list_p
ADD_ID_TO_LIST(node_id, all_nodes_list_p);
//获取当前node的cpulist
snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/cpulist", node_id);
int fd = open(fname, O_RDONLY, 0);
if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) {
buf[BIG_BUF_SIZE - 1] = '\0';
//初始化node中的cpu list
CLEAR_CPU_LIST(node[node_ix].cpu_list_p);
int n = add_ids_to_list_from_str(node[node_ix].cpu_list_p, buf);
//如果入参中指定了reserve cpu,需要将这部分CPU在cpu_list_p中标记出来
if (reserved_cpu_str != NULL) {
//reserved_cpu_mask_list_p这个列表在初始化的时候已经处理过了,其中包含的是未预留部分的CPU mask
//两个列表and处理之后是所有的预留cpu mask
AND_LISTS(node[node_ix].cpu_list_p, node[node_ix].cpu_list_p, reserved_cpu_mask_list_p);
//除去预留之外的CPU数量
n = NUM_IDS_IN_LIST(node[node_ix].cpu_list_p);
}
//上面初始化的all_cpus_list_p为空,在此初始化为除去预留部分之后的CPU mask
OR_LISTS(all_cpus_list_p, all_cpus_list_p, node[node_ix].cpu_list_p);
//计算CPU数量,并保存到全局变量node数组中。计算规则如下:
//如果没有打开超线程或者超线程计算能力设置为物理核至少100%,则直接计算为100*n(n为可用的CPU数量)
//如果打开了超线程,总CPU=物理核数*100+超线程数*超线程计算能力(默认为20%)
if ((threads_per_core == 1) || (htt_percent >= 100)) {
node[node_ix].CPUs_total = n * ONE_HUNDRED;
} else {
n /= threads_per_core;
node[node_ix].CPUs_total = n * ONE_HUNDRED;
node[node_ix].CPUs_total += n * (threads_per_core - 1) * htt_percent;
}
//统计当前节点的总CPU计算能力
sum_CPUs_total += node[node_ix].CPUs_total;
close(fd);
} else {
numad_log(LOG_CRIT, "Could not get node cpu list\n");
exit(EXIT_FAILURE);
}
//如果需要刷新,则重新获取numa node之间的distance,并保存到node组中。
if (need_to_realloc) {
node[node_ix].distance = realloc(node[node_ix].distance, (num_nodes * sizeof(uint8_t)));
if (node[node_ix].distance == NULL) {
numad_log(LOG_CRIT, "node distance realloc failed\n");
exit(EXIT_FAILURE);
}
}
snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/distance", node_id);
fd = open(fname, O_RDONLY, 0);
if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) {
int rnode = 0;
for (char *p = buf; (*p != '\n'); ) {
int lat;
CONVERT_DIGITS_TO_NUM(p, lat);
node[node_ix].distance[rnode++] = lat;
while (*p == ' ') { p++; }
}
close(fd);
} else {
numad_log(LOG_CRIT, "Could not get node distance data\n");
exit(EXIT_FAILURE);
}
}
//namelist中的信息已经保存到node中,包括:
//node id
//每个node的可用cpu列表
//每个node的cpu计算能力
//每个node与其他node的距离
free(namelist);
}
//更新每个node的内存和CPU计算能力信息
//前面初始化了两个全局变量用于保存CPU数据的信息
//typedef struct cpu_data {
//uint64_t time_stamp;
//uint64_t *idle;
//} cpu_data_t, *cpu_data_p;
//cpu_data_t cpu_data_buf[2];仅保留两次,用于计算差值
//int cur_cpu_data_buf = 0;
//确保两次之间至少相差0.07s(原谅我还是忍不住吐槽一下这个单位设置。。。)
while (cpu_data_buf[cur_cpu_data_buf].time_stamp + 7 >= time_stamp) {
//如果两次数据之间相差不到0.07s,则sleep 0.1s
struct timespec ts = { 0, 100000000 };
nanosleep(&ts, &ts);
time_stamp = get_time_stamp();
}
//更新CPU data,原理是从/proc/stat文件中读取每个CPU的idle数据,并保存在cpu_data_buf中。
update_cpu_data();
//初始化下面的几个全局变量
//node上剩余的内存总量
max_node_MBs_free = 0;
//node上剩余的cpu计算能力
max_node_CPUs_free = 0;
min_node_MBs_free = MAXINT;
min_node_CPUs_free = MAXINT;
uint64_t sum_of_node_MBs_free = 0;
uint64_t sum_of_node_CPUs_free = 0;
for (int node_ix = 0; (node_ix < num_nodes); node_ix++) {
int node_id = node[node_ix].node_id;
//获取当前node上的内存信息
snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/meminfo", node_id);
int fd = open(fname, O_RDONLY, 0);
if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) {
close(fd);
uint64_t KB;
buf[BIG_BUF_SIZE - 1] = '\0';
char *p = strstr(buf, "MemTotal:");
if (p != NULL) {
p += 9;
} else {
numad_log(LOG_CRIT, "Could not get node MemTotal\n");
exit(EXIT_FAILURE);
}
while (!isdigit(*p)) { p++; }
CONVERT_DIGITS_TO_NUM(p, KB);
//获取内存总量,以MB为单位,保存在全局变量node中
node[node_ix].MBs_total = (KB / KILOBYTE);
//如果一个node没有内存,则将该node从all_nodes_list_p中移除
if (node[node_ix].MBs_total < 1) {
CLR_ID_IN_LIST(node_id, all_nodes_list_p);
}
p = strstr(p, "MemFree:");
if (p != NULL) {
p += 8;
} else {
numad_log(LOG_CRIT, "Could not get node MemFree\n");
exit(EXIT_FAILURE);
}
while (!isdigit(*p)) { p++; }
CONVERT_DIGITS_TO_NUM(p, KB);
//获取剩余内存,以MB为单位,保存在全局变量node中
node[node_ix].MBs_free = (KB / KILOBYTE);
//如果指定了使用cache作为可用内存的参数,将这部分内存加入可用内存中
if (use_inactive_file_cache) {
// Add inactive file cache quantity to "free" memory
p = strstr(p, "Inactive(file):");
if (p != NULL) {
p += 15;
} else {
numad_log(LOG_CRIT, "Could not get node Inactive(file)\n");
exit(EXIT_FAILURE);
}
while (!isdigit(*p)) { p++; }
CONVERT_DIGITS_TO_NUM(p, KB);
node[node_ix].MBs_free += (KB / KILOBYTE);
}
//计算总的内存剩余数量
sum_of_node_MBs_free += node[node_ix].MBs_free;
//记录最小及最大剩余内存的nodeid,及对应的剩余内存量。
if (min_node_MBs_free > node[node_ix].MBs_free) {
min_node_MBs_free = node[node_ix].MBs_free;
min_node_MBs_free_ix = node[node_ix].node_id;
}
if (max_node_MBs_free < node[node_ix].MBs_free) {
max_node_MBs_free = node[node_ix].MBs_free;
}
} else {
numad_log(LOG_CRIT, "Could not get node meminfo\n");
exit(EXIT_FAILURE);
}
//如果cpu_data_buf已经保存了两份数据,则可以计算当前node在两个时间点之间的CPU剩余计算能力
int old_cpu_data_buf = 1 - cur_cpu_data_buf;
if (cpu_data_buf[old_cpu_data_buf].time_stamp > 0) {
uint64_t idle_ticks = 0;
int cpu = 0;
//当前节点可用的cpu数量
int num_lcpus = NUM_IDS_IN_LIST(node[node_ix].cpu_list_p);
int num_cpus_to_process = num_lcpus;
//计算可用cpu的idle ticks数量之和idle_ticks
while (num_cpus_to_process) {
if (ID_IS_IN_LIST(cpu, node[node_ix].cpu_list_p)) {
idle_ticks += cpu_data_buf[cur_cpu_data_buf].idle[cpu]
- cpu_data_buf[old_cpu_data_buf].idle[cpu];
num_cpus_to_process -= 1;
}
cpu += 1;
}
uint64_t time_diff = cpu_data_buf[cur_cpu_data_buf].time_stamp
- cpu_data_buf[old_cpu_data_buf].time_stamp;
//计算当前node的平均剩余计算能力
node[node_ix].CPUs_free = (idle_ticks * ONE_HUNDRED) / time_diff;
//如果打开了超线程,并且设置的超线程计算能力不足一个物理核,则需要将多计算出的部分减去
if ((threads_per_core > 1) && (htt_percent < 100)) {
uint64_t htt_discount = (num_lcpus - (num_lcpus / threads_per_core)) * (100 - htt_percent);
if (node[node_ix].CPUs_free > htt_discount) {
node[node_ix].CPUs_free -= htt_discount;
} else {
node[node_ix].CPUs_free = 0;
}
}
if (node[node_ix].CPUs_free > node[node_ix].CPUs_total) {
node[node_ix].CPUs_free = node[node_ix].CPUs_total;
}
//统计总计剩余的CPU计算能力
sum_of_node_CPUs_free += node[node_ix].CPUs_free;
//记录最小及最大剩余CPU计算能力的nodeid,及对应的剩余CPU计算能力值。
if (min_node_CPUs_free > node[node_ix].CPUs_free) {
min_node_CPUs_free = node[node_ix].CPUs_free;
min_node_CPUs_free_ix = node[node_ix].node_id;
}
if (max_node_CPUs_free < node[node_ix].CPUs_free) {
max_node_CPUs_free = node[node_ix].CPUs_free;
}
//保存mem free * cpu free的数据
node[node_ix].magnitude = node[node_ix].CPUs_free * node[node_ix].MBs_free;
} else {
node[node_ix].CPUs_free = 0;
node[node_ix].magnitude = 0;
}
}
//记录平均每个node的内存及CPU剩余数量
avg_node_MBs_free = sum_of_node_MBs_free / num_nodes;
avg_node_CPUs_free = sum_of_node_CPUs_free / num_nodes;
//记录所有node剩余内存和剩余CPU的方差和
double MBs_variance_sum = 0.0;
double CPUs_variance_sum = 0.0;
for (int node_ix = 0; (node_ix < num_nodes); node_ix++) {
double MBs_diff = (double)node[node_ix].MBs_free - (double)avg_node_MBs_free;
double CPUs_diff = (double)node[node_ix].CPUs_free - (double)avg_node_CPUs_free;
MBs_variance_sum += MBs_diff * MBs_diff;
CPUs_variance_sum += CPUs_diff * CPUs_diff;
}
//记录所有node剩余内存和剩余CPU的方差,用于描述每个node资源余量的离散程度
double MBs_variance = MBs_variance_sum / (num_nodes);
double CPUs_variance = CPUs_variance_sum / (num_nodes);
stddev_node_MBs_free = sqrt(MBs_variance);
stddev_node_CPUs_free = sqrt(CPUs_variance);
//如果log level为INFO或以上,输出整理的node信息。
if (log_level >= LOG_INFO) {
show_nodes();
}
return num_nodes;
}
update_processes
函数会根据传入的参数整理出来一个当前进程信息列表放入process_hash_table
,信息表格式如下:
typedef struct process_data {
int pid; //进程pid
unsigned int flags; //标记是否跨node分配了内存
uint64_t data_time_stamp; //进程信息更新时间戳
uint64_t bind_time_stamp;
uint64_t num_threads; //进程包含的线程数
uint64_t MBs_size; //进程占用的内存vsize
uint64_t MBs_used; //进程实际使用的内存rss
uint64_t cpu_util; //进程使用的CPU时间
uint64_t CPUs_used; //
uint64_t CPUs_used_ring_buf[RING_BUF_SIZE];
int ring_buf_ix;
char *comm; //进程的命令行
id_list_p node_list_p; //进程使用node列表
uint64_t *process_MBs; //进程在每个node上的内存
} process_data_t, *process_data_p;
完成这个列表的初始化之后,我们就获取了当前物理节点上我们需要关注的所有进程的CPU和内存信息。下面来看看numad中是如何一步一步获取这些信息的。
int update_processes() {
//获取本次更新时间戳
uint64_t this_update_time = get_time_stamp();
int new_candidates = 0;
int files = 0;
//如果传入的参数指定扫描所有进程,则在/proc目录下获取所有的进程号,并读取这些进程目录下的stat文件,更新process_hash_table。
if (scan_all_processes) {
struct dirent **namelist;
//记录纳入numad管理的进程数量
files = scandir("/proc", &namelist, name_starts_with_digit, NULL);
if (files < 0) {
numad_log(LOG_CRIT, "Could not open /proc\n");
exit(EXIT_FAILURE);
}
for (int ix = 0; (ix < files); ix++) {
process_data_p data_p;
//用进程的stat文件内容初始化一个process_data_p结构
if ((data_p = get_stat_data_for_pid(-1, namelist[ix]->d_name)) != NULL) {
//numad限制至少要有300MB内存才能纳入管理(可以通过参数改变)
//每次增加到hash表中的内容不能超过当前hash表大小的三分之一(优化性能????)
if ((data_p->MBs_used > MEMORY_THRESHOLD)
&& (new_candidates < process_hash_table_size / 3)) {
data_p->data_time_stamp = get_time_stamp();
//把进程信息更新到hash表中。如果已经存在,new_candidates值不增加。
new_candidates += process_hash_update(data_p);
}
}
free(namelist[ix]);
}
free(namelist);
}
//接下来要处理参数传入的inclusion和exclusion两个队列,需要加上线程锁防止子线程正在处理这个队列。
pthread_mutex_lock(&pid_list_mutex);
//inclusion队列,相当于白名单,应当纳入numad管理
pid_list_p pid_ptr = include_pid_list;
//仍然限制新增加到hash表中的内容不能超过hash表大小的三分之一
while ((pid_ptr != NULL) && (new_candidates < process_hash_table_size / 3)) {
int hash_ix = process_hash_lookup(pid_ptr->pid);
//如果最近已经更新过一次了(指定了scan_all_processes的情况下),直接扫描列表中的下一个进程
if ( (hash_ix >= 0) && (process_hash_table[hash_ix].data_time_stamp > this_update_time)) {
pid_ptr = pid_ptr->next;
continue;
}
process_data_p data_p;
if ((data_p = get_stat_data_for_pid(pid_ptr->pid, NULL)) != NULL) {
data_p->data_time_stamp = get_time_stamp();
new_candidates += process_hash_update(data_p);
if (!scan_all_processes) {
//如果没有设置scan_all_processes,需要记录纳入numad管理的进程数量。
files += 1;
}
pid_ptr = pid_ptr->next;
} else {
//没有获取到进程信息,说明该进程已经不存在了,从inclusion list中移除该进程
include_pid_list = remove_pid_from_pid_list(include_pid_list, pid_ptr->pid);
pid_ptr = include_pid_list;
continue;
}
}
//处理exclusion list中的进程,这些进程相当于黑名单。如果现有的hash表中已经包含了这部分进程,需要重置他们的CPU使用量为0
pid_ptr = exclude_pid_list;
while (pid_ptr != NULL) {
int hash_ix = process_hash_lookup(pid_ptr->pid);
if (hash_ix >= 0) {
process_hash_table[hash_ix].CPUs_used = 0;
}
pid_ptr = pid_ptr->next;
}
//队列处理完毕,释放队列的线程锁
pthread_mutex_unlock(&pid_list_mutex);
if (log_level >= LOG_INFO) {
numad_log(LOG_INFO, "Processes: %d\n", files);
}
//清理hash表中的过时数据,即仍然保留在hash表中的此次更新之前的数据。
//另外,如果hash表内容已经超过hash表大小的二分之一,需要将此hash表扩大到原来的2倍(性能????)
process_hash_table_cleanup(this_update_time);
return files;
}
相关阅读:
本文来自网易实践者社区,经作者岳文远授权发布。