openstack中完成一项操作包含多个环节。如果按照简单的先后顺序实现各个环节的流程控制,错误回滚等会变的非常复杂并且质量很难保证。为了解决这些问题openstack引入了taskflow的概念。在taskflow的框架下,一个完整的操作被划分为多个互相独立的子操作,在其内部实现子操作的执行,中止,恢复执行,错误回滚等。通过这种方式简化整体流程的管理难度。下面将以cinder中创建卷为例,解析taskflow的具体工作方式。
在上面的第3、5、7步骤中都使用了taskfow的编码模式,下面我们来看一下taskflow是如何工作的。首先看一下步骤3中的api_flow的创建过程。
create_what = {
'size': size,
'name': name,
'description': description,
'snapshot': snapshot,
'image_id': image_id,
'volume_type': volume_type,
'metadata': metadata,
'availability_zone': availability_zone,
'forced_host': forced_host,
'source_volume': source_volume,
'scheduler_hints': scheduler_hints,
'key_manager': self.key_manager,
'backup_source_volume': backup_source_volume,
}
(flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
check_volume_az_zone,
create_what)
self.image_service = (image_service or glance.get_default_image_service())
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
assert flow, _('Create volume flow not retrieved')
flow.run(context)
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" create volume workflow"))
这样,一个复杂的卷创建流程就被简化为一个标准的taskflow流程。后面的schedule_flow和volume_flow也是同样的流程,可以看到不同的业务逻辑被统一到了同样的一个设计模式下,对代码编写和代码的理解都有很大的帮助。下面将从get_api_flow
源码入手分析taskflow的创建流程。
flow_name = ACTION.replace(":", "_") + "_api"
api_flow = linear_flow.Flow(flow_name)
flow_name = 'volume_create_api'
向Flow中添加业务逻辑需要的task
api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
api_flow.add(ExtractVolumeRequestTask(image_service, az_check_functor))
api_flow.add(QuotaReserveTask())
v_uuid = api_flow.add(EntryCreateTask(db))
api_flow.add(QuotaCommitTask())
api_flow.add(OnFailureChangeStatusTask(db))
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
InjectTask 将create_what注入到Flow中
ExtractVolumeRequestTask 验证request的参数,并且转换成适当的格式应用于Flow的其它task中
QuotaReserveTask 检测卷配额是否可用,如果可用则预留即将创建的卷将要占用的资源
EntryCreateTask 在数据库中创建volume的条目
QuotaCommitTask 将预留的资源提交到数据库
OnFailureChangeStatusTask 当task出现错误时,更新数据库中的卷状态为error
VolumeCastTask 调用cinder-scheduler调度cinder-volume或者直接调用cinder-volume创建卷。
监听Flow的task状态变化及变化结果并返回Flow的uuid
return (flow_utils.attach_debug_listeners(api_flow), v_uuid)
在创建api_flow时第一步就是初始化一个Flow对象,这个对象的结构如下
super(Flow, self).__init__(name, parents, uuid)
#回滚方法累加器,如果有task失败会依次执行该累加器中的回滚方法。
self._accumulator = utils.RollbackAccumulator()
#Flow中各个task的执行结果。
self.results = {}
#上次执行的中断点,用于恢复执行
self._leftoff_at = None
#Flow中加入的所有task
self._runners = []
self._connected = False
#恢复策略(不知道干啥用的)
self.resumer = None
Flow对象初始化完成之后,使用add方法向该对象中加入了多个task。add方法的操作如下:
def add(self, task):
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
return r.uuid
可以看到,add方法使用传入的task类初始化了一个Runner对象。Runner对象中包含了在此之前已经加入Flow的所有task列表,以方便回滚操作。之后将Runner对象加入Flow的runners列表中。
在apiflow中我们加入了多个task,这些task均为CinderTask的子类,在这些类中必须实现`_call方法,完成task的业务逻辑;可以选择实现
revert`方法,完成task回滚操作。
这个task比较简单,只是实现了创建卷参数的字典化。并且没有实现revert方法,失败之后不需要回滚。
检测卷创建的参数是否可用,并将处理之后的参数传递给Flow。这个task中同样没有实现revert方法,失败之后不需要回滚。
def __call__(self, context, size, snapshot, image_id, source_volume, availability_zone, volume_type, metadata, key_manager, backup_source_volume):
#检查创建卷的参数,这三个参数中最多只能指定一个
utils.check_exclusive_options(snapshot=snapshot, imageRef=image_id,source_volume=source_volume)
#检查是否有权限执行当前操作
policy.enforce_action(context, ACTION)
#如果指定了快照,源镜像等,需要检查快照,镜像是否可用,size是否足够等。
snapshot_id = self._extract_snapshot(snapshot)
source_volid = self._extract_source_volume(source_volume)
size = self._extract_size(size, source_volume, snapshot)
self._check_image_metadata(context, image_id, size)
#如果指定了快照,源镜像等,需要检查新建卷的可用域是否合法
availability_zone = self._extract_availability_zone(availability_zone,snapshot, source_volume)
#指定volume_type
if not volume_type and not source_volume and not snapshot:
volume_type = volume_types.get_default_volume_type()
volume_type_id = self._get_volume_type_id(volume_type, source_volume, snapshot, backup_source_volume)
encryption_key_id = self._get_encryption_key_id(key_manager, context, volume_type_id, snapshot, source_volume, backup_source_volume)
#获取卷的qos信息
specs = {}
if volume_type_id:
qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
specs = qos_specs['qos_specs']
if not specs:
specs = None
self._check_metadata_properties(metadata)
#将处理完成的参数返回Flow,用于接下来的task
return {
'size': size,
'snapshot_id': snapshot_id,
'source_volid': source_volid,
'availability_zone': availability_zone,
'volume_type': volume_type,
'volume_type_id': volume_type_id,
'encryption_key_id': encryption_key_id,
'qos_specs': specs,
}
检测新建卷的配额是否可用,如果可用则更新数据库中的预留值,并且保留更新之前的配额数值,如果后续task失败可以按照这个值回滚数据库。
在__call__方法中实现如下操作:
#创建卷类型对应的预留信息
reserve_opts = {'volumes': 1, 'gigabytes': size}
QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
#检测配额信息,如果合法则创建对应的预留资源信息
reservations = QUOTAS.reserve(context, **reserve_opts)
return {
'reservations': reservations,
}
在revert方法中通过如下方式回滚数据库
QUOTAS.rollback(context, reservations)
创建volume在数据库中的条目,如果后续task失败可以通过revert回滚删除对应的条目
将reservation中预留的资源提交到数据库,更新useage数据。如果出错则按照预留值回滚。
这个task比较特殊,因为它的__call__
方法并不包含业务逻辑,其主要工作是在revert
方法中。当出现异常需要回滚时会把卷状态设置为error。这个task之前只是在数据库中增加一个条目,如果出错是不需要设置volume状态的,只要把数据库里面的数据删除即可。但是在这个task之后volume已经进入实质的创建阶段了,这个阶段出错就不能回滚删除数据库条目了,只能设置volume状态为error。
这个task会判断传入的volume参数,如果是创建快照则直接调度到源卷所在的cinder-volume节点上,否则调用cinder-scheduler的rpc接口进入调度。如果出错则回滚源volume的状态。
初始化的Flow对象中添加了需要执行的task之后,就可以调用flow.run
方法开始执行。run
方法的实现如下:
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
def resume_it():
if self._leftoff_at is not None:
return ([], self._leftoff_at)
if self.resumer:
(finished, leftover) = self.resumer.resume(self,
self._ordering())
else:
finished = []
leftover = self._ordering()
return (finished, leftover)
#设置Flow状态为start,开始执行
self._change_state(context, states.STARTED)
#获取已经执行和尚未执行的task
try:
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
#将task的回滚方法加入回滚累加器
rb = utils.RollbackTask(context, runner.task, result=None)
self._accumulator.add(rb)
#通知任务开始执行,因为我们在创建FLow的时候已经注册了这个Flow的状态通知,因此在这里会打印出对应的日志信息。
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
if not simulate_run:
#未执行过的task,直接调用Runner对象
result = runner(context, *args, **kwargs)
else:
#对已经执行过的task,在resume时需要判断执行的结果。如果失败则直接报错。
if failed:
if not result:
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
#记录当前task的执行结果,并通知执行成功,打印相关日志信息。
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
#task执行失败,上报错误信息
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
#开始执行回滚操作
self.rollback(context, cause)
#如果包含已经执行的则设置状态为RESUMING
if len(those_finished):
self._change_state(context, states.RESUMING)
for (r, details) in those_finished:
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
#处理尚未执行的task
self._leftoff_at = leftover
#设置状态为RUNNING
self._change_state(context, states.RUNNING)
if self.state == states.INTERRUPTED:
return
#task为Flow中断的最小单位,每执行一个task之后判断是否被中断。被中断的话设置Flow状态为INTERRUPTED。
was_interrupted = False
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
#Flow未被中断的情况下,设置Flow状态为SUCESS,表示正常完成。
if not was_interrupted:
self._change_state(context, states.SUCCESS)
self._leftoff_at = None
以上完整的解析了卷创建的简单流程及api_flow在这个流程中的工作情况。其它像schedule_flow及volume_flow均与之类似。