cinder中的taskflow

达芬奇密码2018-07-16 16:09

openstack中完成一项操作包含多个环节。如果按照简单的先后顺序实现各个环节的流程控制,错误回滚等会变的非常复杂并且质量很难保证。为了解决这些问题openstack引入了taskflow的概念。在taskflow的框架下,一个完整的操作被划分为多个互相独立的子操作,在其内部实现子操作的执行,中止,恢复执行,错误回滚等。通过这种方式简化整体流程的管理难度。下面将以cinder中创建卷为例,解析taskflow的具体工作方式。

创建卷的处理流程

  1. wsgi server接收到请求并分发到cinder.api.openstack.wsgi.Resource.call函数,在该函数内解析请求的数据,获取用户需要执行的操作并根据服务启动时初始化的路由信息获取执行该操作的方法。获取该方法的执行结果并组织response信息返回给client端。
  2. cind.api.v1.volumes.VolumeController.create方法从request中获取创建卷所需的各参数:size,name,description,volume_type,az,forced_host等。
  3. cinder.volume.api.API.create根据传递过来的参数组织创建卷的taskflow: api_flow并执行。
  4. 创建卷的taskflow最终执行的结果是向cinder-scheduler发出创建卷的异步调度请求或者直接向cinder-volume发出创建卷请求。因为创建卷是异步请求,taskflow执行完毕后会直接返回结果给调用者。
  5. cinder.scheduler.manager.SchedulerManager.create_volume中组织调度的taskflow: schedule_flow并执行。
  6. cinder-scheduler的taskflow执行结果是选定一个可用的cinder-volume节点,并发送创建卷请求。
  7. cinder.volume.manager.VolumeManager.create_volume接收到创建卷的请求,组织volume的taskflow: volume_flow并执行。 至此一个cinder volume就创建完成了。

taskflow的处理

在上面的第3、5、7步骤中都使用了taskfow的编码模式,下面我们来看一下taskflow是如何工作的。首先看一下步骤3中的api_flow的创建过程。

  1. 在cinder.volume.api.API.create中组织卷的参数信息
         create_what = {
             'size': size,
             'name': name,
             'description': description,
             'snapshot': snapshot,
             'image_id': image_id,
             'volume_type': volume_type,
             'metadata': metadata,
             'availability_zone': availability_zone,
             'forced_host': forced_host,
             'source_volume': source_volume,
             'scheduler_hints': scheduler_hints,
             'key_manager': self.key_manager,
             'backup_source_volume': backup_source_volume,
         }
    
  2. 创建task_flow。
         (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
                                                   self.volume_rpcapi,
                                                   self.db,
                                                   self.image_service,
                                                   check_volume_az_zone,
                                                   create_what)
    

self.image_service = (image_service or glance.get_default_image_service())
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()

  1. 检查task_flow的创建结果
    assert flow, _('Create volume flow not retrieved')
    
  2. 执行task_flow
    flow.run(context)
    
  3. 检查task_flow的执行结果
    if flow.state != states.SUCCESS:
     raise exception.CinderException(_("Failed to successfully complete"
                                       " create volume workflow"))
    
    这样,一个复杂的卷创建流程就被简化为一个标准的taskflow流程。后面的schedule_flow和volume_flow也是同样的流程,可以看到不同的业务逻辑被统一到了同样的一个设计模式下,对代码编写和代码的理解都有很大的帮助。

taskflow的创建

下面将从get_api_flow源码入手分析taskflow的创建流程。

  1. 初始化一个Flow类的对象
    flow_name = ACTION.replace(":", "_") + "_api"
    api_flow = linear_flow.Flow(flow_name)
    

    flow_name = 'volume_create_api'

  2. 向Flow中添加业务逻辑需要的task

    api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
    api_flow.add(ExtractVolumeRequestTask(image_service, az_check_functor))
    api_flow.add(QuotaReserveTask())
    v_uuid = api_flow.add(EntryCreateTask(db))
    api_flow.add(QuotaCommitTask())
    api_flow.add(OnFailureChangeStatusTask(db))
    api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
    

    InjectTask 将create_what注入到Flow中
    ExtractVolumeRequestTask 验证request的参数,并且转换成适当的格式应用于Flow的其它task中
    QuotaReserveTask 检测卷配额是否可用,如果可用则预留即将创建的卷将要占用的资源
    EntryCreateTask 在数据库中创建volume的条目
    QuotaCommitTask 将预留的资源提交到数据库
    OnFailureChangeStatusTask 当task出现错误时,更新数据库中的卷状态为error
    VolumeCastTask 调用cinder-scheduler调度cinder-volume或者直接调用cinder-volume创建卷。

  3. 监听Flow的task状态变化及变化结果并返回Flow的uuid

    return (flow_utils.attach_debug_listeners(api_flow), v_uuid)
    

TaskFlow的结构

在创建api_flow时第一步就是初始化一个Flow对象,这个对象的结构如下

super(Flow, self).__init__(name, parents, uuid)
#回滚方法累加器,如果有task失败会依次执行该累加器中的回滚方法。
self._accumulator = utils.RollbackAccumulator()
#Flow中各个task的执行结果。
self.results = {}
#上次执行的中断点,用于恢复执行
self._leftoff_at = None
#Flow中加入的所有task
self._runners = []
self._connected = False
#恢复策略(不知道干啥用的)
self.resumer = None

Flow对象初始化完成之后,使用add方法向该对象中加入了多个task。add方法的操作如下:

def add(self, task):
    assert isinstance(task, collections.Callable)
    r = utils.Runner(task)
    r.runs_before = list(reversed(self._runners))
    self._runners.append(r)
    self._reset_internals()
    return r.uuid

可以看到,add方法使用传入的task类初始化了一个Runner对象。Runner对象中包含了在此之前已经加入Flow的所有task列表,以方便回滚操作。之后将Runner对象加入Flow的runners列表中。

api_flow中的task

在apiflow中我们加入了多个task,这些task均为CinderTask的子类,在这些类中必须实现`_call方法,完成task的业务逻辑;可以选择实现revert`方法,完成task回滚操作。

InjectTask

这个task比较简单,只是实现了创建卷参数的字典化。并且没有实现revert方法,失败之后不需要回滚。

ExtractVolumeRequestTask

检测卷创建的参数是否可用,并将处理之后的参数传递给Flow。这个task中同样没有实现revert方法,失败之后不需要回滚。

def __call__(self, context, size, snapshot, image_id, source_volume, availability_zone, volume_type, metadata, key_manager, backup_source_volume):
    #检查创建卷的参数,这三个参数中最多只能指定一个
    utils.check_exclusive_options(snapshot=snapshot, imageRef=image_id,source_volume=source_volume)
    #检查是否有权限执行当前操作
    policy.enforce_action(context, ACTION)
    #如果指定了快照,源镜像等,需要检查快照,镜像是否可用,size是否足够等。
    snapshot_id = self._extract_snapshot(snapshot)
    source_volid = self._extract_source_volume(source_volume)
    size = self._extract_size(size, source_volume, snapshot)
    self._check_image_metadata(context, image_id, size)
    #如果指定了快照,源镜像等,需要检查新建卷的可用域是否合法
    availability_zone = self._extract_availability_zone(availability_zone,snapshot, source_volume)
    #指定volume_type
    if not volume_type and not source_volume and not snapshot:
        volume_type = volume_types.get_default_volume_type()
    volume_type_id = self._get_volume_type_id(volume_type, source_volume, snapshot, backup_source_volume)
    encryption_key_id = self._get_encryption_key_id(key_manager, context, volume_type_id, snapshot, source_volume, backup_source_volume)
    #获取卷的qos信息
    specs = {}
    if volume_type_id:
        qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
        specs = qos_specs['qos_specs']
    if not specs:
        specs = None

    self._check_metadata_properties(metadata)
    #将处理完成的参数返回Flow,用于接下来的task
    return {
        'size': size,
        'snapshot_id': snapshot_id,
        'source_volid': source_volid,
        'availability_zone': availability_zone,
        'volume_type': volume_type,
        'volume_type_id': volume_type_id,
        'encryption_key_id': encryption_key_id,
        'qos_specs': specs,
    }

QuotaReserveTask

检测新建卷的配额是否可用,如果可用则更新数据库中的预留值,并且保留更新之前的配额数值,如果后续task失败可以按照这个值回滚数据库。

在__call__方法中实现如下操作:
#创建卷类型对应的预留信息
reserve_opts = {'volumes': 1, 'gigabytes': size}
QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
#检测配额信息,如果合法则创建对应的预留资源信息
reservations = QUOTAS.reserve(context, **reserve_opts)
return {
    'reservations': reservations,
}
在revert方法中通过如下方式回滚数据库
QUOTAS.rollback(context, reservations)

EntryCreateTask

创建volume在数据库中的条目,如果后续task失败可以通过revert回滚删除对应的条目

QuotaCommitTask

将reservation中预留的资源提交到数据库,更新useage数据。如果出错则按照预留值回滚。

OnFailureChangeStatusTask

这个task比较特殊,因为它的__call__方法并不包含业务逻辑,其主要工作是在revert方法中。当出现异常需要回滚时会把卷状态设置为error。这个task之前只是在数据库中增加一个条目,如果出错是不需要设置volume状态的,只要把数据库里面的数据删除即可。但是在这个task之后volume已经进入实质的创建阶段了,这个阶段出错就不能回滚删除数据库条目了,只能设置volume状态为error。

VolumeCastTask

这个task会判断传入的volume参数,如果是创建快照则直接调度到源卷所在的cinder-volume节点上,否则调用cinder-scheduler的rpc接口进入调度。如果出错则回滚源volume的状态。

task flow的执行

初始化的Flow对象中添加了需要执行的task之后,就可以调用flow.run方法开始执行。run方法的实现如下:

def run(self, context, *args, **kwargs):
    super(Flow, self).run(context, *args, **kwargs)

    def resume_it():
        if self._leftoff_at is not None:
            return ([], self._leftoff_at)
        if self.resumer:
            (finished, leftover) = self.resumer.resume(self,
                                                       self._ordering())
        else:
            finished = []
            leftover = self._ordering()
        return (finished, leftover)
    #设置Flow状态为start,开始执行
    self._change_state(context, states.STARTED)
    #获取已经执行和尚未执行的task
    try:
        those_finished, leftover = resume_it()
    except Exception:
        with excutils.save_and_reraise_exception():
            self._change_state(context, states.FAILURE)

    def run_it(runner, failed=False, result=None, simulate_run=False):
        try:
            #将task的回滚方法加入回滚累加器
            rb = utils.RollbackTask(context, runner.task, result=None)
            self._accumulator.add(rb)
            #通知任务开始执行,因为我们在创建FLow的时候已经注册了这个Flow的状态通知,因此在这里会打印出对应的日志信息。
            self.task_notifier.notify(states.STARTED, details={
                'context': context,
                'flow': self,
                'runner': runner,
            })
            if not simulate_run:
                #未执行过的task,直接调用Runner对象
                result = runner(context, *args, **kwargs)
            else:
                #对已经执行过的task,在resume时需要判断执行的结果。如果失败则直接报错。
                if failed:
                    if not result:
                        result = "%s failed running." % (runner.task)
                    if isinstance(result, basestring):
                        result = exc.InvalidStateException(result)
                    if not isinstance(result, Exception):
                        LOG.warn("Can not raise a non-exception"
                                 " object: %s", result)
                        result = exc.InvalidStateException()
                    raise result
            #记录当前task的执行结果,并通知执行成功,打印相关日志信息。
            rb.result = result
            runner.result = result
            self.results[runner.uuid] = result
            self.task_notifier.notify(states.SUCCESS, details={
                'context': context,
                'flow': self,
                'runner': runner,
            })
        except Exception as e:
            #task执行失败,上报错误信息
            runner.result = e
            cause = utils.FlowFailure(runner, self, e)
            with excutils.save_and_reraise_exception():
                self.task_notifier.notify(states.FAILURE, details={
                    'context': context,
                    'flow': self,
                    'runner': runner,
                })
                #开始执行回滚操作
                self.rollback(context, cause)
    #如果包含已经执行的则设置状态为RESUMING
    if len(those_finished):
        self._change_state(context, states.RESUMING)
        for (r, details) in those_finished:
            failed = states.FAILURE in details.get('states', [])
            result = details.get('result')
            run_it(r, failed=failed, result=result, simulate_run=True)
    #处理尚未执行的task
    self._leftoff_at = leftover
    #设置状态为RUNNING
    self._change_state(context, states.RUNNING)
    if self.state == states.INTERRUPTED:
        return
    #task为Flow中断的最小单位,每执行一个task之后判断是否被中断。被中断的话设置Flow状态为INTERRUPTED。
    was_interrupted = False
    for r in leftover:
        r.reset()
        run_it(r)
        if self.state == states.INTERRUPTED:
            was_interrupted = True
            break
    #Flow未被中断的情况下,设置Flow状态为SUCESS,表示正常完成。
    if not was_interrupted:
        self._change_state(context, states.SUCCESS)
        self._leftoff_at = None

总结

以上完整的解析了卷创建的简单流程及api_flow在这个流程中的工作情况。其它像schedule_flow及volume_flow均与之类似。


本文来自网易实践者社区,经作者岳文远授权发布。