(五)HBase工具之BulkLoad

使用BulkLoad是由于实际业务的需要,之前股市的历史数据是一条一条的插入到HBase中,每个月底都需要插入大约200G的数据,接近10亿条记录。在插入过程中需要占用大量的IO资源,极大影响了现有的实时数据入库业务的效率。因此需要寻找一种替代的方案来在不影响现有业务的基础上,不占用大量集群资源,快速高效的插入大量的数据。本文主要介绍了BulkLoad的工作机制和应用,以及在使用过程中遇到的问题,最后简要分析了BulkLoad的源码步骤。

BulkLoad简介

HBase可以使用多种方式将数据加载到表中,最直接的方法是使用MapReduce作业中的TableOutputFormat类,或者使用普通的客户端API。但是这些并不总是最有效的方法,涉及到的flush,split,compaction等操作都容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能。

BulkLoad功能使用MapReduce作业直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后将生成的HFile直接加载到正在运行的集群中,从而完成巨量数据快速入库的操作。配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。

但是BulkLoad也有局限,正常的写入是先写WAL,然后在写memstore,而BulkLoad没有写WAL这一步,因此在BulkLoad出现异常情况下,HBase可无法恢复还未持久化的数据。股市数据要求严格,不能丢任意一条记录。因此BulkLoad的局限对其有很大的影响。如果出现数据缺少就要采用其它办法了。

BulkLoad的原理和流程

使用BulkLoad需要两个过程:
1、Transform阶段:使用MapReduce将HDFS上的数据生成成HBase的底层Hfile数据。
2、Load阶段:根据生成的目标HFile,利用HBase提供的BulkLoad工具将HFile Load到HBase目录下面。

下面使用精简化码来说明整个过程。

Transform阶段

编写Mapper,对数据进行简单的处理生成rowkey,put输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] items = line.split(",");
String code = String.join("", items[1].split("\\."));
String row = code + items[2] + items[3] + items[4];
// 生成rowKey
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row.getBytes());
Put put = new Put(Bytes.toBytes(row)); //ROWKEY
// 添加数据
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("w"),
Bytes.toBytes(line));
context.write(rowKey, put);
}
}

Load阶段

将MapReduce的输出HFile加载到HBase,

1
2
3
4
5
public static void loadIncrementalHFileToHBase(final Configuration configuration, final String tableName, final String descPath) throws Exception {
LoadIncrementalHFiles loder = new LoadIncrementalHFiles(configuration);
loder.doBulkLoad(new Path(descPath), new HTable(configuration, tableName));
System.out.println("Bulk Load Completed..");
}

最后需要编写Job驱动程序,可以不用设置reducer,但需要HFileOutputFormat2.class来格式化输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void bulkLoadDriver(final String tableName, final String srcPath, final String descPath)
throws Exception {
Configuration configuration = HBaseConfiguration.create();
Job job = Job.getInstance(configuration);
job.setJarByClass(BulkLoadImport.class);
job.setMapperClass(BulkLoadMapper.class);
// 输出的Key-Value格式
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
// 使用HFileOutputFormat2.class来格式化输出
job.setOutputFormatClass(HFileOutputFormat2.class);
HTable table = new HTable(configuration, tableName);
// table.getRegionLocator() 决定了Reducer的数量
HFileOutputFormat2.configureIncrementalLoad(job, table, table.getRegionLocator());
FileInputFormat.addInputPath(job, new Path(srcPath));
FileOutputFormat.setOutputPath(job, new Path(descPath));
// 运行mapreduce后,直接load
if (job.waitForCompletion(true)) {
// 计算处理的记录条数
Counter counter = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
LOG.info("job finished, total " + counter.getValue() + " records!");
loadIncrementalHFileToHBase(configuration, tableName, descPath);
} else {
System.out.println("jobs run errors");
}
}

注意点:在建表时,最好进行预分区,通过HFileOutputFormat2中代码可以看出,预分区的数量决定了job的Reducer的数量。因此预分区能够提高Reducer的效率。

1
2
3
4
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
job.setNumReduceTasks(startKeys.size());

BulkLoad源码过程简述

程序中调用了LoadIncrementalHFiles的doBulkLoad方法进行HFile的移动。其主要流程如下:
1、初始化一个线程池,设置线程的最大数量
2、根据参数获取是否对HFile的格式进行验证
3、初始化一个queue,然后遍历MapReduce输出的目录下的所有HFIles文件,为每一个HFile包装一个LoadQueueItem,并加入到queue中
4、检查是否有非法的列簇名
5、遍历队列,尝试将HFie加载到一个region中,如果失败,它将返回需要重试的HFie列表。如果成功,它将返回一个空列表,整个过程是原子性的。
6、从RegionServer中获取到Region的名称后,检查是否可以安全的使用BulkLoad。如果为False,则使用ProtobufUtil的bulkLoadHFile。否则将使用SecureBulkLoadClient的bulkLoadHFile,将HFile Load到HBase目录下面。
7、如果HFile的BulkLoad失败了,将会尝试将失败的HFile将重新移回原来的位置。

其中需要注意的有:
1、当HFile的数量极大时,检查HFile的格式将会成为最耗时的阶段。可以通过设置hbase.loadincremental.validate.hfile来决定是否对HFile的格式进行检查(可见HBASE-13985
2、BulkLoad阶段中,采用Callable和Future实现并发,一但BulkLoad失败,HFile需要重新排队,然后重试。重试次数可以通过hbase.client.retries.number进行设置,HBase1.2.5中默认为31次。
3、BulkLoad过程结束后,会发现MapReduce输出目录下的HFile文件都被移走了,说明全部的HFile都导入成功。如果想要试验的话,可以先备份一下,免得再跑一边MapReduce。

使用问题

MapReducer阶段为什么会这么慢

1、建表时设置预分区的个数决定了Reducer的数量。

1
2
3
4
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
job.setNumReduceTasks(startKeys.size());

2、数据记录条数过多,股市的trans数据一个月接近7亿条数据,对集群的处理能力本身就是一个极大的挑战。
3、JVM参数没有调优,导致频繁出现GC,这是后续调优的重点工作。

Load阶段为什么这么慢

1、在Load阶段阶段中,如果HFile文件过多,会触发hBase的compact和split操作。因此BulkLoad只是绕过了数据Put到Memstore和MemStoreFlush这个阶段。
2、当HFile的数量极大时,检查HFile的格式将会成为最耗时的阶段,可以设置不检查。

Bulk load operation did not find any files to load in directory xxx

一般是程序写的过程中出现问题,导致Mapper阶段没有输出,需要仔细检查

总结

Bulk load的使用还是需要看场景,对于股市数据来说,使用Bulk load的导入效率可能没有直接写来得更快,但是其不占用 Region 资源和大量的IO资源,基本上不影响其它业务的运行,还是可以忍受的。

参考资料

Apache HBase ™ Reference Guide
使用 Bulk Load 快速向 HBase 中导入数据 | JR’s Blog
通过BulkLoad快速将海量数据导入到HbaseHadoop篇 – 过往记忆