An Example Error When Operating HBase from MapReduce
When running HBase you will often hit the following error; I have run into it myself:
ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times
Checking the logs turns up: org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)
If you see this message, the error is caused by mismatched RPC protocol versions between client and server. The fix: delete the hadoop-core jar under hbase/lib, copy hadoop-0.20.2-core.jar from the Hadoop installation directory into hbase/lib, and restart HBase. A second cause of the same error is that Hadoop is simply not running: start Hadoop first, then start HBase.
When developing in Eclipse you need to add all of the Hadoop jars plus the two jars HBase depends on (the hbase jar and the zookeeper jar).
For HBase basics, see this post: http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499112.html
Tables are created through the createTable method of the HBaseAdmin class.
The HTable class is used to operate on a table: its put method inserts data, a Put is constructed with a row key, getScanner() returns all of the data under a column family as Result objects, and Result.getFamilyMap() returns a map keyed by column qualifier with the cell contents as values, which matches the shape of the map output in Hadoop nicely.
package test;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;

public class Htable {

    public static void main(String[] args) throws IOException {
        Configuration hbaseConf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(hbaseConf);
        HTableDescriptor htableDescriptor = new HTableDescriptor("table".getBytes()); // table name
        htableDescriptor.addFamily(new HColumnDescriptor("fam1"));                    // column family name
        admin.createTable(htableDescriptor);                                          // create the table
        HTable table = new HTable(hbaseConf, "table");                                // get a handle to the table
        for (int i = 0; i < 3; i++) {                                                 // insert three rows
            Put putRow = new Put(("row" + i).getBytes());                             // row key of the i-th row
            putRow.add("fam1".getBytes(), "col1".getBytes(), "value1".getBytes());    // column qualifier and value
            putRow.add("fam1".getBytes(), "col2".getBytes(), "value2".getBytes());
            putRow.add("fam1".getBytes(), "col3".getBytes(), "value3".getBytes());
            table.put(putRow);
        }
        for (Result result : table.getScanner("fam1".getBytes())) {                   // scan the column family
            // column qualifier -> cell value map for this row
            for (Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()) {
                String column = new String(entry.getKey());
                String value = new String(entry.getValue());
                System.out.println(column + "," + value);
            }
        }
        admin.disableTable("table".getBytes());                                       // disable the table
        admin.deleteTable("table".getBytes());                                        // drop the table
    }
}
The code above should not be hard to follow.
Next, let us look at how to use MapReduce to operate on HBase, mainly reading the data stored in HBase. First, though, suppose there are some large files that need to be loaded into HBase: the idea is to upload the files to HDFS, read <key, value> pairs in the map phase, and write those pairs into HBase in the reduce phase.
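The post does not show the map side of this write path, so here is a minimal sketch of what such a Mapper could look like. It assumes the HDFS input is plain text with one record per line, split at the first tab into a row key and a value; the class name matches the MapperClass registered in the Driver below, but the tab-separated format is an assumption, not part of the original code.

package test;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Assumed input format: "rowkey<TAB>value" per line of the HDFS file.
        String[] parts = line.toString().split("\t", 2);
        if (parts.length == 2) {
            // Emit the row key and the value; the reducer turns them into HBase Puts.
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}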
The Reduce class, whose main job is to write the key-value pairs into the HBase table:
package test;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

public class ReducerClass extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String k = key.toString();
        StringBuilder str = new StringBuilder();                         // concatenate all values for this key
        for (Text value : values) {
            str.append(value.toString());
        }
        String v = str.toString();
        Put putrow = new Put(k.getBytes());                              // row key
        putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());  // family, qualifier, value
        context.write(new ImmutableBytesWritable(k.getBytes()), putrow); // emit the Put to the output table
    }
}
As the code shows, ReducerClass extends TableReducer, whereas a plain Hadoop reducer extends Reducer. Its signature is TableReducer<KeyIn, ValueIn, KeyOut>, so on the HBase side the output key type is ImmutableBytesWritable (the output value written to the table is the Put itself).
The Map class, the Reduce class, and the Job configuration are kept in separate classes, which keeps things clear; Mahout uses the same structure.
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");   // where ZooKeeper is running
        Job job = new Job(conf, "Hbase");
        job.setJarByClass(TxtHbase.class);

        Path in = new Path(arg0[0]);                       // input directory on HDFS
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);

        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Wires up the reducer, TableOutputFormat, and the target table in one call
        TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Note that when configuring the job in Driver, job.setReducerClass() is never called; instead TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job) is used to hook up the reduce side.
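For reference, the following helper is an approximate sketch of what that call sets up on the Job in the 0.9x-era API, not the library's actual source; the class name ReducerWiring is purely illustrative.

package test;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;

public class ReducerWiring {

    // Roughly what TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job)
    // configures on the job (approximate sketch).
    static void wireTableReducer(Job job) {
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "table");
        job.setReducerClass(ReducerClass.class);   // so Driver never needs job.setReducerClass()
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
    }
}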
The main function:
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        // Run the Tool implementation above through ToolRunner
        ToolRunner.run(new Configuration(), new Driver(), args);
    }
}
Reading data back out is fairly simple: write a Mapper function and read the <key, value> pairs.
package test;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapperClass extends MapReduceBase implements TableMap<Text, Text> {

    static final String NAME = "GetDataFromHbaseTest";

    public void map(ImmutableBytesWritable row, Result values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        StringBuilder sb = new StringBuilder();
        // getFamilyMap returns a column-qualifier -> cell-value map for this row
        for (Map.Entry<byte[], byte[]> value : values.getFamilyMap("fam1".getBytes()).entrySet()) {
            byte[] cell = value.getValue();
            if (cell != null) {
                sb.append(new String(value.getKey())).append(new String(cell));
            }
        }
        output.collect(new Text(row.get()), new Text(sb.toString()));
    }
}
The job is wired up through this method: initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars).
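Since the Driver below is written against the newer org.apache.hadoop.mapreduce Job API while initTableMapJob and the TableMap-based MapperClass belong to the older org.apache.hadoop.hbase.mapred API, here is a minimal sketch of a map-only JobConf driver that calls the method above directly. The class name ReadDriver and the use of args[0] as the output path are assumptions for illustration.

package test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ReadDriver {

    public static void main(String[] args) throws Exception {
        // JobConf built on top of the HBase configuration (zookeeper quorum, etc.)
        JobConf jobConf = new JobConf(HBaseConfiguration.create(), ReadDriver.class);
        jobConf.setJobName(MapperClass.NAME);

        // Scan the "fam1" family of "table" and hand each row to MapperClass
        TableMapReduceUtil.initTableMapJob("table", "fam1",
                MapperClass.class, Text.class, Text.class, jobConf);

        // Map-only job: write the <rowkey, concatenated cells> pairs out as text
        jobConf.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(jobConf, new Path(args[0]));

        JobClient.runJob(jobConf);
    }
}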
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        Job job = new Job(conf, "Hbase");
        job.setJarByClass(TxtHbase.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The mapreduce-flavoured helper takes a Scan rather than a column string;
        // strictly it expects a TableMapper, so the TableMap-based MapperClass above
        // pairs with the older initTableMapJob/JobConf API instead.
        Scan scan = new Scan();
        scan.addFamily(arg0[0].getBytes());                // column family passed on the command line
        TableMapReduceUtil.initTableMapperJob("table", scan, MapperClass.class,
                Text.class, Text.class, job);

        // Map-only example: no reducer and no file output are configured here
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
The main function:
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new Driver(), args);
    }
}
--------------------------------------------------------------------------------
Original post: http://www.cnblogs.com/liqizhou/archive/2012/05/17/2504279.html