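How it works: Databus captures change events by parsing MySQL's binlog. Parsing relies on the open-source Java tool OpenReplicator. Open Replicator first connects to MySQL (just as an ordinary MySQL slave would), then receives and parses the binlog, and finally delivers the resulting binlog events to the application through callbacks. Every event implements the BinlogEventV4 interface.
Binlog format: Databus is designed to parse ROW-format binlogs.
Determining the SCN: the SCN is 64 bits wide. The high 32 bits hold the sequence number of the binlog file, and the low 32 bits hold the event's offset within that file. For example, the SCN for offset 4 in mysql-bin.000001 is (1 << 32) | 4 = 4294967300.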
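The SCN layout described above can be sketched in a few lines of Java. This is only an illustration of the bit arithmetic; the class name ScnCodec and its methods are made up for this example and are not part of Databus.

```java
/** Illustrative helper (hypothetical, not a Databus class) for the 64-bit SCN layout. */
final class ScnCodec {
    private ScnCodec() {}

    /** High 32 bits: binlog file number; low 32 bits: event offset inside that file. */
    static long encode(int fileNumber, long offset) {
        return ((long) fileNumber << 32) | (offset & 0xFFFFFFFFL);
    }

    /** Recovers the binlog file number from the high 32 bits. */
    static int fileNumber(long scn) {
        return (int) (scn >>> 32);
    }

    /** Recovers the event offset from the low 32 bits. */
    static long offset(long scn) {
        return scn & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long scn = encode(1, 4); // mysql-bin.000001, offset 4
        System.out.println(scn); // 4294967300
        System.out.println("file=" + fileNumber(scn) + ", offset=" + offset(scn));
    }
}
```

Because the file number sits in the high bits, SCNs compare in the same order as (file number, offset) pairs, so a later binlog file always yields a larger SCN.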
Install MySQL; this walkthrough uses mysql-5.5.56.
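Check whether binlog is enabled on the database (show variables like 'log_bin'). Note that sql_log_bin only switches binary logging on or off for the current session: set sql_log_bin=1 turns it on and set sql_log_bin=0 turns it off. If binlog is disabled on the server itself, add a log-bin entry (for example log-bin=mysql-bin) to my.cnf and restart MySQL.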
Set binlog_format=ROW. show variables like 'binlog_format' shows the current format, and set global binlog_format=ROW changes it. Alternatively, add or edit the line binlog_format=ROW in my.cnf.
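Disable binlog checksums: show global variables like 'binlog_checksum' shows the current value, and set global binlog_checksum=NONE changes it. (This variable exists only in MySQL 5.6 and later; MySQL 5.5 does not write binlog checksums, so there is nothing to change on that version.)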
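In MySQL, create a database named or_test, and in it a table named person, defined as follows:
Download the source from the Databus project page at https://github.com/linkedin/databus.git. We need the databus2-example folder under the databus directory, which we modify and run. Its directory structure is described below:
Configure the Relay properties file: databus2-example-relay-pkg/conf/relay-or-person.properties is configured as follows, covering the port number, the buffer storage policy, the location where maxScn is stored, and so on: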
databus.relay.container.httpPort=11115
databus.relay.container.jmx.rmiEnabled=false
databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
databus.relay.eventBuffer.queuePolicy=OVERWRITE_ON_WRITE
databus.relay.eventLogReader.enabled=false
databus.relay.eventLogWriter.enabled=false
databus.relay.schemaRegistry.type=FILE_SYSTEM
databus.relay.schemaRegistry.fileSystem.schemaDir=./schemas_registry
databus.relay.eventBuffer.maxSize=1024000000
databus.relay.eventBuffer.readBufferSize=10240
databus.relay.eventBuffer.scnIndexSize=10240000
databus.relay.physicalSourcesConfigsPattern=../../databus2-example/databus2-example-relay-pkg/conf/sources-or-person.json
databus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScn
databus.relay.startDbPuller=true
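The file above is an ordinary Java properties file, so it can be inspected with java.util.Properties before starting the relay. The following is a minimal sketch for checking a few values by hand. The class RelayConfigCheck is hypothetical and is not how Databus itself loads its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper for eyeballing relay settings; not part of Databus. */
final class RelayConfigCheck {
    private RelayConfigCheck() {}

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads a required integer property, failing loudly if it is missing. */
    static int intValue(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        // A few lines copied from relay-or-person.properties above.
        String text = String.join("\n",
            "databus.relay.container.httpPort=11115",
            "databus.relay.eventBuffer.queuePolicy=OVERWRITE_ON_WRITE",
            "databus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScn");
        Properties props = parse(text);
        System.out.println("relay port: " + intValue(props, "databus.relay.container.httpPort"));
        System.out.println("maxScn dir: "
            + props.getProperty("databus.relay.dataSources.sequenceNumbersHandler.file.scnDir"));
    }
}
```

The port printed here (11115) is the one the client has to connect to later, so it is worth confirming that it matches the port set in PersonClient.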
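Configure the source information for the monitored table: databus2-example-relay-pkg/conf/sources-or-person.json is configured as follows. The URI format is mysql://username/password@mysql_host[:mysql_port]/mysql_serverid/binlog_prefix. Note that %2F is the URL-escaped form of /; here the user name is root and the database password is 123.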
{
    "name" : "person",
    "id" : 1,
    "uri" : "mysql://root%2F123@localhost:3306/1/mysql-bin",
    "slowSourceQueryThreshold" : 2000,
    "sources" :
    [
        {
            "id" : 40,
            "name" : "com.linkedin.events.example.or_test.Person",
            "uri": "or_test.person",
            "partitionFunction" : "constant:1"
        }
    ]
}
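To make the URI format concrete, here is a small sketch that splits the "uri" value above into its parts with java.net.URI. The class MysqlSourceUri is invented for this illustration, and the fallback to port 3306 when no port is given is an assumption of the sketch; Databus may parse the URI differently internally.

```java
import java.net.URI;

/** Hypothetical holder for the parts of a Databus MySQL source URI. */
final class MysqlSourceUri {
    final String user;
    final String password;
    final String host;
    final int port;
    final int serverId;
    final String binlogPrefix;

    private MysqlSourceUri(String user, String password, String host,
                           int port, int serverId, String binlogPrefix) {
        this.user = user;
        this.password = password;
        this.host = host;
        this.port = port;
        this.serverId = serverId;
        this.binlogPrefix = binlogPrefix;
    }

    /** Parses mysql://username%2Fpassword@host[:port]/serverid/binlog_prefix. */
    static MysqlSourceUri parse(String text) {
        URI uri = URI.create(text);
        if (!"mysql".equals(uri.getScheme())) {
            throw new IllegalArgumentException("expected mysql:// scheme: " + text);
        }
        String userInfo = uri.getUserInfo(); // already decoded: %2F becomes '/'
        if (userInfo == null || !userInfo.contains("/")) {
            throw new IllegalArgumentException("expected username%2Fpassword: " + text);
        }
        String[] credentials = userInfo.split("/", 2);
        String[] path = uri.getPath().split("/"); // "", serverid, binlog_prefix
        if (path.length != 3) {
            throw new IllegalArgumentException("expected /serverid/binlog_prefix: " + text);
        }
        // Assumption of this sketch: use MySQL's default port when none is given.
        int port = uri.getPort() == -1 ? 3306 : uri.getPort();
        return new MysqlSourceUri(credentials[0], credentials[1], uri.getHost(),
                                  port, Integer.parseInt(path[1]), path[2]);
    }

    public static void main(String[] args) {
        MysqlSourceUri u = parse("mysql://root%2F123@localhost:3306/1/mysql-bin");
        System.out.println(u.user + " @ " + u.host + ":" + u.port
            + ", serverId=" + u.serverId + ", binlog prefix=" + u.binlogPrefix);
    }
}
```

The escaping matters because an unescaped / between the user name and the password would end the authority part of the URI early.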
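Under databus2-example-relay-pkg/schemas_registry/, define the Avro schema file for person, com.linkedin.events.example.or_test.Person.1.avsc, where 1 is the version. (Databus currently offers no tool for generating Avro schema files for MySQL, so the file has to be written by hand.) Its contents are: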
{
    "name" : "Person_V1",
    "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
    "type" : "record",
    "meta" : "dbFieldName=person;pk=id;",
    "namespace" : "com.linkedin.events.example.or_test",
    "fields" : [ {
        "name" : "id",
        "type" : [ "long", "null" ],
        "meta" : "dbFieldName=ID;dbFieldPosition=0;"
    }, {
        "name" : "firstName",
        "type" : [ "string", "null" ],
        "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"
    }, {
        "name" : "lastName",
        "type" : [ "string", "null" ],
        "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"
    }, {
        "name" : "birthDate",
        "type" : [ "long", "null" ],
        "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"
    }, {
        "name" : "deleted",
        "type" : [ "string", "null" ],
        "meta" : "dbFieldName=DELETED;dbFieldPosition=4;"
    } ]
}
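Since the schema has to be written by hand, a tiny generator for the repetitive "fields" entries can cut down on typos. The sketch below is a convenience written for this article, not a Databus tool. It assumes the conventions visible in the schema above: the Avro field name is the camelCase form of the column name, every type is a union with null, and dbFieldPosition counts columns from 0.

```java
import java.util.Locale;

/** Hypothetical helper that prints Avro field entries in the style used above. */
final class AvroFieldSketch {
    private AvroFieldSketch() {}

    /** Converts a column name such as FIRST_NAME to firstName. */
    static String camelCase(String column) {
        StringBuilder out = new StringBuilder();
        boolean upperNext = false;
        for (char c : column.toLowerCase(Locale.ROOT).toCharArray()) {
            if (c == '_') {
                upperNext = true;
            } else if (upperNext) {
                out.append(Character.toUpperCase(c));
                upperNext = false;
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    /** Builds one entry of the "fields" array for a nullable column. */
    static String field(String column, String avroType, int position) {
        return "{ \"name\" : \"" + camelCase(column) + "\", "
            + "\"type\" : [ \"" + avroType + "\", \"null\" ], "
            + "\"meta\" : \"dbFieldName=" + column
            + ";dbFieldPosition=" + position + ";\" }";
    }

    public static void main(String[] args) {
        String[] columns = { "ID", "FIRST_NAME", "LAST_NAME", "BIRTH_DATE", "DELETED" };
        String[] types = { "long", "string", "string", "long", "string" };
        for (int i = 0; i < columns.length; i++) {
            System.out.println(field(columns[i], types[i], i));
        }
    }
}
```

The printed entries still need to be pasted into a complete record definition with the name, namespace, and top-level meta attributes shown above.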
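Register the Avro schema in index.schemas_registry: add the line com.linkedin.events.example.or_test.Person.1.avsc to databus2-example-relay-pkg/schemas_registry/index.schemas_registry. Every Avro schema you define must be added there, because at runtime the relay looks in this file for the Avro schema that corresponds to each table.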
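The registry entry follows the pattern name.version.avsc, and the name part is the same string used as the source "name" in sources-or-person.json. A short sketch of splitting an entry into those two parts follows; SchemaFileName is a made-up class for illustration and does not reflect how the relay reads the index.

```java
/** Hypothetical parser for schema registry entries of the form name.version.avsc. */
final class SchemaFileName {
    final String recordName;
    final int version;

    private SchemaFileName(String recordName, int version) {
        this.recordName = recordName;
        this.version = version;
    }

    static SchemaFileName parse(String fileName) {
        String suffix = ".avsc";
        if (!fileName.endsWith(suffix)) {
            throw new IllegalArgumentException("not an .avsc file: " + fileName);
        }
        String stem = fileName.substring(0, fileName.length() - suffix.length());
        int lastDot = stem.lastIndexOf('.');
        if (lastDot < 0) {
            throw new IllegalArgumentException("missing version: " + fileName);
        }
        // Everything before the last dot is the record name; the rest is the version.
        return new SchemaFileName(stem.substring(0, lastDot),
                                  Integer.parseInt(stem.substring(lastDot + 1)));
    }

    public static void main(String[] args) {
        SchemaFileName f = parse("com.linkedin.events.example.or_test.Person.1.avsc");
        System.out.println(f.recordName + " (version " + f.version + ")");
    }
}
```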
Configure the Client properties file: databus2-example-client-pkg/conf/client-person.properties is configured as follows, covering the port number, the buffer storage policy, checkpoint persistence, and so on:
databus.relay.container.httpPort=11125
databus.relay.container.jmx.rmiEnabled=false
databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
databus.relay.eventBuffer.queuePolicy=BLOCK_ON_WRITE
databus.relay.schemaRegistry.type=FILE_SYSTEM
databus.relay.eventBuffer.maxSize=10240000
databus.relay.eventBuffer.readBufferSize=1024000
databus.relay.eventBuffer.scnIndexSize=1024000
databus.client.connectionDefaults.pullerRetries.initSleep=1
databus.client.checkpointPersistence.fileSystem.rootDirectory=./personclient-checkpoints
databus.client.checkpointPersistence.clearBeforeUse=false
databus.client.connectionDefaults.enablePullerMessageQueueLogging=true
The PersonConsumer class under databus2-example-client/src/main/java holds the consumer callback logic. It takes each event and prints its fields as name/value pairs. The main code is:
private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder)
{
    GenericRecord decodedEvent = eventDecoder.getGenericRecord(event, null);
    try {
        Utf8 firstName = (Utf8)decodedEvent.get("firstName");
        Utf8 lastName = (Utf8)decodedEvent.get("lastName");
        Long birthDate = (Long)decodedEvent.get("birthDate");
        Utf8 deleted = (Utf8)decodedEvent.get("deleted");
        LOG.info("firstName: " + firstName.toString() +
                 ", lastName: " + lastName.toString() +
                 ", birthDate: " + birthDate +
                 ", deleted: " + deleted.toString());
    } catch (Exception e) {
        LOG.error("error decoding event ", e);
        return ConsumerCallbackResult.ERROR;
    }
    return ConsumerCallbackResult.SUCCESS;
}
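One thing to watch in the callback above: every field in the Avro schema is a union with null, so calling toString() on a field that is NULL in the row throws a NullPointerException, which the catch block then reports as ERROR. The sketch below shows a null-tolerant way to build the same log line. It uses a plain Map as a stand-in for Avro's GenericRecord so that it runs without the Databus and Avro jars; the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical formatter; a Map stands in for the decoded GenericRecord. */
final class NullSafePersonFormatter {
    private NullSafePersonFormatter() {}

    /** String concatenation renders a null value as "null" instead of throwing. */
    static String describe(Map<String, Object> record) {
        return "firstName: " + record.get("firstName")
            + ", lastName: " + record.get("lastName")
            + ", birthDate: " + record.get("birthDate")
            + ", deleted: " + record.get("deleted");
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("firstName", "Ann");
        record.put("lastName", null); // a NULL column in the changed row
        record.put("birthDate", 5L);
        record.put("deleted", "false");
        System.out.println(describe(record));
    }
}
```

In the real consumer the same idea applies to decodedEvent.get(...): concatenating the returned object directly, rather than calling toString() on it, keeps a NULL column from turning the whole event into an error.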
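The PersonClient class under databus2-example-client/src/main/java is the main class that starts the client. It sets up the client's startup configuration and registers the consumer instance with the listeners so that it can be called back later. The main code is: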
public static void main(String[] args) throws Exception
{
    DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();
    //Try to connect to a relay on localhost
    configBuilder.getRuntime().getRelay("1").setHost("localhost");
    configBuilder.getRuntime().getRelay("1").setPort(11115);
    configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);
    //Instantiate a client using command-line parameters if any
    DatabusHttpClientImpl client = DatabusHttpClientImpl.createFromCli(args, configBuilder);
    //register callbacks
    PersonConsumer personConsumer = new PersonConsumer();
    client.registerDatabusStreamListener(personConsumer, null, PERSON_SOURCE);
    client.registerDatabusBootstrapListener(personConsumer, null, PERSON_SOURCE);
    //fire off the Databus client
    client.startAndBlock();
}
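Build: Databus is built with Gradle, so a Gradle environment must be installed first. Once it is installed, go to the databus root directory and run gradle -Dopen_source=true assemble to build. On success, a folder named build is created under the databus root directory.
Start the Relay:
Test:
Once the Relay and the Client have started successfully, changes to the person table are already being captured. Now insert the following record into the person table:
relay.log under databus2-example-relay-pkg/distributions/logs records the following:
client.log under databus2-example-client-pkg/distributions/logs records the following:
As you can see, the changed data is now being captured!
Problems encountered:
Needs further experimentation:
Related reading: Databus Architecture Analysis and Initial Practice (for MySQL) (Part 1)
This article comes from the NetEase Practitioner Community and is published with the permission of the author, Xu Hedong.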