HBase数据迁移实战

达芬奇密码2018-07-18 13:36

最近为产品做了一次HBase的数据迁移操作,学习了一些相关的技术和实现方案。现整理出来,作为今后HBase数据迁移的参考。如有纰漏,欢迎指正。

1. 前期准备

1.1 确认集群使用的版本

  源HBase集群(以下称旧集群)和目的HBase集群(以下称新集群)的版本可能并不是一致的,特别是其底层所使用的HDFS版本信息。譬如这样一个数据迁移场景:业务希望从低版本的HBase集群(0.94.x)迁移到当前稳定的HBase集群(1.2.x),因为新版本的HBase有新特性,Bug更少,稳定性和可运维行更优。理论上新版本会兼容老版本的API,但是如果彼此版本差距过大,可能会出现HDFS RPC版本不一致(一般来说是Protobuf的版本),那么集群之间的迁移就会因为彼此与对方的NameNode无法进行通信而无法进行。 这种情况下,需要先考虑升级低版本的HDFS

1.2 确认集群是否开启Kerberos认证

  这里会有三种可能情况:一是都未开启认证,二是都开启认证,三是一个开了,另一个没开。前两种情况按照正常的认证(或非认证)的配置操作即可,第三种情况则需要在开启了认证的集群上开启 ipc.client.fallback-to-simple-auth-allowed 参数,意即在使用Kerberos的方式访问非Kerberos的集群时,系统自动转换成简单认证,否则会出现认证问题。Kerberos的配置和使用本文不做展开。

1.3 确认操作账号的读写权限问题

  要在不同的HBase集群之间做数据迁移,必然要涉及到不同集群的读写权限问题。HBase使用ACL来管理不同数据表的读写权限,在开启了Kerberos认证的环境下,还需要验证Kerberos;而HBase使用的HDFS自身也有一套类似的权限管理规则,当两个集群配置不同时(如部署账号不一致),极易出现冲突。在迁移前需要确认两个集群的管理员账号(HDFS和HBase账号)是否一致,如果不一致,需要开通权限。

1.4 开启YARN服务

  数据迁移任务本质上是一个MapRedcue任务,故需要在一个集群上开启YARN服务。如何选择在哪个集群上开呢?建议是在新集群上开,因为旧集群上可能还需要继续跑线上业务,在上面起大量Map任务并把数据远程写入到新集群,会对线上业务带来较大的性能影响;而新集群较大可能是一个独立集群,尚没有业务运行,在其上运行Map任务通过网络从旧集群中拉数据到本地写入,性价比更高,且对线上业务的侵入性更低。

  在HBase集群上配置YARN服务可以查阅其安装部署文档,这里不做展开。

1.5 确认数据迁移的SLA

  数据迁移是否是在线迁移,即业务不能中断。若业务允许做离线迁移,可以先将该表Disable后再做迁移,然后在新集群上重新clone成新表即可;但若需要在线进行迁移,则需要提前新集群上生成对应的HBase表,开启ACL权限等操作,并让业务开启数据双写,确保两个集群的数据在迁移时刻之后的数据是一致的。因为数据迁移和后续的数据合并耗时都很长,如果不开启双写,是无法达到数据一致性要求的。所以大多数情况下,业务都是要求数据迁移以在线方式进行。

1.6 源集群开启Snapshot

  HBase的Snapshot是从0.94.6之后才引入的特性,开启Snapshot特性需要开启 hbase.snapshot.enabled (默认已开启)。如果没有开启该特性,则需要重启服务以开启该特性;如果版本过低,就只能使用其他对业务影响较大的CopyTable/ExportTable(需要Disable表)才能操作。

2. 生成HBase表和Region

  完成前期的验证和准备工作后,就可以在新集群中创建待迁移的目的表和域(以下称Region)了。 因为迁移过程中业务需要开启双写,所以目的表结构必须和源表是一致;同时源数据表可能已经存在多个Region了,那么目的表也必须提前规划好这些Region,以免双写期间出现Region数量不足出现热点或者Region内文件数过多频繁Compact导致线上业务出现性能问题。 这里详细说明下如何正确创建一个带有多个Region的表。

2.1 使用RegionSplitter生成表

  如果新建一个自带多个Region的表,可以使用以下命令:

例1. 生成一个表 t1, 有30region且表有一个列族"d",则使用

bin/hbase org.apache.hadoop.hbase.util.RegionSplitter t1 UniformSplit -c 30 -f d 

例2. 生成一个表 t2, 有10个region,有两个列族 d1,d2, 其起始rowkey是'0'

bin/hbase org.apache.hadoop.hbase.util.RegionSplitter t2 UniformSplit -c 10  -f d1:d2 --firstrow '0'

2.2 使用HBase Shell生成表

  使用HBase shell中的create命令也可以直接生成多个Regions,前提是必须要指定split keys

例3. 生成一个表 t3,按照'10','20','30','40'为Regions的split keys

create 't3', 'f1', SPLITS => ['10', '20', '30', '40']

整个表划分成5个Region,其起始和结束key分别是 ['0','10'],['10','20'],['20','30'],['30','40'],['40',-]

2.3 将已有的表重新切分或合并

  如果一个表的Region范围过大,可以使用split来将其切分成两个子Region

 split 't1', '1'
 split '110e80fecae753e848eaaa08843a3e87', '\x001'

  同理,如果表的Region过于零散,可以使用merge_region来进行合并

  hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME'
  hbase> merge_region 'ENCODED_REGIONNAME', 'ENCODED_REGIONNAME', true

具体的操作命令用户可以自己查看HBase的相关文档来了解。

  注:在为迁移的目的表划分多个Region时,其StartEndKey最好和旧HBase集群中源表分布一致。这样后续文件加载时,不需要额外进行过多的拆分,可以节省载入时间。另外有一点需要说明,使用splitKey时,系统不支持Hex字符串。 如果想用HexString来作为自己RegionStartKey,则需要对HBase的客户端代码进行简单的修改。简单来说需要能支持Bytes.toBinaryString()方法, 但系统读取后都是直接用Byets.toBytes()方法。 具体的代码实现,可以私聊。

3. Snapshot机制与使用

  HBase快照是一份指向多个HFile文件的元数据文件。在执行snapshot命令时,不会触发任何的HBase数据操作,所以这个命令非常高效。使用快照来恢复或克隆一个表也非常快,因为它只需要引用已有的HFile文件即可。所以使用Snapshot进行数据迁移的优势就是备份和拷贝数据对线上服务没有影响,或者影响极低。其流程如下:

  • 执行snapshot命令时,Master会从自己管理的meta信息中,找到该表所在RegionServer,然后下发该命令到相应的一个或多个RegionServer(RS)
  • RS负责生成HFile文件引用,同时会获取其RegionHFile文件信息,将当前文件的大小写入到manifest文件中。
  • HFile文件是使用Append方式来添加的,所以某一个时刻的文件大小相当于记录了一份当前时刻的文件偏移量。 恢复时,系统也只会读到该偏移量的位置。 如果想再次对该表做快照,那么文件引用的偏移量会正确设置为当前HFile的大小。

Snapshot命令有一个skipFlush参数,设置为true时,会强制将RS的MemStore里内容刷到磁盘中,可能会造成RS短暂的中止服务。时间长短视内存中的数据量而定。 在这里我们不需要靠强制刷新出内存中的数据来保证数据完整性,理由如下:

  1. 如果我们使用的是停服迁移的方式,那么使用快照时内存中是没有数据写入的。
  2. 如果使用的双写迁移的方案,则快照时存在于内存中的那部分数据实际上会被双写到另外的集群中,同样不会有数据丢失的问题。

3.1 创建快照

  hbase> snapshot 'sourceTable', 'snapshotName'
  hbase> snapshot 'namespace:sourceTable', 'snapshotName', {SKIP_FLUSH => true}

3.2 查看快照

  hbase> list_snapshots
  hbase> list_snapshots 'abc.*'

3.3 克隆快照

  hbase> clone_snapshot 'snapshotName', 'tableName'
  hbase> clone_snapshot 'snapshotName', 'namespace:tableName'

生成snapshot后,可以通过hadoopShell命令来查看到对应的snapshot目录

 bin/hadoop fs -ls /hbase/.hbase-snapshot/newSnapshot
 Found 2 items
 -rw-r--r--   3 xxx xxx         35 2017-04-24 21:58 /hbase/.hbase-snapshot/newSnapshot/.snapshotinfo
 -rw-r--r--   3 xxx xxx        486 2017-04-24 21:58 /hbase/.hbase-snapshot/newSnapshot/data.manifest

4. 使用ExportSnapshot工具迁移快照数据

  ExportSnapshot是HBase提供的Snapshot迁移工具,其使用方法见下图:


可以看出,这个工具的参数列表和HDFSDistCp工具很类似。其简要流程如下:

  1. 首先通过HDFS的cp方法,将/.hbase-snapshot/newSnapshot目录拷贝至新集群上
  2. 然后将/hbase/data/<table>下面的数据文件通过MapReduce的方式(DistCp)拷贝至 新集群的/hbase/archive/data/<tablename>
  3. 最后检查snapshot相关文件的完整性

5. 数据合并方式

  一旦数据迁移到了新集群,我们可以通过clone_snapshot命令重新生成该表,如果业务是一个可以支持离线迁移的,那迁移工作也就算完成了。更多的情况是,业务开了双写,即老集群和新集群同时在更新数据,我们需要把迁移后的数据进行合并。这里有三种方法:

5.1. 使用Phoenix SQL导入

  需要在新集群上开启Phoenix支持(如何安装Phoenix见相关文档)

  假设双写的新表为A',A'表必须使用Phoenix的接口来创建。其使用方法与常规的SQL语法类似,但是要注意其splitKey的用法:



例1. 创建一个表t1,只有一个列族f1,有1个修饰字段body, splitKey为['a','b','c']

CREATE TABLE IF NOT EXISTS t1
    ( "id" char(10) not null primary key, "f1".body varchar)
    DATA_BLOCK_ENCODING='NONE',VERSIONS=5,MAX_FILESIZE=2000000 split on ('a', 'b', 'c')

使用clone_snapshot命令将迁移的数据重新生成一个HBaseB,然后再使用Phoenix的DDL重新生成B表(和实际的表B不会冲突,Phoenix的元数据存在另外的目录下),最后使用UPSERT SELECT命令将B表中的数据插入到双写的A’表即可

UPSERT INTO A'("id","f1".body) SELECT "id","f1".body FROM B;

注: 使用Phoenix的问题在于,原来的业务模式需要做较大改动以适应新的JDBC访问HBase方式。

5.2. 使用MapReduce导入

  使用MapReduce导入需要有YARN服务支持,同样需要先使用clone_snapshot命令将迁移的数据重新生成一个HBase表。

用户需要使用HBase的API从表中读取记录,然后插入到新的表,这个方法实际上就是上面Phoenix的底层实现。如果集群没有安装Phoenix插件,可以用这个方法。但这个方法的缺点也是显而易见,需要自己写代码来实现在一个Map中实现上述操作,且如何切分RowKey到各个Map任务中也是一个不小的难题。

5.3. 使用IncrementLoadHFile工具

  顾名思义,这个工具可以实现往HBase表中添加HFile来实现数据的批量写入,其使用方法如下:

bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
usage: completebulkload /path/to/hfileoutputformat-output tablename
 -Dcreate.table=no - can be used to avoid creation of table by this tool
  Note: if you set this to 'no', then the target table must already exist in HBase

这个工具的使用说明很简单,只需要提供一个HFile文件所在的Hdfs路径名和所需要写入的HBase Table名即可。
例1. 将/tmp/hbase/archive/data/test/test/f8510124151cabf704bc02c9c7e687f6目录下的HFile文件加载到test:test表中

bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /tmp/hbase/archive/data/test/test/f8510124151cabf704bc02c9c7e687f6 test:test

其实现原理说明如下:

  1. 首先确认目录下的HFile文件是否合法,得到一个文件列表。
  2. 从列表中获取某一个HFile,获取文件的起始和结束rowKey
  3. 查找要导入到新表的各个Region,得到其StartEndKeys(每次都是重新重启)
  4. 根据HFile的startKey,判断该HFile应该插入的Region位置
  5. 以该HFile的起始rowkey所要插入的Region的EndKey将文件切成top和bottom两部分,将这2个文件加入到待加载的文件列表中
  6. 使用SecureBulkLoadHFile方法一次性批量加载这些文件,如果其中有文件加载失败,则方法失败,返回异常的文件列表。
  7. 将返回的异常文件,加入到循环加载的文件列表里。
  8. 继续重复2-7整个流程,直至完成加载或达到重试阈值而异常退出。

SecureLoadHFile的原理也很简单,它是一个原子操作,所以操作过程中会有短暂的卡顿。

  1. 检查待加载的HFiles是否来自多个列族,需要对多个列族同时加锁以保障一致性。
  2. 检查该次操作是否满足相关权限要求,同时也会将对应的HFile文件进行权限变更操作
  3. 完成HFile文件加载,新的HFile引用被加入到Region的StoreFile列表中。

对这块逻辑感兴趣的同学,可以自行查阅SecureBulkLoadEndpoint,HRegion,HStore等类。

关于如何减少批量加载的时间,有以下几点需要注意: 

  1. 如果新集群上的regions的起始和结束rowkey分布正好和旧集群一致,那么使用批量加载HFile的方式可以最快的方式来合并到线上表中。否则就需要针对新的Region来拆分HFile。
  2. 调整hbase.hregion.max.filesize参数,该参数用于控制一个Region下的最大HFile的文件大小,超过该值后,系统会强制拆分这个文件。新旧集群上这个参数配置可能会不一致,为了尽快完成加载,可以考虑将其设置成一致,或者新集群上的配置更大,这样也能减少加载时间
  3. 该工具默认的重试次数是10次,即一个Hfile如果拆分次数超过10次,就会放弃本次批量加载。 需要注意留意日志

6. 数据验证

HFile文件增量加载更新完毕之后,进行数据验证流程。因为数据量实在太大,不可能对两边的HBase表中的记录做一一比对,故可进行抽样验证。按照Snapshot和双写的机制来讲,数据可能有重复,但不可能存在丢失的情况。验证算法描述如下:

  1. 按照迁移过程中不同阶段,划分成不同的时间区域。 对于每一个时间区间,选一个子区间作为样本
  2. 选定旧集群中的A表,获取其Region信息,得到每个Region的起始rowkey
  3. 按照每个Region的起始rowkey, 顺序查找 N 条该区间中的rowkey记录
  4. 根据上一步拿到的rowkey到相应的测试表中(A')中查找是否能找到匹配的记录。
  5. 找到记录后,对比相应Column+Cell信息,能够都完全匹配即为一个匹配的记录(因为双写的原因,时间戳会有先后,故不判断时间戳。rowkey已能满足要求)

使用Java连接Kerberized HBase的需要的配置如下:

hbase-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://test1.163.org:8020</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://test1.163.org:8020/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>test1.163.org,test2.163.org,test3.163.org</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hadoop.security.authorization</name>
        <value>true</value>
    </property>
    <property>
        <name>hadoop.security.authentication</name>
        <value>kerberos</value>
    </property>
    <property>
        <name>hbase.rpc.timeout</name>
        <value>180000</value>
    </property>
    <property>
        <name>hbase.client.operation.timeout</name>
        <value>120000</value>
    </property>
    <property>
        <name>hbase.security.authentication</name>
        <value>kerberos</value>
    </property>
    <property>
        <name>hbase.security.authorization</name>
        <value>true</value>
    </property>
    <property>
        <name>dfs.namenode.principal</name>
        <value>hdfs/_HOST@HADOOP.HZ.NETEASE.COM</value>
    </property>
    <property>
        <name>hbase.master.kerberos.principal</name>
        <value>hbase/_HOST@HADOOP.HZ.NETASE.COM</value>
    </property>
    <property>
        <name>hbase.regionserver.kerberos.principal</name>
        <value>hbase/_HOST@HADOOP.HZ.NETASE.COM</value>
    </property>
    <property>
        <name>hbase.client.scanner.caching</name>
        <value>100000</value>
    </property>
</configuration>

认证模块的代码片段如下(仅作参考)

            Configuration configuration = HBaseConfiguration.create();
            configuration.addResource("hbase-site.xml");
            UserGroupInformation.setConfiguration(configuration);
            UserGroupInformation.loginUserFromKeytab("principal", "keytab.path");

            TableName tableName = TableName.valueOf("hbase.table.name"));
            Connection connection = ConnectionFactory.createConnection(configuration);
            HTable table = (HTable) connection.getTable(tableName);


7. 事后操作 

  因为批量加载操作会对原HFile文件进行多次拷贝,拆分等操作,会消耗大量的HDFS存储资源和物理机磁盘空间。 在数据合并完成并验证后,可以清理掉这些临时结果。此外如果在加载过程中出现了较多的自动Region切分,也可在此时重新将小Region进行合并。 最后在新集群上专门为数据迁移开启的YARN服务也可以停掉了,减少对HBase服务的影响。

本文来自网易实践者社区,经作者金川授权发布。