目前参与的项目,有很多任务都通过MQ进行异步化了,虽然现有的MQ有ack和unack机制来保证消息的可靠性,但是在实际应用中,一方面对于数据可靠性没有那么高的要求,另一方面也是为了更高的性能,往往在从MQ中获取到消息后,先缓存到本地,然后通过批量处理、异步处理等方式,来提高整体的消息处理能力。
那么,当我们进行服务重启的时候,那些正在处理的消息就会有丢失的风险。为了避免这种情况,我们可以在停止应用程序之前,首先停止从MQ获取新消息,然后等待本地消息处理完毕之后再停止消费进程。
前文描述的问题的实际上可以抽象为一个通用的问题,即如何安全的停止进程。通过借鉴观察者模式,采取注册与回调的方式,来实现上面的目的。
首先,我们肯定有一个管理类,用于停止进程,其至少包含如下三个方法:
register(obj)、cancelRegister(obj)、graceShutdown(),其中第一个方法用于注册被管理者,第二个方法是取消注册,第三个方法是关闭进程,当调用graceShutdown方法时,会开启一个新的线程,并且会等待所有被注册对象空闲下来之后,再安全的停止当前线程。
具体代码如下:
可以看到,在调用graceShutdown()方法的时候,会循环询问每个被注册对象是否空闲,如果已经空闲,则主动将其移除注册列表,当注册表为空时,表明所有被注册对象都没有正在处理的事务了,随后调用System.exit(0)方法结束进程。
对于被注册对象GraceShutdownObserved,其实现如下:
可以看到,这是一个抽象类,你可以将任何业务处理类继承自该类,并且调用register方法。GraceShutdownObserved包含了一个默认的idle方法实现,该方法通过记录业务处理子类的最后一次使用时间来确认是否已经处理完缓存中的任务(在使用默认实现的时候,需要在每次业务处理结束的时候调用updateLastUseTime()方法)。当然,我们可以对其进行重写,比如对于一个包含cache的业务处理类,可以重写为cache.size() == 0 && super.idle()
在实际应用中,不管是命令行还是其他方式(如特定文件检测),触发以下两件事情:
1) 停止新消息的进入
2) 调用graceShutdown方法
那么,整个进程就会在若干秒之后自动停止,并且不会丢失那些正在处理的消息
本文来自网易实践者社区,经作者曹佳俊授权发布。