网易考拉内容管理平台全量ES数据迁移总结

达芬奇密码2018-07-19 11:39

在软件的生命周期中,经常遇到由于业务发展,系统迭代更新带来的数据迁移工作;或者软件系统本身的重构抑或其他因素,几乎都需要对数据进行迁移。数据迁移主要包含数据迁移前的准备、数据迁移的实施和数据迁移后的校验。我们在进行数据迁移实施过程中哪一个环节都要考虑周全,不然很容易出现线上故障。本次系统迁移的最大难点和风险点在于无停机状态下实现ES索引库无缝迁移、应急处理、全量&增量数据同步逻辑等。最终能够保证数据的正确性、一致性、可用性!

     一、需求分析

最近在做考拉社区内容数据由原先的CMS系统迁移到内容管理平台中,相关的搜索逻辑也有很大的改动,使用原先的ES索引库已经满足不了当前的迁移需求。此时有两种改动方案,一是在原先ES索引库基础上做全量数据入库逻辑;二是不动原先逻辑新增ES索引库,在系统迁移过程中使用双写策略,当系统稳定后通过动态配置抛弃原先ES索引库。最终我们使用了第二种方案。此种方案可以实现系统无停机动态切换、故障转移。

    二、迁移设计

   1、迁移方案设计

     

      本次数据迁移主要包括三个重要时间点,预发环境、上线后、后置工作。具体时间点要做的工作如下:

     预发环境: 准线上环境时需要把DDB全量数据全部迁移到ES

  • 基于DDB分页全量同步ES数据
  • 修改ES更新方式,新ES索引库新增搜索字段,需要修改原先更新逻辑
  • 预发环境开关灵活配置,开关的作用主要起到故障转移功能
  • 预留后台同步数据接口,主要可灵活处理一些异常数据
      线 后:仅谈此业务,线上CMS平台正常使用,这样会导致一些“脏数据”,预发环境验证OK后,需要对这段时间做数据增量处理
  • 根据DDB预留字段db_update_time获取预发环境下更新的id,增量脚步同步ES数据
  • 预发环境开关灵活配置,开关的作用主要起到故障转移功能  

     后置工作:本次迁移完ES后,还要调整ES数据更新方式&完全替换新数据源

  • 目前线上新、老ES数据源采用双写策略,新ES数据源部分数据更新依赖老ES数据源更新策略(binLog),考虑binLog存在更新版本冲突、数据库迁移日志丢失等问题,后续会通过业务级别发送kafka消息替换binLog
  • 关闭更新老ES数据源开关,数据更新&查询全部走新ES数据源
    2、全量、增量更新逻辑
考虑到数据量,目前数据初始化使用全量和增量两种方式实现。全量更新在预发环境初始化(当然全量操作也可以在上线时触发,但是由于比较耗时影响运营使用平台),增量逻辑处理预发环境中更新的id(通过预留接口和增量脚本几分钟可以搞定)可以实现对系统使用者无感知地平滑迁移。   
    3、故障风险点
考虑迁移数据时存在不可控因素,故障转移采用灵活动态开关实现,一旦发现脏数据或者集群环境问题立马修改开关切换老ES数据源或者屏蔽服务调用,继续使用CMS后台。目前上线前存在以下故障风险点:     
  • 由于预发环境误操作等情况,新ES数据源可能拉取脏数据
  • 本次需求存在DB变更,新ES数据源依赖于DB数据源。DB数据迁移失败或者数据异常会影响新ES数据源数据
  • 新ES数据源更新方式依赖多种渠道(binLog、Tmc、业务主动kafka消息触发等),新增的kafka消息触发时机业务层面存在遗漏,会导致ES数据丢失影响线上业务
  • 全量、增量更新ES数据源出现异常    

  三、迁移实现

数据迁移实现主要有全量迁移脚本、kafka消息推送消息设计。

     1、全量脚本设计

     全量脚本存在两个版本的设计,第一个版本主要考虑点在于系统之间进行解耦,所以在A工程只做全量获取id功能,然后通过kafka消息形式推送出去,B工程接收消息调用RPC接口拉取id对应的ES数据。基本逻辑代码如下,此种方式在测试环境跑数据后发现存在两个问题。

            while(true){ //A工程推送消息

            	//分页获取数据,获取discussionIdList
                CollectionUtils.collect(contentChannels, new Transformer() {
                    @Override
                    public Long transform(ContentChannel contentChannel) {
                        return Longs.toLong(contentChannel.getArticleId());
                    }
                }, discussionIdList);

                //封装kafka消息
                ArticleAsyncDataVo dataObject = new ArticleAsyncDataVo();
                dataObject.setDiscussionIdList(discussionIdList);
                //省略发送kafka消息逻辑

                LoggerConstant.runLogger.info("sendDirect dbUpdateTime={}, page={}, dataObject={}", dbUpdateTime, page, JSON.toJSONString(dataObject));
                if(ObjectUtils.isEmpty(contentChannels) || (!ObjectUtils.isEmpty(contentChannels) && contentChannels.size() < pageSize)){
                    break;
                }
                page++;
            }
    @Override
    public void onMessage(ConsumerRecord consumerRecord) {   //B工程接收消息

        if(ObjectUtils.isEmpty(consumerRecord) || ObjectUtils.isEmpty(consumerRecord.value())){
            return;
        }
        try {
            ArticleAsyncDataVo articleAsyncDataVo = JSON.parseObject(consumerRecord.value(), ArticleAsyncDataVo.class);
            indexNewService.batchIndex(articleAsyncDataVo.getDiscussionIdList());
        } catch (Exception e) {
            LoggerConstant.alarmLogger.error("Parse Kafka Msg Occur Error, Data={}", consumerRecord.value());
            throw new ApiException("Parse Error!");
        }
    }
问题1:kafka消费能力问题导致消息无ACK一直在队列里面重复消费。每次分页取1000条数据封装一条消息发送给B工程消费,但是消费一条消息时长明显大于15000ms,消息也没有ACK机制,导致消息一直不停在消费。当时kafka系统配置如下:         
<bean id="consumerProperties5" >
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers3}"/>
                <entry key="group.id" value="${kafka.group.id}" />  <!-- 消费组 -->
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="max.poll.records" value="1000"/>  <!-- 批量取队列信息,最多取1000消息 -->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
</bean>
这个问题有两种解决方式,第一种是调整 auto.commit.interval.ms数值,但是治标不治本。还有种更合理的方式,设置 enable.auto.commit为false。Spring-kafka的offset提交机制,可以保证了在一批消息没有完成消费的情况下,也能提交offset,从而避免了完全提交不上而导致永远重复消费的问题。 具体Spring-kafka设计的 enable.auto.commit的逻辑如下:
                   public void run() {
			if (this.autoCommit && this.theListener instanceof ConsumerSeekAware) {
				((ConsumerSeekAware) this.theListener).registerSeekCallback(this);
			}
			this.count = 0;
			this.last = System.currentTimeMillis();
			if (isRunning() && this.definedPartitions != null) {
				initPartitionsIfNeeded();
				// we start the invoker here as there will be no rebalance calls to
				// trigger it, but only if the container is not set to autocommit
				// otherwise we will process records on a separate thread
				if (!this.autoCommit) {
					startInvoker();
				}
			}
			long lastReceive = System.currentTimeMillis();
			long lastAlertAt = lastReceive;
			while (isRunning()) {
				try {
					if (!this.autoCommit) {
					     processCommits();
					}
					processSeeks(); //内部基本逻辑:ack确认进入Spring管理的阻塞队列的消息
					if (this.logger.isTraceEnabled()) {
						this.logger.trace("Polling (paused=" + this.paused + ")...");
					}
                       ......
                private void processCommits() {
			handleAcks();
			this.count += this.acks.size();
			long now;
			AckMode ackMode = this.containerProperties.getAckMode();
			if (!this.isManualImmediateAck) {
				if (!this.isManualAck) {
				     updatePendingOffsets();
				}
				boolean countExceeded = this.count >= this.containerProperties.getAckCount();
				if (this.isManualAck || this.isBatchAck || this.isRecordAck
						|| (ackMode.equals(AckMode.COUNT) && countExceeded)) {
					if (this.logger.isDebugEnabled() && ackMode.equals(AckMode.COUNT)) {
						this.logger.debug("Committing in AckMode.COUNT because count " + this.count
								+ " exceeds configured limit of " + this.containerProperties.getAckCount());
					}
					commitIfNecessary();
					this.count = 0;
				}
                   ......
如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,数据来源于spring-kafka自己创建的阻塞队列。然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。上面创建的线程去消费阻塞队列里面的消息,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。
问题2:通过这种方式更新非常慢,通过查询日志大概1分钟1千条数据,如果按照目前的数据源估计要一天,这个时间是无法容忍的。所以全量脚本第二个版本为了兼顾性能,抛弃了工程耦合度,把B工程拉取拼装ES数据的方式嫁接到A工程里。具体逻辑如下:
            while(true){ //省略异常处理代码逻辑
                //查询数据
                List discussionIdList = Lists.newArrayList();
                CollectionUtils.collect(discussionList, new Transformer() {
                    @Override
                    public Long transform(Discussion discussion) {
                        return discussion.getId();
                    }
                }, discussionIdList);

                if(ObjectUtils.isEmpty(discussionIdList)){
                    break;
                }
                int total = discussionIdList.size();
                int num = total/DEFAULT_SIZE;
                if(total%DEFAULT_SIZE != 0){
                    num = num+1;
                }

                if(num > 0){
                    List communityContentArticleTypeVos = new CopyOnWriteArrayList<>();
                    executorService = Executors.newFixedThreadPool(num);

                    ListPageUtils pager = new ListPageUtils<>(discussionIdList, DEFAULT_SIZE);
                    CountDownLatch waitCountDownLatch = new CountDownLatch(num);

                    for(int i=1;i<=num;i++){
                        if(!ObjectUtils.isEmpty(pager.getPagedList(i))){

                            if(ObjectUtils.isEmpty(pager.getData())){
                                executorService.shutdown();
                            }
                            executorService.execute(new InitArticleThread(pager.getPagedList(i),communityContentArticleTypeVos,waitCountDownLatch));

                        }
                    }

                    waitCountDownLatch.await();

                    Map result = list2map(communityContentArticleTypeVos);
                    if(!ObjectUtils.isEmpty(result)){
                      //调用RPC直接批量更新封装好的ES数据   
                    }

                }
               //省略其他业务逻辑
                page++;
            }  

通过线程池和批量更新逻辑处理,大概只需要30分钟就可以把全量数据更新至ES。

2、Kafka推送消息机制设计

ES数据源依赖于数据库数据,业务代码里需要保证数据库数据事务已经提交后才可以发送kafka消息且为了保证发送消息跟代码耦合使用自定义注解和切面实现。具体实现逻辑如下:

具体数据库更新点加上注解标识

@SyncKafkaDiscussion()
@Transactional
 public boolean userEdit(Long id, Long contentId, Long effectiveTime, Long editTime,
        //省略具体业务代码               
        //通过threadlocal传递数据,在切面拿到数据发送消息
        RequestHolder.setDiscussionIdList(Collections.singletonList(discussionId));     
        return result;
}
切面层保证事务提交成功后,发送消息至kafka供B工程消费
@Aspect
@Component
public class SyncKafkaDiscussionMsgAspect implements Ordered{

    @Resource
    private ArticleAsyncKafkaMQPushService articleAsyncKafkaMQPushService;

    @Before(value = "pointcut()")
    public void initData(JoinPoint joinPoint){

        RequestHolder.init();
        LoggerConstant.runLogger.info("clearData, methodName={}, time={}", joinPoint.getSignature().getName(),
                new Date(RequestHolder.getTime()));

    }

    @After(value = "pointcut() && @annotation(syncKafkaDiscussion)")
    public void syncEsData(final JoinPoint joinPoint, final SyncKafkaDiscussion syncKafkaDiscussion) throws Exception{

        final List discussionIdList = RequestHolder.getDiscussionIdList();
        final boolean syncInTime = syncKafkaDiscussion.syncInTime();

        if(!ObjectUtils.isEmpty(discussionIdList) && syncInTime){

            TransactionUtil.executeAfterCommit(new TransactionUtil.MethodWrapper() { //确保事务提交后才进行发送消息
                @Override
                public void run() {
                    ArticleAsyncDataVo dataObject = new ArticleAsyncDataVo();
                    dataObject.setDiscussionIdList(discussionIdList);
                    articleAsyncKafkaMQPushService.sendDirect(dataObject);   
                }
            }, true);

        }
        //省略部分业务逻辑
        RequestHolder.removeAll();
    }

    @Override
    public int getOrder() {
        return BigDecimal.TEN.intValue();
    }
这里需要注意DB事务和Aspect的优先级。在切入点前的操作,按order的值由小到大执行,在切入点后的操作,按order的值由大到小执行。为了保证发送消息在事务后提交,需要保证order数值比DB事务数值大!
    四、迁移总结

数据迁移是一份比较细致的工作,不能容忍有半点脏数据、遗漏数据。在做数据迁移之前一定要设计地足够细致,尽可能把所有故障点都要考虑在内,并且存在故障点的时候如何能够快速故障切换。个人觉得数据迁移主要保证三点。

       1、代码耦合性,因为数据迁移代码相对比较独立,尽量不要跟其他业务代码做非常强的耦合,相反应该做的很独立,而且希望增加以下动态配置来处理各种异常情况。

       2、性能问题,比如做全量数据迁移时,由于数据量比较大,尽量采用多线程(甚至多机器),批量处理逻辑来提高性能。

       3、核对数据,迁移完数据后核对数据也是比较繁琐的工作,可以通过开发一些自动化脚本来对数据进行核对校验提高工作效率。

本文来自网易实践者社区,经作者田躲躲授权发布。