勿忘初心

个人签名

530篇博客

MySQL MGR源码分析 - 从start group_replication看MGR代码框架

勿忘初心2018-10-19 18:39

此文已由作者温正湖授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


上一篇我们从方案层面讲解了MGR的成员管理和故障恢复。本篇从源码层面捋一捋,通过本篇介绍,除了能够了解如何将一个节点加入到group中,也可以了解在这个过程中,节点是如何进行分布式通信模块进行初始化,上层如何响应底层上发的消息,事务进入MGR后是如何通过pipeline一步步执行的,等待。


start group_replication处理逻辑

该命令用于将一个正在运行且已经install了group_replication.so插件的mysqld加入到group中。又可以根据是否设置了group_replication_bootstrap_group来分为创建一个新的group或加入一个已有的group。start group_replication对应的实现函数为plugin_group_replication_start,其做些基本的检查后交由initialize_plugin_and_join()来执行节点加入group的任务,下图为该函数的主要执行流程。

下面我们注意对其9个阶段进行逐一分析:


1、调用Gcs_operations::initialize()来创建gcs_interface对象并设置gcs日志系统;


2、根据用户配置的MGR参数设置GCS,其中最重要的逻辑是调用Gcs_xcom_interface::initialize_xcom()来进行MGR目前唯一执行的gcs实现xcom的初始化,该函数详见后续小节描述。


3、设置本节点的MGR成员管理对象,包括本节点信息维护对象local_member_info和集群信息维护对象group_member_mgr,此时group_member_mgr还只有本地节点的信息。


4、initialize_recovery_module()函数初始化节点的MGR recovery模块,包括创建Recovery_module对象,为其设置节点上线策略(认证完上线还是回放完上线),设置全局故障恢复节点donor节点连接失败重试的次数和失败重连的时间间隔。Recovery_module模块在start group_replication命令返回后,负责将节点从恢复中变为在线状态。


5、接下来调用configure_and_start_applier_module()函数配置和启动节点的MGR Applier_module模块,该模块是MGR的核心模块之一,用于对通过xcom进入该节点的包括用户事务数据包(package)在内的众多行为(action)/事件(event)进行分发、认证和执行。


首先创建一个Applier_module对象,并将其赋予Recovery_module模块;接着对Applier_module对象进行初始化,包括创建用于缓存等待处理的消息的队列incoming,初始化模块的处理机制pipeline;最后启动一个新的线程,并制定处理函数applier_thread_handle(),用于具体处理这些入队的消息。MGR使用pipeline管道的方式处理每个消息,pipeline设置和初始化详见后续小节分析。


6、在单主模式下,调用initialize_asynchronous_channels_observer()进行节点异步复制通道的操作行为监听。主要监听行为是io thread和sql thread启动操作。原因是在单主模式下,只有primary节点能接收外部写请求,所以需要禁止MGR单主模式的secondary节点启动非MGR的异步复制和回放Relay log的通道。而多主模式下各节点均可写,不存在该限制。


7、下一步initialize_group_partition_handler()用于初始化MGR的网络分区处理对象group_partition_handler。应对MGR中的节点因为网络问题导致相互间无法进行正常的通信。当有节点无法连上其他节点时,就会启动handler->partition_thread_handler()网络分区处理线程,若在超时时间内()网络无法恢复,则会进行网络分区处理,包括退出group并rollback已进入MGR的事务等。


8、调用start_group_communication()启动节点与group中其他节点的通信是及其重要的步骤。直接决定了该节点是否能够正常加入group。


该函数首先设置节点的自增列的自增区间和自增初始值。然后初始化group集群视图变化状态设置监听对象view_change_notifier,该notifier会设置和监控视图变化情况并作出相应处理,初始化非常重要的events_handler,该handler是向gcs注册的事件回调接口,用于处理gcs(paxos)返回的各种消息/事件,包括用户事务数据包、group变化的控制包等。接着,view_change_notifier设置view_changing变量,表示正式进入视图切换阶段(当view_changing重新变为false时,start group_replication即可返回)。最后调用Gcs_operations::join()函数执行将该节点加入group的操作,该函数详见后续小节描述。


9、以上就是start group_replication命令所要执行的所有操作,在完成这些操作后,initialize_plugin_and_join()调用view_change_notifier->wait_for_view_modification()来等待view_changing变为false,之命令返回。若返回值非零,则意味着加入失败,需要进行操作回滚。若返回值为零,则节点进入故障恢复流程,最终将节点设置为在线状态。


Gcs_xcom_interface::initialize_xcom()

initialize_xcom()用于初始化节点的xcom实例。但此时该实例仍未运行和加入的group。


如上图所示,该函数主要是初始化xcom实例的group信息及相关对象,设置将该xcom加入group所需的种子节点,注册各种xcom消息的回调处理函数,创建xcom的各种操作接口xcom_proxy,最后创建Gcs_xcom_engine对象,并调用Gcs_xcom_engine::initialize()来启动节点的xcom处理引擎,xcom引擎通过独立线程的处理函数process_notification_thread来调用2.2.1.3.3设置的回调函数处理各种xcom消息。process_notification_thread的处理逻辑如下所示:


paxos消息接收和执行

上面我们说了m_notification_queue消息的执行框架,那么这些消息如何加入m_notification_queue队列呢,详见下图例子:


deliver_to_app是MGR最底层paxos将已达成一致性的消息发送给上层的主要接口之一,其调用在该类消息处理函数xcom_receive_data()进行处理,处理操作很简单,经创建一个Data_notification对象,参数中指定了具体的消息处理函数,最后将其push到我们上面所述的m_notification_queue队列。我们进一步看看do_cb_xcom_receive_data()如何处理该消息:


首先,将从paxos接收到的消息进行pipeline处理(注意跟Applier_module的pipeline相区分),目前pipeline仅可注册一个stage,即Gcs_message_stage_lz4,用于对paxos消息进行压缩。可通过参数group_replication_compression_threshold来设置进行消息压缩的阈值。

接着,若消息类型为进行视图切换时产生的成员状态交换消息(CT_INTERNAL_STATE_EXCHANGE),调用process_control_message()函数进行处理。否则,判断是否正在进行视图切换,若是,则暂时buffer住这些消息,等到视图完成切换后(即成员加入group后)再处理。否则,调用上层为每种消息注册的回调函数on_message_received()进行处理。


下面,我们简要介绍process_control_message()和on_message_received():

on_view_changed是各个节点对视图切换的响应,是视图切换关键性的函数,包含了对加入节点、退出节点的处理,处理完成后还会判断是否需要发起选主,如果是节点加群那么还会调用故障恢复模块recovery_module函数start_recovery()。其处理流程如下:

on_message_received()是个消息分发枢纽,其操作如下:


如上图所示,根据消息类型的不同,分别调用不同的处理函数,其中对于单主模式新primary完成relay log回放后的消息和事务性消息(事务和视图数据包)均进入到applier_module的incoming队列按顺序执行。其他三种消息分别是用于节点间进行事务执行状态周期性同步的CT_PIPELINE_STATS_MEMBER_MESSAGE消息,该消息是performance_schema.replication_group_member_stats和replication_group_members的主要数据来源。会触发节点进入流控模式。CT_RECOVERY_MESSAGE消息用于在节点完成故障恢复后将自己设置为在线状态前给group发送的广播消息,确保其他节点及时感知节点状态变化。CT_CERTIFICATION_MESSAGE是周期性发送的,用于对applier_module模块的冲突检测数据库进行无用信息purge的消息。


Applier_module的pipeline实现

MGR使用pipeline管道的方式处理每个消息,pipeline设置和初始化详见第5小节分析。目前官方设置了3个pipeline处理器(handler),分别是CATALOGING_HANDLER、CERTIFICATION_HANDLER和SQL_THREAD_APPLICATION_HANDLER,其中CATALOGING_HANDLER用于待处理的事件进行分类,主要通过判断事件类型是否为binary_log::TRANSACTION_CONTEXT_EVENT来设置事务开始的标志,并判断是否为SINGLE_VIEW_EVENT来标识处理视图变更事件。CERTIFICATION_HANDLER是最核心的pipeline处理器,在各个节点采用相同的规则独立进行事务认证,包括认证模块初始化和销毁、事务快照版本解析、对事务进行冲突检测和在视图变更的时候初始化冲突检测数据库等。


SQL_THREAD_APPLICATION_HANDLER用于并行回放通过了认证的异地事务,更新本地的数据库版本。3个pipeline的汇总信息如下图所示:



Gcs_operations::join()

该接口用于在start group_replication时将节点加入group。具体流程如下:


首先进行通信(gcs_communication)和控制(gcs_control)接口初始化,确保从gcs层发送上来的各种消息能够被正确执行。最后调用Gcs_xcom_control::join()来执行最终的节点入群操作,该函数判断是创建group还是加入group场景,并在Gcs_xcom_control::do_join()中进行对应处理,do_join在Gcs_xcom_interface::initialize_xcom()的基础上完成最后的加入操作,流程如下:


先创建m_xcom_thread线程,通过proxy->xcom_init()进行最后的初始化,让xcom实例进入运行状态;在确保xcom实例通信正常的情况下,分别调用proxy->xcom_client_boot()或proxy->xcom_client_add_node()创建一个group或加入已存在的group。


梳理总结

上面就是start group_replication命令的全部操作。经过上面介绍后,还有2个大疑问没有解释清楚:


1、该命令在什么时候返回?

上面我们只是说在view_changing为false的时候返回,那么该变量什么时候会变为false呢,首先需要明确的是在前述的Plugin_gcs_events_handler::on_view_changed()函数中设置,对于执行该命令的节点,会在Plugin_gcs_events_handler::handle_joining_members()函数中调用view_change_notifier->end_view_modification()设置,并进行广播:



2、在介绍process_control_message()时我们知道该函数是接收了CT_INTERNAL_STATE_EXCHANGE消息后调用执行的。那么该消息是在什么场景下发出的?

下面我们从源头开始进行简单介绍:

detector_task是paxos层的一个定时任务机制,用于发现group的成员变化情况,并进行相应的处理。对于我们所述的场景,有一个新的节点加入paxos group中,所以会触发该任务发送一个view_msg消息。最终调用do_cb_xcom_receive_global_view进行处理。具体逻辑为:

该函数会获取自己节点的状态信息get_exchangeable_data(),并使用Gcs_xcom_state_exchange::broadcast_state()来广播一个CT_INTERNAL_STATE_EXCHANGE。



网易云免费体验馆,0成本体验20+款云产品! 

更多网易技术、产品、运营经验分享请点击


相关文章:
【推荐】 一次活动引发的血案
【推荐】 消息中间件客户端消费控制实践