cinder当中的定时任务

背景

最近有一个任务是针对cinder存储后端为ceph的情况下,修改cinder-scheduler调度权重按虚拟空间使用率计算,以达到各个pool之间的尽量平衡,实现方式是在cinder-volume上报状态的时候,上报ceph虚拟空间而不是物理空间的使用率。现在考虑实现的方法是遍历所有位于同一物理存储位置的pool中的所有镜像,通过打开一个上下文来获取其镜像的虚拟大小,这样存在的问题是耗时比较长,在线下环境测试大概需要20-30秒的时间,并且这个时间会随着pool中镜像数量的增加而增加。因为各种原因,暂时没有找到合适的优化方法,因而需要评估下这个耗时任务对整体cinder服务的影响。

我们知道在cinder当中,cinder-volume向cinder-scheduler上报自己的状态(包括剩余空间、总空间、空间预留比例、后端类型等)这个任务是由两个方法来实现的(_report_driver_status和_publish_service_capabilities,加个装饰器@periodic_task.periodic_task使之能被定时调用),这两个定时任务每隔一定时间(默认为60s)通过调用对应后端的接口来获取相应的信息(比如通过ceph df来获取ceph的空间使用信息)并通过RPC发送给cinder-scheduler。cinder当中还存在其他的定时任务,因此我们想知道两件事情。

  • 上述耗时任务是否会影响到cinder中其他的定时任务
  • 上述耗时任务是否会阻塞cinder-volume的进程,使其无法对外提供服务

分析

我们以M版的代码为例,分析下cinder是如何实现定时任务的,不同版本代码实现细节上会有不同,但大致框架和原理是一致的。先从cinder-volume的启动方法入手。

# cinder.service.py:Service.start
class Service(service.Service):
    def start(self):
        ······
        # 如果设置了状态上报间隔,默认为10s
        # 则开启一个绿色线程每隔一定时间上报自己的状态,
        # 并保存在数据库当中
        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(
                self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        # 如果设置了定时任务间隔,默认60s
        # 则开启一个绿色线程,每隔一定时间去执行定时任务
        # 调用方法periodic_tasks
        if self.periodic_interval:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)

可以看到cinder-volume在启动的时候,会调用loopingcall.FixedIntervalLoopingCall开启两个绿色线程来调用两个不同的方法,分别是self.report_state和self.periodic_tasks。先来看下loopingcall.FixedIntervalLoopingCall这个类相关的方法,主要关注__init__以及start。 在OpenStack的M版中,有很多公共组件被独立出来形成单独的库了,比如这里的loopingcall,就是包含在独立的oslo_service库中,而在H版本当中这些代码还是在各自的项目代码目录中。

from oslo_service import loopingcall
# oslo_service.loopingcall
class LoopingCallBase(object):
    # 省略__init__, stop等方法
    def _start(self, idle_for, initial_delay=None, stop_on_exception=True):
        if self._thread is not None:
            raise RuntimeError(self._RUN_ONLY_ONE_MESSAGE)
        self._running = True
        self.done = event.Event()
        # 调用greenthread的spawn方法来孵化一个绿色线程来执行
        # self._run_loop方法,剩余参数会作为_run_loop方法的参数
        self._thread = greenthread.spawn(
            self._run_loop, idle_for,
            initial_delay=initial_delay, stop_on_exception=stop_on_exception)
        self._thread.link(self._on_done)
        return self.done

    def _run_loop(self, idle_for_func, initial_delay=None, stop_on_exception=True):
        kind = self._KIND
        func_name = reflection.get_callable_name(self.f)
        func = self.f if stop_on_exception else _safe_wrapper(self.f, kind,
                                                              func_name)
        if initial_delay:
            greenthread.sleep(initial_delay)
        try:
            # watch用来统计任务实际上执行的时间
            watch = timeutils.StopWatch()
            # 循环执行任务,直到self._running为False
            # (调用stop方法)
            while self._running:
                watch.restart()
                # 调用定时任务
                result = func(*self.args, **self.kw)
                watch.stop()
                if not self._running:
                    break
                # 计算等待的时间
                idle = idle_for_func(result, watch.elapsed())
                LOG.trace('%(kind)s %(func_name)r sleeping '
                          'for %(idle).02f seconds',
                          {'func_name': func_name, 'idle': idle,
                           'kind': kind})
                greenthread.sleep(idle)
            # 省略异常处理代码
        else:
            self.done.send(True)

class FixedIntervalLoopingCall(LoopingCallBase):                                                            
    def start(self, interval, initial_delay=None, stop_on_exception=True):
        # _idle_for是用来计算任务执行完毕后等待时间的,具体会作为参数传入
        # 给_run_loop方法用来计算等待时间,第一个参数是上一次任务执行结束后
        # 的结果,第二个参数是定时任务实际上执行的时间
        def _idle_for(result, elapsed):
            delay = round(elapsed - interval, 2)
            if delay > 0:
                func_name = reflection.get_callable_name(self.f)
                LOG.warning(_LW('Function %(func_name)r run outlasted '
                                'interval by %(delay).2f sec'),
                            {'func_name': func_name, 'delay': delay})
            return -delay if delay < 0 else 0                         
        return self._start(_idle_for, initial_delay=initial_delay,
                           stop_on_exception=stop_on_exception)

FixedIntervalLoopingCall继承了类LoopingCallBase,在其start方法中实现了一个_idle_for方法,这个方法的作用是用来计算任务执行完毕后等待时间,具体会作为参数传入给_run_loop方法用来计算等待时间,它有两个参数,第一个参数是上一次任务执行结束后返回的结果(一般是定时任务返回的建议等待时间,这里实际上没有使用到,因为他的等待时间是固定的),第二个参数是定时任务实际上执行的时间。这里计算循环调用定时任务等待时间的方法是:用定时任务执行的时间减去循环调用定时任务的间隔,得到delay,如果定时任务本身的执行时间已经超过了间隔时间,即delay大于0,那么立即开始下一次的定时任务调用,否则等待-delay的时间。

这里顺便提一下,在H版本的实现里,每次都是等待interval的时间,也就是说如果定时任务本身的执行时间是60s,interval也是60s的话,那么两次定时任务之间的间隔将是120s,而在M版本里是60s。另外在nova这个项目当中我们还看到使用了DynamicLoopingCall,它和FixedIntervalLoopingCall的区别是,调用定时任务的间隔时间是根据上一次定时任务返回的计算动态计算的,来看看它的_idle_for方法。

class DynamicLoopingCall(LoopingCallBase):
    def start(self, initial_delay=None, periodic_interval_max=None, stop_on_exception=True):
        # 在定时任务返回的建议等待时间和最大等待时间中选择一个小的
        def _idle_for(suggested_delay, elapsed):
            delay = suggested_delay
            if delay is None:
                if periodic_interval_max is not None:
                    delay = periodic_interval_max
                else:
                    # 省略异常报错
            else:
                if periodic_interval_max is not None:
                    delay = min(delay, periodic_interval_max)
            return delay

        return self._start(_idle_for, initial_delay=initial_delay,
                           stop_on_exception=stop_on_exception)

接着看基类LoopingCallBase的_start方法,其核心功能是调用greenthread的spawn方法来孵化一个绿色线程来执行self._run_loop方法,并把子类实现的_idle_for方法作为参数传入,在_run_loop的循环体中根据这个函数来计算间隔时间。_run_loop方法比较简单,看注释就明白了。

到这里已经搞明白了定时任务的执行方式,就是为每个任务开辟一个绿色线程来循环调用。接下来来看看这个两个方法具体是做什么的,以及为什么一个方法加上@periodic_task.periodic_task装饰器后就可以被定时调用了。首先看self.report_state的实现。

def report_state(self):
    ctxt = context.get_admin_context()
    zone = CONF.storage_availability_zone
    try:
        try:
            # 根据id来查询service的信息,service_id是在初始化时根据host来
            # 从数据库中取出对应的service得到的
            service_ref = objects.Service.get_by_id(ctxt, self.service_id)
        except exception.NotFound:
            # 省略异常处理代码
        # 上报次数加1
        service_ref.report_count += 1
        if zone != service_ref.availability_zone:
            service_ref.availability_zone = zone
        # 保存service的信息
        service_ref.save()

        # TODO(termie): make this pattern be more elegant.
        if getattr(self, 'model_disconnected', False):
            self.model_disconnected = False
    # 省略异常处理代码

report_state的作用是向数据库上报自己的心跳,如果服务正常运行,那么每隔一定时间(默认是10s),根据自身id来从数据库中取出service,将其中的report_count加上1,然后保存。在保存的时候,service表中有一个updated_at字段会同时设置为当前时间,这点可以从sqlalchemy中定义的service代码得知,后续可以根据updated_at字段的值以及设置的超时时间来判断服务自身的状态。

updated_at = Column(DateTime, default=timeutils.utcnow, 
                    onupdate=timeutils.utcnow)

接着是slef.periodic_tasks方法。

def periodic_tasks(self, raise_on_error=False):
    """Tasks to be run at a periodic interval."""
    ctxt = context.get_admin_context()
    # 实际上调用的是cinder.manager.py:Manager.periodic_tasks
    self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
# cinder.manager.py:Manager.periodic_tasks
class Manager(base.Base, PeriodicTasks):
    def periodic_tasks(self, context, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        # run_periodic_tasks由其基类PeriodicTasks实现,PeriodicTasks和之前
        # 的FixedIntervalLoopingCall一样,也是在oslo.service库中
        return self.run_periodic_tasks(context,raise_on_error=raise_on_error)

基本上就是一层层的关系调用,直接来看PeriodicTasks.run_periodic_tasks方法。

# oslo.service.periodic_task.py:PeriodicTasks
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
    def __init__(self, conf):
        super(PeriodicTasks, self).__init__()
        self.conf = conf
        self.conf.register_opts(_options.periodic_opts)
        self._periodic_last_run = {}
        # _periodic_tasks记录了每一个需要调用的task,
        # _periodic_last_run记录每一个task上一次的执行时间
        for name, task in self._periodic_tasks:
            self._periodic_last_run[name] = task._periodic_last_run

    def run_periodic_tasks(self, context, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        idle_for = DEFAULT_INTERVAL
        # 遍历_periodic_tasks中保存的task,依次执行
        for task_name, task in self._periodic_tasks:
            if (task._periodic_external_ok and not
               self.conf.run_external_periodic_tasks):
                continue
            cls_name = reflection.get_class_name(self, fully_qualified=False)
            full_task_name = '.'.join([cls_name, task_name])
            # spacing表示一个task的执行时间间隔
            # 可以由装饰器传入,也可以不传,默认值60s
            spacing = self._periodic_spacing[task_name]
            # 上一次执行完毕时间
            last_run = self._periodic_last_run[task_name]

            # 这里的计算有两个作用:1)给DynamicLoopingCall提供建议的
            # 等待时间,其值是last_run + spacing - now()的最小值;2)
            # 判断当前task是否需要执行,如果距离上一次执行时间已经
            # 过了spacing值,那么就立即执行,否则跳过这个task
            idle_for = min(idle_for, spacing)
            if last_run is not None:
                delta = last_run + spacing - now()
                if delta > 0:
                    idle_for = min(idle_for, delta)
                    continue

            # 更新上一次task的执行时间
            self._periodic_last_run[task_name] = _nearest_boundary(
                last_run, spacing)
            # 执行task
            try:
                task(self, context)
            # 省略异常处理
            time.sleep(0)
            
        return idle_for

run_periodic_tasks的作用就是从self._periodic_tasks中依次取出每一个task,判断其是否可以执行,标准是距离上一次执行到现在是否已经过了spacing值,如果还没到,则会跳过这个task。然后更新并上一次执行的时间,最后执行task。这里比较关键是保存task的self._periodic_tasks以及保存每一个task spacing值的self._periodic_spacing是怎么获取的。注意到类的装饰器@six.add_metaclass(_PeriodicTasksMeta),就是给PeriodicTasks添加了一个元类_PeriodicTasksMeta,上述的两个值就是元类提供的,PeriodicTasks可以看做是元类_PeriodicTasksMeta的实例。

# oslo.service.periodic_task.py:_PeriodicTasksMeta
class _PeriodicTasksMeta(type):
    def _add_periodic_task(cls, task):
        name = task._periodic_name
        # _periodic_spacing小于0,这个task不会被加入到task列表
        if task._periodic_spacing < 0:
            LOG.info(_LI('Skipping periodic task %(task)s because '
                         'its interval is negative'),
                     {'task': name})
            return False
        # 如果task的_periodic_enabled是False,也不会加入到task列表
        if not task._periodic_enabled:
            LOG.info(_LI('Skipping periodic task %(task)s because '
                         'it is disabled'),
                     {'task': name})
            return False
        # 如果task的spacing为0(默认情况),则重新赋值为
        # DEFAULT_INTERVAL,默认值60s
        if task._periodic_spacing == 0:
            task._periodic_spacing = DEFAULT_INTERVAL
        # 保存task和对应的spacing
        cls._periodic_tasks.append((name, task))
        cls._periodic_spacing[name] = task._periodic_spacing
        return True

    def __init__(cls, names, bases, dict_):
        """Metaclass that allows us to collect decorated periodic tasks."""
        super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)

        # 如果不存在_periodic_tasks和_periodic_spacing,则需要先初始化他们
        try:
            cls._periodic_tasks = cls._periodic_tasks[:]
        except AttributeError:
            cls._periodic_tasks = []

        try:
            cls._periodic_spacing = cls._periodic_spacing.copy()
        except AttributeError:
            cls._periodic_spacing = {}
        
        # 遍历类成员方法,如果有_periodic_task这个属性,就认为
        # 是需要定时执行的task,将其保存到_periodic_tasks数组,
        # 对应的spacing值保存到_periodic_spacing
        for value in cls.__dict__.values():
            if getattr(value, '_periodic_task', False):
                cls._add_periodic_task(value)

在元类_PeriodicTasksMeta的__init__方法中,他会遍历类的成员方法,如果成员方法中有_periodic_task属性,则认为是需要定时执行的task,通过调用_add_periodic_task来把task添加到列表。那么_periodic_task这个属性又是怎么来的,PeriodicTasks类都有哪些类成员方法?这就是最初提到的装饰器@periodic_task.periodic_task的功能了。

# oslo.service.periodic_task.py:periodic_task
def periodic_task(*args, **kwargs):
    def decorator(f):
        # Test for old style invocation
        if 'ticks_between_runs' in kwargs:
            raise InvalidPeriodicTaskArg(arg='ticks_between_runs')

        # 这里就是属性的设置了!
        f._periodic_task = True
        f._periodic_external_ok = kwargs.pop('external_process_ok', False)
        f._periodic_enabled = kwargs.pop('enabled', True)
        f._periodic_name = kwargs.pop('name', f.__name__)

        # spacing默认值为0
        f._periodic_spacing = kwargs.pop('spacing', 0)
        # 默认不立即执行
        f._periodic_immediate = kwargs.pop('run_immediately', False)
        if f._periodic_immediate:
            f._periodic_last_run = None
        else:
            f._periodic_last_run = now()
        return f
    if kwargs:
        return decorator
    else:
        return decorator(args[0])

periodic_task的作用就是为一个函数设置_periodic_task属性,并从装饰器的kwargs中取得spacing参数,如果有run_immediately(启动之后先立刻执行一次,然后再定时执行)会设置_periodic_last_run属性。 我们再回过头来看看那两个被periodic_task装饰的方法。一个是cinder.volume.manager.VolumeManager中的_report_driver_status,作用是获取存储后端的信息并保存在cinder.manager.SchedulerDependentManager的last_capabilities中,另一个是cinder.manager.SchedulerDependentManager中的_publish_service_capabilities,作用是通过RPC远程调用把last_capabilities上报给cinder-scheduler。而他们之间的继承关系是: cinder.volume.manager.VolumeManager-->cinder.manager.SchedulerDependentManager-->cinder.manager.Manager-->PeriodicTasks。相当于都是PeriodicTasks的子类,因为元类可以隐式地继承到子类,所以子类拥有的类成员方法也会遍历到,并进行是否有_periodic_task属性的判断。

总结

通过上述分析,已经比较清楚的知道cinder当中的定时任务是如何实现的了,同时也可以得到几个结论:

  • 因为是开辟绿色线程执行定时任务的方式,因此两个定时任务之间是会有影响的,因为并不存在真正的并发。如果其中一个定时任务一直占着CPU,那么将导致另外的定时任务挂起,比如心跳,因此我们需要在定时任务里添加sleep(0)来适时的切换。
  • 由periodic_task装饰的task,是依次调用的,因此如果_report_driver_status耗时过长,比如超过一分钟,那么_publish_service_capabilities就不能按默认值60s调用到一次。不过_publish_service_capabilities本身就是把_report_driver_status得到的信息上报而已,并且cinder-volume不存在其他的由periodic_task装饰的task了,因此影响不大。

最后还需要注意的是对_report_driver_status做了改动之后,需要观察cinder-volume的CPU、内存等资源的使用情况。最终还是需要考虑如何从代码层面进行优化,本质上解决问题。

本文来自网易实践者社区,经作者廖跃华授权发布。