HBase in Practice (1): Data Import Methods
*). Client API approach
Importing through the HBase client API is the simplest method and the easiest to learn.
- Configuration config = HBaseConfiguration.create();
- // Set hbase.zookeeper.quorum: the list of machines in the ZooKeeper quorum
- config.set("hbase.zookeeper.quorum", "tw-node109,tw-node110,tw-node111");
- // Set hbase.zookeeper.property.clientPort: the client port of the ZooKeeper quorum
- config.set("hbase.zookeeper.property.clientPort", "2181");
- HTable htable = null;
- try {
- // Specify the HBase table name
- htable = new HTable(config, "hbase_table");
- // Set the rowkey value
- Put put = new Put(Bytes.toBytes("rowkey:1001"));
- // Set family:qualifier:value
- put.add(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
- // Write the Put into the target HBase table
- htable.put(put);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (htable != null) {
- try {
- htable.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
Comment: Programming against the HBase client API is fairly simple. The one thing to watch out for: if you write test cases on your local machine, you must map the hostnames of the HBase cluster nodes locally so that hostnames and IP addresses resolve correctly.
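For example, a minimal hosts mapping on the client machine might look like the following (the IP addresses below are placeholders for illustration; substitute your cluster's real addresses):
- # /etc/hosts on the machine running the client code
- 192.168.1.109 tw-node109
- 192.168.1.110 tw-node110
- 192.168.1.111 tw-node111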
Read/write optimization for the HBase client will be covered in a later post.
*). Bulk import with bulkload
HBase bulkload import runs in two stages:
#). Stage 1: A MapReduce job using HFileOutputFormat writes HFiles, HBase's internal storage format, directly.
How it works: HFileOutputFormat's configureIncrementalLoad method sets up the job's partitioner (TotalOrderPartitioner) from the current region boundaries of the target table, so every generated HFile falls entirely within a single region; this is what makes the load efficient (a rough sketch of this setup follows the two stages below).
#). Stage 2: Use the completebulkload tool to hot-load the generated HFiles into the HBase cluster.
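The snippet below is an illustrative sketch, not the actual HBase source, of the kind of job setup configureIncrementalLoad performs for stage 1 (the MapReduce classes named here exist in the 0.9x-era org.apache.hadoop.hbase.mapreduce package; the helper method itself is hypothetical):
- import java.io.IOException;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
- import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
- import org.apache.hadoop.mapreduce.Job;
- public class IncrementalLoadSketch {
-     // Approximation of HFileOutputFormat.configureIncrementalLoad(job, table); the real method
-     // additionally writes the table's region start keys to a partitions file and points
-     // TotalOrderPartitioner at it, so reducer i receives exactly the rowkey range of region i.
-     static void configureLikeIncrementalLoad(Job job, HTable table) throws IOException {
-         job.setOutputKeyClass(ImmutableBytesWritable.class); // HFile entries are keyed by rowkey
-         job.setOutputValueClass(KeyValue.class);
-         job.setOutputFormatClass(HFileOutputFormat.class);   // emit HFiles rather than text
-         job.setReducerClass(KeyValueSortReducer.class);      // KeyValues must be sorted within each HFile
-         job.setNumReduceTasks(table.getStartKeys().length);  // one reducer (one set of HFiles) per region
-     }
- }
In practice you never write this yourself; calling configureIncrementalLoad directly, as the custom bulkload example below does, is the supported way.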
1. importtsv data import demo
HBase ships with the importtsv tool, which supports TSV-formatted data files out of the box.
Data file data.tsv (fields separated by '\t'):
1001 lilei 17 13800001111
1002 lily 16 13800001112
1003 lucy 16 13800001113
1004 meimei 16 13800001114
Upload it to the HDFS directory /test/hbase/tsv/input:
- sudo -u hdfs hdfs dfs -mkdir -p /test/hbase/tsv/input
- sudo -u hdfs hdfs dfs -put data.tsv /test/hbase/tsv/input/
Create the target HBase table student:
- hbase shell
- hbase> create 'student', {NAME => 'info'}
Run importtsv:
- sudo -u hdfs hadoop jar /usr/lib/hbase/hbase-<version>.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:phone -Dimporttsv.bulk.output=/test/hbase/tsv/output/ student /test/hbase/tsv/input
If -Dimporttsv.bulk.output is not specified, importtsv's default behavior is to write the data into HBase with client-API puts; when -Dimporttsv.bulk.output is specified, the generated HFiles still have to be loaded in a second step:
- sudo -u hdfs hadoop jar /usr/lib/hbase/hbase-<version>.jar completebulkload /test/hbase/tsv/output/ student
Verify the data:
- scan 'student', {LIMIT => 10}
2. Custom bulkload data import demo
Data file: reuse the data.tsv file from above.
Create the HBase table student_new:
- hbase> create 'student_new', {NAME => 'info'}
Write the MapReduce code as shown below:
- public class MyBulkload {
- public static class MyBulkMapper extends
- Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
- @Override
- protected void setup(Context context) throws IOException,
- InterruptedException {
- super.setup(context);
- }
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- // The input is split on '\t'; it could also be parsed in a custom way, e.g. complex JSON/XML text lines
- String line = value.toString();
- String[] terms = line.split("\t");
- if ( terms.length == 4 ) {
- byte[] rowkey = terms[0].getBytes();
- ImmutableBytesWritable imrowkey = new ImmutableBytesWritable(rowkey);
- // Write to the context: rowkey => KeyValue, one per column: info:name, info:age, info:phone
- context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(terms[1])));
- context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(terms[2])));
- context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(terms[3])));
- }
- }
- }
- public static void main(String[] args) throws Exception {
- if ( args.length != 3 ) {
- System.err.println("Usage: MyBulkload <table_name> <data_input_path> <hfile_output_path>");
- System.exit(2);
- }
- String tableName = args[0];
- String inputPath = args[1];
- String outputPath= args[2];
- // Create an HTable instance to fetch the target table's metadata, including its region key boundaries
- Configuration conf = HBaseConfiguration.create();
- HTable table = new HTable(conf, tableName);
- Job job = Job.getInstance(conf, "MyBulkload");
- job.setMapperClass(MyBulkMapper.class);
- job.setJarByClass(MyBulkload.class);
- job.setInputFormatClass(TextInputFormat.class);
- // The most important configuration call; worth studying closely
- HFileOutputFormat.configureIncrementalLoad(job, table);
- FileInputFormat.addInputPath(job, new Path(inputPath));
- FileOutputFormat.setOutputPath(job, new Path(outputPath));
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
Note: Use Maven's assembly plugin to build a fat jar (i.e. package the dependent ZooKeeper and HBase jars into the MapReduce jar). Otherwise you have to configure things statically, adding the ZooKeeper and HBase configuration files and related jars to Hadoop's classpath.
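If you would rather not build a fat jar, one common alternative (a sketch, assuming the hbase launcher script is on the submitting user's PATH) is to expose the HBase jars to the job client through HADOOP_CLASSPATH at submit time:
- HADOOP_CLASSPATH=$(hbase classpath) hadoop jar <your-mapreduce-jar> <MainClass> <args>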
The final jar is mybulk.jar, with main class com.m8zmyp.mmxf.MyBulkload. Generate the HFiles, then incrementally hot-load them into HBase:
- sudo -u hdfs hadoop jar <xxoo>.jar <MainClass> <table_name> <data_input_path> <hfile_output_path>
- hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_path> <table_name>
- sudo -u hdfs hadoop jar mybulk.jar com.m8zmyp.mmxf.MyBulkload student_new /test/hbase/tsv/input /test/hbase/tsv/new_output
- hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /test/hbase/tsv/new_output student_new
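The loading step can also be driven from Java instead of the shell; the snippet below is a minimal sketch against the same 0.9x-era API used above (the table name and HFile output path are assumed to match this example):
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
- public class MyCompleteBulkload {
-     public static void main(String[] args) throws Exception {
-         Configuration conf = HBaseConfiguration.create();
-         HTable table = new HTable(conf, "student_new");
-         try {
-             // Move the HFiles generated by the MapReduce job into the table's regions
-             new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/test/hbase/tsv/new_output"), table);
-         } finally {
-             table.close();
-         }
-     }
- }
LoadIncrementalHFiles splits any HFile that crosses a region boundary before moving it into place, so the load remains correct even if regions have split since the HFiles were generated.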
Verify the data:
- scan 'student_new', {LIMIT => 10}
*). Via Hive over HBase
Create the HBase table hbase_student:
- hbase> create 'hbase_student', 'info'
Create the Hive external table hive_student, mapped to the hbase_student table:
- CREATE EXTERNAL TABLE hive_student (rowkey string, name string, age int, phone string)
- STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
- WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age,info:phone")
- TBLPROPERTIES("hbase.table.name" = "hbase_student");
Data import and verification:
1. Create an external table over the source data:
- CREATE EXTERNAL TABLE data_student (rowkey string, name string, age int, phone string)
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- LOCATION '/test/hbase/tsv/input/';
2. Import the data into hbase_student via hive_student:
- SET hive.hbase.bulk=true;
- INSERT OVERWRITE TABLE hive_student SELECT rowkey, name, age, phone FROM data_student;
Note: If you hit java.lang.IllegalArgumentException: Property value must not be null, you need Hive 0.13.0 or later.
See: https://issues.apache.org/jira/browse/HIVE-5515
Original post (in Chinese): http://www.cnblogs.com/mumuxinfei/p/3823367.html