HBase Programming Guide
@(HBASE)[hbase, big data]
- HBase Programming Guide
    - I. Overview
        - (1) Creating the project
            - 1. pom.xml
            - 2. Notes on running in Eclipse
            - 3. Notes on addResource
    - II. Best Practices
    - III. Common APIs
        - (1) Creating Configuration and Connection objects
        - (2) Table management
            - 1. Creating a table
            - 2. Checking whether a table exists
            - 3. Deleting a table
        - (3) Inserting data
            - 1. Inserting a single record
            - 2. Using the write buffer
        - (4) Reading data: single and batch
            - 1. Iterating over returned results
        - (5) Appending data
        - (6) Scanning a table
        - (7) Altering the table schema
    - IV. Common exceptions
        - 1. java.io.IOException: No FileSystem for scheme: hdfs
        - 2. UnknownHostException
        - 3. NoSuchMethodError: addFamily
        - 4. SASL authentication failed
This article demonstrates how to create tables, delete tables, and alter table schemas, as well as put, get, scan, and other operations.
The complete code is available at:
https://github.com/lujinhong/hbasecommons
I. Overview
(1) Creating the project
1. pom.xml
Besides the HBase dependency itself, pom.xml also needs the Hadoop-related dependencies:
```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.0</version>
</dependency>
```
2. Notes on running in Eclipse
Add the Hadoop/HBase configuration files to the classpath.
3. Notes on addResource
(1) Normally, the HBase configuration file is loaded as follows:
```java
Configuration configuration = HBaseConfiguration.create();
configuration.addResource(new Path(HbaseSiteXml));
Connection connection = ConnectionFactory.createConnection(configuration);
```
(2) If no Configuration object is used, only the hbase-site.xml on the classpath is loaded.
(3) If a String is passed as the argument, the hbase-site.xml in question must be on the classpath:
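For example, a minimal sketch of the String-based form, which resolves the file name against the classpath:
```java
// The String overload of addResource() looks the file up on the classpath.
configuration.addResource("hbase-site.xml");
```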
(4) To load an hbase-site.xml that is not on the classpath, a Path object is required:
```java
configuration.addResource(new Path(HbaseSiteXml));
```
Note that Path here is not Java's Path but Hadoop's org.apache.hadoop.fs.Path.
II. Best Practices
III. Common APIs
(1) Creating Configuration and Connection objects
To connect to HBase from a client, first create a Connection object; through the connection you can then obtain Table, Admin, Scanner, and other objects to perform the corresponding operations.
The basic approach is as follows:
```java
Configuration config = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(config);
```
(1) Create a Configuration object; it looks up the Hadoop/HBase configuration files on the classpath.
(2) Create the Connection object.
As noted above, creating a connection is a heavyweight operation and should be done sparingly. It is best to return it from a wrapper method such as getConnection() rather than creating it in place, and to consider using the singleton pattern, for example:
```java
private static HBaseHelper helper = null;
private static Connection connection = null;
private Configuration configuration = null;
private Admin admin = null;

private HBaseHelper(Configuration conf) throws IOException {
    configuration = conf;
    connection = ConnectionFactory.createConnection(configuration);
    this.admin = connection.getAdmin();
}

/**
 * Entry point for obtaining an HBaseHelper instance. A Configuration object must be
 * supplied, mainly to point at hbase-site.xml and core-site.xml.
 * A singleton guarantees that only one helper is created, because every connection
 * is expensive to create; if multiple connections are needed, use a pool instead.
 */
public static HBaseHelper getHelper(Configuration configuration) throws IOException {
    if (helper == null) {
        helper = new HBaseHelper(configuration);
    }
    return helper;
}
```
The connection and configuration objects are then exposed through simple getters:
```java
public Connection getConnection() {
    return connection;
}

public Configuration getConfiguration() {
    return configuration;
}
```
Since HBaseHelper is a singleton, there is only one instance of each of its member fields.
A close() method is also provided for shutting down the Connection; if callers closed the connection at will, it would have to be recreated over and over:
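A minimal sketch of such a close() method, assuming the connection and admin fields shown above:
```java
// Close the shared Admin and Connection; call this only at application shutdown.
public void close() throws IOException {
    if (admin != null) {
        admin.close();
    }
    if (connection != null && !connection.isClosed()) {
        connection.close();
    }
}
```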
This method should be called only when the entire application shuts down, for example from a framework's cleanUp() method; as long as the application is still running, it should not be called.
HBaseHelper is made a singleton here because it implements a fair amount of functionality; making just the Connection a singleton also works, and the code is then simpler:
```java
private static Connection connection = null;

public static Connection getConnection(Configuration config) throws IOException {
    if (connection == null) {
        connection = ConnectionFactory.createConnection(config);
    }
    return connection;
}
```
(2) Table management
1. Creating a table
The full table-creation method is as follows:
```java
public void createTable(TableName table, int maxVersions, byte[][] splitKeys, String... colfams)
        throws IOException {
    HTableDescriptor desc = new HTableDescriptor(table);
    for (String cf : colfams) {
        HColumnDescriptor coldef = new HColumnDescriptor(cf);
        coldef.setMaxVersions(maxVersions);
        desc.addFamily(coldef);
    }
    if (splitKeys != null) {
        admin.createTable(desc, splitKeys);
    } else {
        admin.createTable(desc);
    }
}
```
The parameters are, in order: the table name, the maximum number of versions to keep, the keys used for pre-splitting regions, and the column family names.
To create a pre-split table, the split keys take a form such as:
```java
byte[][] splits = new byte[][]{
    Bytes.toBytes("row2000id"),
    Bytes.toBytes("row4000id"),
    Bytes.toBytes("row6000id"),
    Bytes.toBytes("row8000id")
};
```
Convenience overloads for the common use cases should also be provided:
```java
public void createTable(String table, String... colfams) throws IOException {
    createTable(TableName.valueOf(table), 1, null, colfams);
}

public void createTable(TableName table, String... colfams) throws IOException {
    createTable(table, 1, null, colfams);
}

public void createTable(String table, int maxVersions, String... colfams) throws IOException {
    createTable(TableName.valueOf(table), maxVersions, null, colfams);
}

public void createTable(TableName table, int maxVersions, String... colfams) throws IOException {
    createTable(table, maxVersions, null, colfams);
}

public void createTable(String table, byte[][] splitKeys, String... colfams) throws IOException {
    createTable(TableName.valueOf(table), 1, splitKeys, colfams);
}
```
The key steps are:
(1) Obtain an Admin object for managing tables. It is already created in HBaseHelper, so it is not created again here.
(2) Create an HTableDescriptor object, which represents a table that does not yet exist (contrast with the Table class below). Many properties can be set on it, such as the compression format and file sizes.
(3) Check whether the table already exists; if it does, disable it first and then delete it.
(4) Create the table.
Admin and HTableDescriptor objects are both lightweight and can be created whenever needed.
2. Checking whether a table exists
```java
public boolean existsTable(String table) throws IOException {
    return existsTable(TableName.valueOf(table));
}

public boolean existsTable(TableName table) throws IOException {
    return admin.tableExists(table);
}
```
This simply delegates to the HBase API's tableExists() method, but saves recreating the admin object every time.
3. Deleting a table
```java
public void disableTable(String table) throws IOException {
    disableTable(TableName.valueOf(table));
}

public void disableTable(TableName table) throws IOException {
    admin.disableTable(table);
}

public void dropTable(String table) throws IOException {
    dropTable(TableName.valueOf(table));
}

public void dropTable(TableName table) throws IOException {
    if (existsTable(table)) {
        if (admin.isTableEnabled(table)) {
            disableTable(table);
        }
        admin.deleteTable(table);
    }
}
```
(3) Inserting data
1. Inserting a single record
The following defines the common put variants; the last one is actually rarely used.
```java
public void put(String table, String row, String fam, String qual, String val) throws IOException {
    put(TableName.valueOf(table), row, fam, qual, val);
}

public void put(TableName table, String row, String fam, String qual, String val) throws IOException {
    Table tbl = connection.getTable(table);
    Put put = new Put(Bytes.toBytes(row));
    put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), Bytes.toBytes(val));
    tbl.put(put);
    tbl.close();
}

public void put(String table, String row, String fam, String qual, long ts, String val) throws IOException {
    put(TableName.valueOf(table), row, fam, qual, ts, val);
}

public void put(TableName table, String row, String fam, String qual, long ts, String val) throws IOException {
    Table tbl = connection.getTable(table);
    Put put = new Put(Bytes.toBytes(row));
    put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), ts, Bytes.toBytes(val));
    tbl.put(put);
    tbl.close();
}

public void put(String table, String[] rows, String[] fams, String[] quals, long[] ts, String[] vals)
        throws IOException {
    put(TableName.valueOf(table), rows, fams, quals, ts, vals);
}

public void put(TableName table, String[] rows, String[] fams, String[] quals, long[] ts, String[] vals)
        throws IOException {
    Table tbl = connection.getTable(table);
    for (String row : rows) {
        Put put = new Put(Bytes.toBytes(row));
        for (String fam : fams) {
            int v = 0;
            for (String qual : quals) {
                String val = vals[v < vals.length ? v : vals.length - 1];
                long t = ts[v < ts.length ? v : ts.length - 1];
                System.out.println("Adding: " + row + " " + fam + " " + qual + " " + t + " " + val);
                put.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), t, Bytes.toBytes(val));
                v++;
            }
        }
        tbl.put(put);
    }
    tbl.close();
}
```
Each of these puts creates a Table object and then closes it. Although Table objects are lightweight, constantly creating and destroying them inside a loop still carries a noticeable cost; in that case, consider reusing the Table object as sketched below, or use the buffering technique described next.
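A minimal sketch of the reuse approach (the table and column names here are placeholders, not from the original code):
```java
// Reuse one Table instance across many puts and close it only once at the end.
Table tbl = connection.getTable(TableName.valueOf("testtable"));
try {
    for (int i = 0; i < 1000; i++) {
        Put put = new Put(Bytes.toBytes("row" + i));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), Bytes.toBytes("val" + i));
        tbl.put(put);
    }
} finally {
    tbl.close();
}
```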
2. Using the write buffer
Since HBase 1.0.0, client-side buffering is handled by BufferedMutator: data is first held on the client, and is sent to HBase via RPC only when the buffer fills up or flush() is called explicitly.
```java
/**
 * Put a series of values into fam:qual of the given table. The data to write is
 * defined by rows and vals, whose lengths must be equal.
 */
public void put(TableName table, String[] rows, String fam, String qual, String[] vals) throws IOException {
    if (rows.length != vals.length) {
        LOG.error("rows.length {} is not equal to vals.length {}", rows.length, vals.length);
    }
    try (BufferedMutator mutator = connection.getBufferedMutator(table)) {
        for (int i = 0; i < rows.length; i++) {
            Put p = new Put(Bytes.toBytes(rows[i]));
            p.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual), Bytes.toBytes(vals[i]));
            mutator.mutate(p);
            //System.out.println(mutator.getWriteBufferSize());
        }
        mutator.flush();
    }
}

public void put(String table, String[] rows, String fam, String qual, String[] vals) throws IOException {
    put(TableName.valueOf(table), rows, fam, qual, vals);
}
```
The commented-out line prints the buffer size, which defaults to 2 MB and is controlled by the hbase.client.write.buffer parameter. It can be read with:
```java
mutator.getWriteBufferSize()
```
How do you set the buffer size?
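One option is to raise hbase.client.write.buffer in the Configuration; another is to set the size per mutator through BufferedMutatorParams. A minimal sketch of the latter (the table and column names are placeholders, and 8 MB is chosen purely as an example):
```java
// Create a mutator whose client-side write buffer is 8 MB instead of the 2 MB default.
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("testtable"))
        .writeBufferSize(8L * 1024 * 1024);
try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
    mutator.mutate(new Put(Bytes.toBytes("row1"))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual"), Bytes.toBytes("val")));
} // close() flushes any mutations still in the buffer
```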
(4) Reading data: single and batch
```java
/** Get the value of column fam:qual in the given row of the given table. */
public Result get(String table, String row, String fam, String qual) throws IOException {
    return get(TableName.valueOf(table), new String[]{row}, new String[]{fam}, new String[]{qual})[0];
}

public Result get(TableName table, String row, String fam, String qual) throws IOException {
    return get(table, new String[]{row}, new String[]{fam}, new String[]{qual})[0];
}

public Result[] get(TableName table, String[] rows, String fam, String qual) throws IOException {
    return get(table, rows, new String[]{fam}, new String[]{qual});
}

public Result[] get(String table, String[] rows, String fam, String qual) throws IOException {
    return get(TableName.valueOf(table), rows, new String[]{fam}, new String[]{qual});
}

public Result[] get(String table, String[] rows, String[] fams, String[] quals) throws IOException {
    return get(TableName.valueOf(table), rows, fams, quals);
}

/** Get, for every row in rows, all the columns defined by fams and quals. */
public Result[] get(TableName table, String[] rows, String[] fams, String[] quals) throws IOException {
    Table tbl = connection.getTable(table);
    List<Get> gets = new ArrayList<Get>();
    for (String row : rows) {
        Get get = new Get(Bytes.toBytes(row));
        get.setMaxVersions();
        if (fams != null) {
            for (String fam : fams) {
                for (String qual : quals) {
                    get.addColumn(Bytes.toBytes(fam), Bytes.toBytes(qual));
                }
            }
        }
        gets.add(get);
    }
    Result[] results = tbl.get(gets);
    tbl.close();
    return results;
}
```
1. Iterating over returned results
```java
for (Result result : results) {
    for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: "
                + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
}
```
If you call result.toString() directly, only the first part (the cell coordinates) is printed, without the value.
(5) Appending data
Append can either add a new column to an existing row, or append new content to the existing value of a column.
```java
List<Append> appends = new ArrayList<Append>();
for (int i = 0; i < lineCount; i++) {
    String rowid = "1";
    Append append = new Append(Bytes.toBytes(rowid));
    append.add(Bytes.toBytes("cf"), Bytes.toBytes("qual_"), Bytes.toBytes("test" + i));
    //table.append(append);
    appends.add(append);
    System.out.println("appending: " + i);
}
table.batch(appends);
table.close();
```
(6) Scanning a table
Dumping the table contents:
```java
public void dump(String table) throws IOException {
    dump(TableName.valueOf(table));
}

public void dump(TableName table) throws IOException {
    try (Table t = connection.getTable(table);
         ResultScanner scanner = t.getScanner(new Scan())) {
        for (Result result : scanner) {
            dumpResult(result);
        }
    }
}

public void dumpResult(Result result) {
    for (Cell cell : result.rawCells()) {
        System.out.println("Cell: " + cell + ", Value: "
                + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
}
```
Another common way to use a scanner:
```java
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(family));
Filter filter = new PrefixFilter(Bytes.toBytes(rowkeyPrefix));
scan.setFilter(filter);
ResultScanner scanner = table.getScanner(scan);
```
(7) Altering the table schema
```java
// This code has issues, and altering table schemas in code is generally not recommended.
public static void modifySchema(Connection connection) throws IOException {
    try (Admin admin = connection.getAdmin()) {
        TableName tableName = TableName.valueOf(TABLE_NAME);
        if (!admin.tableExists(tableName)) {
            System.out.println("Table does not exist.");
            System.exit(-1);
        }
        HTableDescriptor table = new HTableDescriptor(tableName);

        // Add a new column family to the existing table
        HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
        newColumn.setCompactionCompressionType(Algorithm.GZ);
        newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
        admin.addColumn(tableName, newColumn);

        // Update an existing column family
        HColumnDescriptor existingColumn = new HColumnDescriptor(FAMILY);
        existingColumn.setCompactionCompressionType(Algorithm.GZ);
        existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
        table.modifyFamily(existingColumn);
        admin.modifyTable(tableName, table);

        // Disable an existing table
        admin.disableTable(tableName);

        // Delete an existing column family
        admin.deleteColumn(tableName, FAMILY.getBytes("UTF-8"));

        // Delete a table (it must be disabled first)
        admin.deleteTable(tableName);
    }
}
```
IV. Common exceptions
1. java.io.IOException: No FileSystem for scheme: hdfs
Solution: add the Hadoop-related jars to the classpath.
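If the jars are already on the classpath but the error still appears, typically when running from a shaded fat jar whose merge clobbers the FileSystem service registration, a commonly used workaround (an assumption beyond the classpath fix above) is to name the HDFS implementation class explicitly:
```java
// Point the hdfs:// scheme at its implementation class explicitly.
config.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
```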
2. UnknownHostException
```
Caused by: java.net.UnknownHostException: logstreaming
```
Here logstreaming is the HDFS cluster URL; the exception means the Hadoop configuration was not loaded correctly. Solution:
```bash
export HADOOP_CONF_DIR=/home/hadoop/conf_loghbase
```
3. NoSuchMethodError: addFamily
```
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.HTableDescriptor.addFamily(Lorg/apache/hadoop/hbase/HColumnDescriptor;)Lorg/apache/hadoop/hbase/HTableDescriptor;
    at co.cask.hbasetest.HBaseTest.createTable(HBaseTest.java:34)
    at co.cask.hbasetest.HBaseTest.doMain(HBaseTest.java:49)
    at co.cask.hbasetest.HBaseTest.main(HBaseTest.java:67)
```
After 1.0.0, Apache HBase changed the return type of addFamily from void to HTableDescriptor, but CDH did not. If you compile against one and run against the other, the error above occurs.
Solution:
Compile against the same version you run.
If you use CDH, you also need to add:
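Presumably what needs to be added is the Cloudera Maven repository; a typical declaration looks as follows (the URL is an assumption, so verify it against the CDH documentation):
```xml
<!-- Cloudera's public Maven repository (assumed URL; verify for your CDH release). -->
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
```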
然后指定CDH的版本:
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.0.0-cdh5.4.5</version></dependency>4、SASL authentication failed.
The following error appears, suggesting that kinit has not been run, even though it actually has. One cause is that the job must be run with
```bash
java -cp `hbase classpath`:yourjar.jar Main
```
rather than
```bash
java -cp `/home/hadoop/hbase/lib`:yourjar.jar Main
```
```
Caused by: java.io.IOException: Could not set up IO Streams to gdc-dn152-formal.i.nease.net/10.160.254.123:60020
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:890)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:859)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1193)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
    at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:32627)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1583)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1293)
    at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1125)
    at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
    ... 9 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
    ... 19 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
    at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
    at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
    ... 19 more
```