此文已由作者赵忠杰授权网易云社区发布。
欢迎访问网易云社区,了解更多网易技术产品运营经验。
1.需求背景
在做服务端开发时,我们经常会有这样的需求,需要把一个数据库中的数据经过一定的处理,落入到另一个数据库中。数据转化期间,
可能需要一系列的转化步骤,例如原始数据经过数据转化为目标数据库的格式,过滤不满足条件的数据,通过一些网络调用来增强数
据等等。这些都是数据同步/转换中的步骤,面对不同的需求,处理阶段可能会增加,也可能会减少。
当然,不局限于数据同步,开发中有很多场景都可以抽象为上述模型,即把整个流程抽象为不同的处理阶段。
2.面临的问题
在面对大数据量时,我们常常采用如下的多线程优化方式:
其中,S1-SN 为不同的处理阶段。问题在于:各个处理阶段(S1-SN)中可能充斥着大量的网络调用,IO操作等,即使采用了多线程的
优化方式,每个线程处理任务的效率仍然非常低下,无法充分利用CPU资源,从而造成整体任务效率低下。此时,pipeline模式就应
用而生了。
3.pipeline架构示意图
如果我们把一个任务分成3个处理阶段,对于一个任务来说,任务是串行执行的,但对于多个任务来说,各个阶段是并行执行的,所以任务
从整体上看是并行执行的,由于是并行执行,所以能够充分利用CPU资源。
上述只是单线程流水线状态,我们可以对其进行线程池化的改造,来进一步提交效率。
在pipeline上,对于任务不同的处理阶段,我们可以进行异步化,池化改造,从而实现不同阶段并行计算,同一阶段池化运算,充分利用多核CPU的处理能力,
提高整体任务的执行效率。
4.pipeline分类
从上图可知,pipeline分为线性pipeline和非线性pipeline,关键在于pipeline对不同处理阶段pipe是如何组装与路由的。针对不同的业务
场景,可以选择不同的pipeline类型。
5.pipeline类图
上图给出的是一个SimplePipeline的简易类图。对于不同的需求,可以针对具体情况做出相应的调整。
Pipe:处理阶段的抽象。负责对输入进行处理,并将输出作为下一个阶段的输入。Pipe可以理解为(输入,处理,输出)三元组。
process:用于接收前一阶段的处理结果,用作该处理阶段的输入。
init:初始化当前处理阶段对外提供的服务。
shutdown:关闭当前处理阶段对外提供的服务。
setNextPipe:设置当前处理阶段的下一个处理阶段。
ThreadPoolPipeDecorator:基于线程池的Pipe实现类。该类主要实现用线程池去执行对各个输入元素的处理。
AbstractPipe:Pipe的抽象实现类。
process:
接收前一阶段的处理结果作为输入,并调用子类的doProcess方法对元素进行处理,相应的处理结果会提交给
下一个阶段进行处理。
doProcess
:
留给子类实现的抽象方法。
PipeContext:对各个处理阶段的计算环境的抽象,主要用于异常处理。
Pipeline:
对复合Pipe的抽象。一个Pipeline实例可以包含多个Pipe实例。
addPipe:往该Pipeline实例中添加一个Pipe实例。
SimplePipeline:基于AbstractPipe的Pipeline接口的一个简单实现类。
6.代码示例
AbstractPipe为Pipe接口的方法提供了默认实现,子类按需覆盖即可。process方法中,会调用子类的doProcess方法。
Pipe接口的具体子类实现真正的数据处理。
通过委托的方式,实现Pipe的池化处理。
Pipeline的简单实现。通过Pipeline实现Pipe的组装与初始化,生命周期管理等。
上述Pipeline的实现比较简单,如果有必要,
可以借助配置等形式以动态的形式来创建和组装Pipeline。
7.Pipeline模式考量
1.Pipeline模式可以对有依赖关系的任务实现并行处理,应用Pipeline后,对任务的处理整体是并行的。提高了并发性。
2.Pipeline虽然可以提高并发性,但背后也隐藏这代价。各个阶段的处理都有其时间和空间的开销,而且编程复杂,出现问题不易排查,因此pipeline模式
适合于处理规模较大的任务,否则可能得不偿失。
3.Pipeline的深度需要依赖于任务的性质而定。如果是CPU密集型,深度最好不要超过CPU个数;如果是IO密集型,深度最好不要超过2*CPU个数。
免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐
更多网易技术、产品、运营经验分享请点击。