在软件的生命周期中,经常遇到由于业务发展,系统迭代更新带来的数据迁移工作;或者软件系统本身的重构抑或其他因素,几乎都需要对数据进行迁移。数据迁移主要包含数据迁移前的准备、数据迁移的实施和数据迁移后的校验。我们在进行数据迁移实施过程中哪一个环节都要考虑周全,不然很容易出现线上故障。本次系统迁移的最大难点和风险点在于无停机状态下实现ES索引库无缝迁移、应急处理、全量&增量数据同步逻辑等。最终能够保证数据的正确性、一致性、可用性!
一、需求分析
最近在做考拉社区内容数据由原先的CMS系统迁移到内容管理平台中,相关的搜索逻辑也有很大的改动,使用原先的ES索引库已经满足不了当前的迁移需求。此时有两种改动方案,一是在原先ES索引库基础上做全量数据入库逻辑;二是不动原先逻辑新增ES索引库,在系统迁移过程中使用双写策略,当系统稳定后通过动态配置抛弃原先ES索引库。最终我们使用了第二种方案。此种方案可以实现系统无停机动态切换、故障转移。
二、迁移设计
1、迁移方案设计
本次数据迁移主要包括三个重要时间点,预发环境、上线后、后置工作。具体时间点要做的工作如下:
预发环境: 准线上环境时需要把DDB全量数据全部迁移到ES
后置工作:本次迁移完ES后,还要调整ES数据更新方式&完全替换新数据源
三、迁移实现
数据迁移实现主要有全量迁移脚本、kafka消息推送消息设计。
1、全量脚本设计
全量脚本存在两个版本的设计,第一个版本主要考虑点在于系统之间进行解耦,所以在A工程只做全量获取id功能,然后通过kafka消息形式推送出去,B工程接收消息调用RPC接口拉取id对应的ES数据。基本逻辑代码如下,此种方式在测试环境跑数据后发现存在两个问题。
while(true){ //A工程推送消息
//分页获取数据,获取discussionIdList
CollectionUtils.collect(contentChannels, new Transformer() {
@Override
public Long transform(ContentChannel contentChannel) {
return Longs.toLong(contentChannel.getArticleId());
}
}, discussionIdList);
//封装kafka消息
ArticleAsyncDataVo dataObject = new ArticleAsyncDataVo();
dataObject.setDiscussionIdList(discussionIdList);
//省略发送kafka消息逻辑
LoggerConstant.runLogger.info("sendDirect dbUpdateTime={}, page={}, dataObject={}", dbUpdateTime, page, JSON.toJSONString(dataObject));
if(ObjectUtils.isEmpty(contentChannels) || (!ObjectUtils.isEmpty(contentChannels) && contentChannels.size() < pageSize)){
break;
}
page++;
}
@Override
public void onMessage(ConsumerRecord consumerRecord) { //B工程接收消息
if(ObjectUtils.isEmpty(consumerRecord) || ObjectUtils.isEmpty(consumerRecord.value())){
return;
}
try {
ArticleAsyncDataVo articleAsyncDataVo = JSON.parseObject(consumerRecord.value(), ArticleAsyncDataVo.class);
indexNewService.batchIndex(articleAsyncDataVo.getDiscussionIdList());
} catch (Exception e) {
LoggerConstant.alarmLogger.error("Parse Kafka Msg Occur Error, Data={}", consumerRecord.value());
throw new ApiException("Parse Error!");
}
}
<bean id="consumerProperties5" >
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers3}"/>
<entry key="group.id" value="${kafka.group.id}" /> <!-- 消费组 -->
<entry key="enable.auto.commit" value="true"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="max.poll.records" value="1000"/> <!-- 批量取队列信息,最多取1000消息 -->
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
这个问题有两种解决方式,第一种是调整
auto.commit.interval.ms数值,但是治标不治本。还有种更合理的方式,设置
enable.auto.commit为false。Spring-kafka的offset提交机制,可以保证了在一批消息没有完成消费的情况下,也能提交offset,从而避免了完全提交不上而导致永远重复消费的问题。
具体Spring-kafka设计的
enable.auto.commit的逻辑如下:
public void run() {
if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
}
this.count = 0;
this.last = System.currentTimeMillis();
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
// we start the invoker here as there will be no rebalance calls to
// trigger it, but only if the container is not set to autocommit
// otherwise we will process records on a separate thread
if (!this.autoCommit) {
startInvoker();
}
}
long lastReceive = System.currentTimeMillis();
long lastAlertAt = lastReceive;
while (isRunning()) {
try {
if (!this.autoCommit) {
processCommits();
}
processSeeks(); //内部基本逻辑:ack确认进入Spring管理的阻塞队列的消息
if (this.logger.isTraceEnabled()) {
this.logger.trace("Polling (paused=" + this.paused + ")...");
}
......
private void processCommits() {
handleAcks();
this.count += this.acks.size();
long now;
AckMode ackMode = this.containerProperties.getAckMode();
if (!this.isManualImmediateAck) {
if (!this.isManualAck) {
updatePendingOffsets();
}
boolean countExceeded = this.count >= this.containerProperties.getAckCount();
if (this.isManualAck || this.isBatchAck || this.isRecordAck
|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
this.logger.debug("Committing in AckMode.COUNT because count " + this.count
+ " exceeds configured limit of " + this.containerProperties.getAckCount());
}
commitIfNecessary();
this.count = 0;
}
......
如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,数据来源于spring-kafka自己创建的阻塞队列。然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。上面创建的线程去消费阻塞队列里面的消息,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。
while(true){ //省略异常处理代码逻辑
//查询数据
List discussionIdList = Lists.newArrayList();
CollectionUtils.collect(discussionList, new Transformer() {
@Override
public Long transform(Discussion discussion) {
return discussion.getId();
}
}, discussionIdList);
if(ObjectUtils.isEmpty(discussionIdList)){
break;
}
int total = discussionIdList.size();
int num = total/DEFAULT_SIZE;
if(total%DEFAULT_SIZE != 0){
num = num+1;
}
if(num > 0){
List communityContentArticleTypeVos = new CopyOnWriteArrayList<>();
executorService = Executors.newFixedThreadPool(num);
ListPageUtils pager = new ListPageUtils<>(discussionIdList, DEFAULT_SIZE);
CountDownLatch waitCountDownLatch = new CountDownLatch(num);
for(int i=1;i<=num;i++){
if(!ObjectUtils.isEmpty(pager.getPagedList(i))){
if(ObjectUtils.isEmpty(pager.getData())){
executorService.shutdown();
}
executorService.execute(new InitArticleThread(pager.getPagedList(i),communityContentArticleTypeVos,waitCountDownLatch));
}
}
waitCountDownLatch.await();
Map result = list2map(communityContentArticleTypeVos);
if(!ObjectUtils.isEmpty(result)){
//调用RPC直接批量更新封装好的ES数据
}
}
//省略其他业务逻辑
page++;
}
通过线程池和批量更新逻辑处理,大概只需要30分钟就可以把全量数据更新至ES。
2、Kafka推送消息机制设计
ES数据源依赖于数据库数据,业务代码里需要保证数据库数据事务已经提交后才可以发送kafka消息且为了保证发送消息跟代码耦合使用自定义注解和切面实现。具体实现逻辑如下:
具体数据库更新点加上注解标识
@SyncKafkaDiscussion()
@Transactional
public boolean userEdit(Long id, Long contentId, Long effectiveTime, Long editTime,
//省略具体业务代码
//通过threadlocal传递数据,在切面拿到数据发送消息
RequestHolder.setDiscussionIdList(Collections.singletonList(discussionId));
return result;
}
切面层保证事务提交成功后,发送消息至kafka供B工程消费
@Aspect
@Component
public class SyncKafkaDiscussionMsgAspect implements Ordered{
@Resource
private ArticleAsyncKafkaMQPushService articleAsyncKafkaMQPushService;
@Before(value = "pointcut()")
public void initData(JoinPoint joinPoint){
RequestHolder.init();
LoggerConstant.runLogger.info("clearData, methodName={}, time={}", joinPoint.getSignature().getName(),
new Date(RequestHolder.getTime()));
}
@After(value = "pointcut() && @annotation(syncKafkaDiscussion)")
public void syncEsData(final JoinPoint joinPoint, final SyncKafkaDiscussion syncKafkaDiscussion) throws Exception{
final List discussionIdList = RequestHolder.getDiscussionIdList();
final boolean syncInTime = syncKafkaDiscussion.syncInTime();
if(!ObjectUtils.isEmpty(discussionIdList) && syncInTime){
TransactionUtil.executeAfterCommit(new TransactionUtil.MethodWrapper() { //确保事务提交后才进行发送消息
@Override
public void run() {
ArticleAsyncDataVo dataObject = new ArticleAsyncDataVo();
dataObject.setDiscussionIdList(discussionIdList);
articleAsyncKafkaMQPushService.sendDirect(dataObject);
}
}, true);
}
//省略部分业务逻辑
RequestHolder.removeAll();
}
@Override
public int getOrder() {
return BigDecimal.TEN.intValue();
}
这里需要注意DB事务和Aspect的优先级。在切入点前的操作,按order的值由小到大执行,在切入点后的操作,按order的值由大到小执行。为了保证发送消息在事务后提交,需要保证order数值比DB事务数值大!
数据迁移是一份比较细致的工作,不能容忍有半点脏数据、遗漏数据。在做数据迁移之前一定要设计地足够细致,尽可能把所有故障点都要考虑在内,并且存在故障点的时候如何能够快速故障切换。个人觉得数据迁移主要保证三点。
1、代码耦合性,因为数据迁移代码相对比较独立,尽量不要跟其他业务代码做非常强的耦合,相反应该做的很独立,而且希望增加以下动态配置来处理各种异常情况。
2、性能问题,比如做全量数据迁移时,由于数据量比较大,尽量采用多线程(甚至多机器),批量处理逻辑来提高性能。
3、核对数据,迁移完数据后核对数据也是比较繁琐的工作,可以通过开发一些自动化脚本来对数据进行核对校验提高工作效率。
本文来自网易实践者社区,经作者田躲躲授权发布。