Hbase的MapReduce(Hbase权威指南)+ BulkLoad导入HBase
?
目錄
?
Hbase權威指南?第7張?與MapReduce集成
預測執行和版本介紹
Hbase與MapReduce集成
例7.1?MapReduce作業從一個文件中讀取數據并寫入Hbase表
MapReduce生成HFile文件,再使用BulkLoad導入HBase中
Hbase權威指南?第7張?與MapReduce集成
預測執行和版本介紹
2.5.2.9.? 預測執行 (Speculative Execution)
MapReduce任務的預測執行缺省是打開的,HBase集群一般建議在系統級關閉預測執行,除非在某種特殊情況下需要打開,此時可以每任務配置。設置mapred.map.tasks.speculative.execution?和 mapred.reduce.tasks.speculative.execution?為 false.
5.8.?版本
一個?{row, column, version}?元組是HBase中的一個單元(cell).但是有可能會有很多的單元的行和列是相同的,可以使用版本來區分不同的單元.
rows和column key是用字節數組表示的,version則是用一個長整型表示。這個long的值使用?java.util.Date.getTime()?或者?System.currentTimeMillis()產生的。這就意味著他的含義是“當前時間和1970-01-01 UTC的時間差,單位毫秒。”
Hbase與MapReduce集成
例7.1?MapReduce作業從一個文件中讀取數據并寫入Hbase表
package com.gosun;import jdk.nashorn.internal.runtime.ParserException; import org.apache.commons.cli.*; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class ImportFromFile {public static final String NAME = "ImportFromFile"; //為后續的使用定義一個作業名public enum Counters {LINES}static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { // 定義mapper類,繼承自Hadoop已有的類private byte[] family = null;private byte[] qualifer = null;@Overridepublic void setup(Context context) throws IOException, InterruptedException{String column = context.getConfiguration().get("conf.column");byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));family = colkey[0];if (colkey.length > 1){qualifer = colkey[1];}}// map()函數將InputFormat提供的鍵值對轉化為了OutputFormat需要的類型@Overridepublic void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException{String lineString = line.toString();byte[] rowkey = DigestUtils.md5(lineString);ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(lineString));Put put = new Put(rowkey);put.addColumn(family, qualifer, Bytes.toBytes(lineString));context.write(rowKey, put);}}// 使用Apache Commons CLI類解析命令行參數private static CommandLine parseArgs(String[] args) throws ParserException{Options options = new Options();Option o = new Option("t", "table", true, "table to import into (must exist)");o.setArgName("table-name");o.setRequired(true);options.addOption(o);o = new Option("c", "column", true, "column to store row data into (must exist)");o.setArgName("family:qualifier");o.setRequired(true);options.addOption(o);o = new Option("i", "input", true, "the directory or file to read from");o.setArgName("path-in-HDFS");o.setRequired(true);options.addOption(o);options.addOption("d", "debug", false, "switch on EDBUG log level");CommandLineParser parser = new PosixParser();CommandLine cmd = null;try{cmd = parser.parse(options, args);}catch (Exception e){System.err.println("ERROR: " + e.getMessage() + "\n");HelpFormatter formatter = new HelpFormatter();formatter.printHelp(NAME + " ", options, true);System.exit(-1);}return cmd;}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.property.clientPort", "2181");conf.set("hbase.zookeeper.quorum", "www.migu-cdn-biz18.migu01.mmtrix.com,www.migu-cdn-biz19.migu01.mmtrix.com,www.migu-cdn-biz20.migu01.mmtrix.com");conf.set("hbase.master", "www.migu-cdn-hadoop25.migu01.mmtrix.com:60000");conf.set("hbase.client.keyvalue.maxsize","2097152000");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();CommandLine cmd = parseArgs(otherArgs);String table = cmd.getOptionValue("t");String input = cmd.getOptionValue("i");String column = cmd.getOptionValue("c");conf.set("conf.column", column);Job job = new Job(conf, "Import from file " + input + " into table " + table); //使用特定的類定義作業job.setJarByClass(ImportFromFile.class);job.setMapperClass(ImportMapper.class);job.setOutputFormatClass(TableOutputFormat.class);job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);job.setOutputKeyClass(ImmutableBytesWritable.class);job.setNumReduceTasks(0); //這是一個只包含map階段的作業,框架會直接跳過reduce階段FileInputFormat.addInputPath(job, new Path(input));System.exit(job.waitForCompletion(true) ? 0 : 1);} }? ? ? ? ?
? ? ? ? ImportMapper類的重載的setup()方法只會在框架初始化改類時調用一次。?
? ? ? ? 這個例子通過TableOutputFormat類使用了隱式的寫緩沖區。調用context.write()方法時,改方法內部會傳入給定的Put實例并調用tbale.Put()。在作業結束前,TableOutputFormat會主動調用flushCommints()以保存仍舊駐留在寫緩沖區的數據。
? ? ? ? 最后,請注意作業在沒有reduce的情況下,map階段是怎樣工作的,這是相當典型的Hbase與MapReduce作業結合:?由于數據是存儲在排序表中的,并且每行數據都擁有唯一的行健,用戶可以在流程中避免更消耗的sort,shuffle和reduce階段。
創建hbase表:
測試數據:
執行命令:
$JAVA_HOME/bin/java -classpath hbaseTest/merge-hbase-1.0-SNAPSHOT.jar?com.gosun.ImportFromFile?-t testtable -i test-data.txt -c data:json
hbase表結果:
?
MapReduce生成HFile文件,再使用BulkLoad導入HBase中
介紹:
? ? ?通常MapReduce在寫HBase時使用的是TableOutputFormat方式,在reduce中直接生成put對象寫入HBase,該方式在大數據量寫入時效率低下(HBase會block寫入,頻繁進行flush,split,compact等大量IO操作),并對HBase節點的穩定性造成一定的影響(GC時間過長,響應變慢,導致節點超時退出,并引起一系列連鎖反應),而HBase支持 bulk load 的入庫方式,它是利用hbase的數據信息按照特定格式存儲在hdfs內這一原理,直接在HDFS中生成持久化的HFile數據格式文件,然后上傳至合適位置,即完成巨量數據快速入庫的辦法。配合mapreduce完成,高效便捷,而且不占用region資源,增添負載,在大數據量寫入時能極大的提高寫入效率,并降低對HBase節點的寫入壓力。
代碼:
BulkLoadToHBase.java package com.gosun;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class BulkLoadToHBase {public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubString wordCountStr=value.toString();String[] wordCountArray=wordCountStr.split("\t");String word=wordCountArray[0];//創建HBase中的RowKeybyte[] rowKey=Bytes.toBytes(word);ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);byte[] family=Bytes.toBytes("data");byte[] qualifier=Bytes.toBytes("count");byte[] hbaseValue=Bytes.toBytes(word);// Put 用于列簇下的多列提交,若只有一個列,則可以使用 KeyValue 格式KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue); // Put put=new Put(rowKey); // put.add(family, qualifier, hbaseValue);context.write(rowKeyWritable, keyValue);}}public static void main(String[] args) throws Exception {Configuration hadoopConfiguration=new Configuration();hadoopConfiguration.set("fs.defaultFS", "hdfs://migumaster");hadoopConfiguration.set("dfs.nameservices", "migumaster");hadoopConfiguration.set("dfs.ha.namenodes.migumaster", "nn1,nn2");hadoopConfiguration.set("dfs.namenode.rpc-address.migumaster.nn1", "www.migu-cdn-biz18.migu01.mmtrix.com:9000");hadoopConfiguration.set("dfs.namenode.rpc-address.migumaster.nn2", "www.migu-cdn-biz19.migu01.mmtrix.com:9000");hadoopConfiguration.set("dfs.client.failover.proxy.provider.migumaster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();String htable = dfsArgs[0]; // 寫入的hbase表String inputPath = dfsArgs[1]; // 數據源路徑String outPath = dfsArgs[2]; // HFile文件的路徑// 如果存放 HFile文件的路徑已經存在,就刪除掉FileSystem fileSystem = FileSystem.get(hadoopConfiguration);if(fileSystem.exists(new Path(outPath))) {fileSystem.delete(new Path(outPath), true);}//只需要編寫Mapper類,在Mapper類中對一個job的輸出進行分析,并轉換為HBase需要的KeyValue的方式。Job job=new Job(hadoopConfiguration, "wordCount_bulkload");job.setJarByClass(BulkLoadToHBase.class);job.setMapperClass(ConvertWordCountOutToHFileMapper.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(KeyValue.class);FileInputFormat.addInputPath(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outPath));//創建HBase的配置對象Configuration hbaseConfiguration = HBaseConfiguration.create();hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181");hbaseConfiguration.set("hbase.zookeeper.quorum", "www.migu-cdn-biz18.migu01.mmtrix.com,www.migu-cdn-biz19.migu01.mmtrix.com,www.migu-cdn-biz20.migu01.mmtrix.com");hbaseConfiguration.set("hbase.master", "www.migu-cdn-hadoop25.migu01.mmtrix.com:60000");hbaseConfiguration.set("hbase.client.keyvalue.maxsize","2097152000");//創建目標表對象Connection HbaseConn = ConnectionFactory.createConnection(hbaseConfiguration);HTable table = (HTable) HbaseConn.getTable(TableName.valueOf(htable));HFileOutputFormat2.configureIncrementalLoad(job, table);//提交jobint convertWordCountJobOutputToHFileJobResult=job.waitForCompletion(true)?0:1;//當job結束之后,調用BulkLoad方式來將MR結果批量入庫LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);//第一個參數為第二個Job的輸出目錄即保存HFile的目錄,第二個參數為目標表loader.doBulkLoad(new Path(outPath), table);System.exit(convertWordCountJobOutputToHFileJobResult);}}執行命令:
$JAVA_HOME/bin/java $JAVA_OPTS -classpath $CLASSPATH $APP_MAIN testtable hdfs://migumaster/tmp/test-data.txt ?hdfs://migumaster/tmp/tmp1/
hbase創建表:
測試數據:
執行顯示:
?
?
參考: hbase權威指南
? ? ? ? ? ?https://blog.csdn.net/m0_37739193/article/details/78781579
總結
以上是生活随笔為你收集整理的Hbase的MapReduce(Hbase权威指南)+ BulkLoad导入HBase的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CodeForces - 894E Ra
- 下一篇: JZOJ5776. 【NOIP2008模