现在我们获取了物理机上的numa nodes信息和进程信息,接下来我们就要根据已经掌握的信息对进程的cpu和内存分布做自动化的调整,以优化进程的内存和cpu性能。numad里面通过manage_loads
函数获取进程需要的CPU计算能力和内存,并且根据上面刷新的信息,选择一个合适的node再将该进程绑定到选定的node上。
int manage_loads() { uint64_t time_stamp = get_time_stamp(); //因为需要对现有的进程hash table做修改,所以先对原始的hash table做一份拷贝。 static int pindex_size; static process_data_p *pindex; //第一次启动,或者处理过程中线程hash table大小发生改变本次仅为备份数据申请新的空间。 //下次循环会更新node info和process hasd table if (pindex_size < process_hash_table_size) { pindex_size = process_hash_table_size; pindex = realloc(pindex, pindex_size * sizeof(process_data_p)); if (pindex == NULL) { numad_log(LOG_CRIT, "pindex realloc failed\n"); exit(EXIT_FAILURE); } //尽快执行下一次循环 return min_interval / 2; } memset(pindex, 0, pindex_size * sizeof(process_data_p)); //将process hash table中的数据拷贝到新的备份空间内 //注意,此处只拷贝满足最低要求的进程信息 //CPU至少0.5核计算能力,内存至少300MB int nprocs = 0; //统计有效进程使用的所有CPU计算能力 long sum_CPUs_used = 0; for (int ix = 0; (ix < process_hash_table_size); ix++) { process_data_p p = &process_hash_table[ix]; if ((p->pid) && (p->CPUs_used > CPU_THRESHOLD) && (p->MBs_used > MEMORY_THRESHOLD)) { pindex[nprocs++] = p; sum_CPUs_used += p->CPUs_used; // Initialize node list, if not already done for this process. if (p->node_list_p == NULL) { //初始化进程内存绑定的node list //从/proc/%d/statu文件中读取Mems_allowed_list对应的node list信息 //如果进程内存已经绑定在某些node上,则设定该进程的绑定时间在半小时以前(假设) initialize_mem_node_list(p); } } } int num_unbound = 0; //第一轮排序,未做内存绑定的进程排在已经做过内存绑定的进程之前。 for (int ij = 0; (ij < nprocs); ij++) { if (pindex[ij]->bind_time_stamp == 0) { process_data_p tmp = pindex[num_unbound]; pindex[num_unbound++] = pindex[ij]; pindex[ij] = tmp; } } //第二轮排序,未做内存绑定的进程,按照CPUs_used*MBs_used大小做降序排列 for (int ij = 0; (ij < num_unbound); ij++) { int best = ij; for (int ik = ij + 1; (ik < num_unbound); ik++) { uint64_t ik_mag = (pindex[ ik]->CPUs_used * pindex[ ik]->MBs_used); uint64_t best_mag = (pindex[best]->CPUs_used * pindex[best]->MBs_used); if (ik_mag <= best_mag) continue; best = ik; } if (best != ij) { process_data_p tmp = pindex[ij]; pindex[ij] = pindex[best]; pindex[best] = tmp; } }
//第三轮排序,已经做过内存绑定的进程,按照CPUs_used*MBs_used大小做升序排列 for (int ij = num_unbound; (ij < nprocs); ij++) { int best = ij; for (int ik = ij + 1; (ik < nprocs); ik++) { uint64_t ik_mag = (pindex[ ik]->CPUs_used * pindex[ ik]->MBs_used); uint64_t best_mag = (pindex[best]->CPUs_used * pindex[best]->MBs_used); uint64_t min_mag = ik_mag; uint64_t diff_mag = best_mag - ik_mag; if (diff_mag < 0) { diff_mag = -(diff_mag); min_mag = best_mag; } if ((diff_mag > 0) && (min_mag / diff_mag < 5)) { // difference > 20 percent. Use magnitude ordering if (ik_mag <= best_mag) continue; } else { // difference within 20 percent. Sort these by bind_time_stamp. if (pindex[ik]->bind_time_stamp > pindex[best]->bind_time_stamp) continue; } best = ik; } if (best != ij) { process_data_p tmp = pindex[ij]; pindex[ij] = pindex[best]; pindex[best] = tmp; } } //打印排序好的进程信息 if ((log_level >= LOG_INFO) && (nprocs > 0)) { numad_log(LOG_INFO, "Candidates: %d\n", nprocs); for (int ix = 0; (ix < nprocs); ix++) { process_data_p p = pindex[ix]; char buf[BUF_SIZE]; str_from_id_list(buf, BUF_SIZE, p->node_list_p); fprintf(log_fs, "%ld: PID %d: %s, Threads %2ld, MBs_size %6ld, MBs_used %6ld, CPUs_used %4ld, Magnitude %6ld, Nodes: %s\n", p->data_time_stamp, p->pid, p->comm, p->num_threads, p->MBs_size, p->MBs_used, p->CPUs_used, p->MBs_used * p->CPUs_used, buf); } fflush(log_fs); } //每次循环只选择一个进程做numa亲和性设置,设置完成之后退出等待下次再扫描。 //扫描整个进程hash table,选择优先级最高的进程处理。 for (int ix = 0; (ix < nprocs); ix++) { process_data_p p = pindex[ix]; //如果进程内存跨node分布,最少需要等待1800s才能做下一次调整 #define MIN_DELAY_FOR_INTERLEAVE (1800 * ONE_HUNDRED) if (((p->flags & PROCESS_FLAG_INTERLEAVED) > 0) && (p->bind_time_stamp + MIN_DELAY_FOR_INTERLEAVE > time_stamp)) { if (log_level >= LOG_DEBUG) { numad_log(LOG_DEBUG, "Skipping evaluation of PID %d because of interleaved memory.\n", p->pid); } continue; } //接下来计算进程需要的CPU计算能力和内存。 //numad支持设置内存和CPU的使用率,因此在计算进程资源占用的时候需要考虑资源使用率的情况。 int mem_target_utilization = target_utilization; int cpu_target_utilization = target_utilization; //内存使用率不允许超过100% if (mem_target_utilization > 100) { mem_target_utilization = 100; } int mb_request; //这里是numad的一个规则,如果进程使用的vmsize大于一个node,并且rss大于一个node内存的80%时,使用vmsize计算进程内存需求,否则使用rss计算进程内存需求 if ((p->MBs_size > node[0].MBs_total) && ((p->MBs_used * 5 / 4) > node[0].MBs_total)) { mb_request = (p->MBs_size * 100) / mem_target_utilization; } else { mb_request = (p->MBs_used * 100) / mem_target_utilization; } //计算进程的CPU需求,需要考虑进程的线程信息。 int cpu_request = (p->CPUs_used * 100) / cpu_target_utilization; int thread_limit = p->num_threads; //如果该进程是KVM虚拟机进程,为每个VCPU线程保留一个CPU核的计算能力。 if ((p->comm) && (p->comm[0] == '(') && (p->comm[1] == 'q') && (strcmp(p->comm, "(qemu-kvm)") == 0)) { int kvm_vcpu_threads = get_num_kvm_vcpu_threads(p->pid); if (thread_limit > kvm_vcpu_threads) { thread_limit = kvm_vcpu_threads; } } //每个线程最多占一个CPU核的计算能力,进程不能要求更多的CPU计算能力了。 thread_limit *= ONE_HUNDRED; if (cpu_request > thread_limit) { cpu_request = thread_limit; } //调整进程numa亲和性之前要保证距离上次调整300s以上。 #define MIN_DELAY_FOR_REEVALUATION (300 * ONE_HUNDRED) if (p->bind_time_stamp + MIN_DELAY_FOR_REEVALUATION > time_stamp) { //虽然距离上次绑定操作还不到300s,但是也检查一下当前进程的运行状态。 //如果当前进程运行在一个最繁忙的node上(CPU计算能力余量最少或者内存余量最少) //并且当前进程的CPU和内存需求可以分布在一个node节点上 //并且重新分布之后两个node上的压力会更加均衡 //那么立即执行一次重新绑定 //否则认为重新绑定也没有好处,直接扫描下一个进程 if ( ( ID_IS_IN_LIST(min_node_CPUs_free_ix, p->node_list_p) || ID_IS_IN_LIST(min_node_MBs_free_ix, p->node_list_p)) && (cpu_request < node[0].CPUs_total) && (mb_request < node[0].MBs_total) && (abs(min_node_CPUs_free + p->CPUs_used - avg_node_CPUs_free) + abs((max_node_CPUs_free - p->CPUs_used) - avg_node_CPUs_free) < (max_node_CPUs_free - min_node_CPUs_free) - CPU_THRESHOLD) // CPU slop && (abs(min_node_MBs_free + p->MBs_used - avg_node_MBs_free) + abs((max_node_MBs_free - p->MBs_used) - avg_node_MBs_free) < (max_node_MBs_free - min_node_MBs_free)) ) { if (log_level >= LOG_DEBUG) { numad_log(LOG_DEBUG, "Bypassing delay for %d because it looks like it can do better.\n", p->pid); } } else { if (log_level >= LOG_DEBUG) { numad_log(LOG_DEBUG, "Skipping evaluation of PID %d because done too recently.\n", p->pid); } continue; } } pthread_mutex_lock(&node_info_mutex); int assume_enough_cpus = (sum_CPUs_used <= sum_CPUs_total); //为进程选择一个合适的node id_list_p node_list_p = pick_numa_nodes(p->pid, cpu_request, mb_request, assume_enough_cpus); //将进程绑定到新的node上 if ((node_list_p != NULL) && (bind_process_and_migrate_memory(p))) { pthread_mutex_unlock(&node_info_mutex); return min_interval; } pthread_mutex_unlock(&node_info_mutex); } return max_interval; }
相关阅读:
本文来自网易实践者社区,经作者岳文远授权发布。