Hbase 学习(三)Coprocessors
生活随笔
收集整理的這篇文章主要介紹了
Hbase 学习(三)Coprocessors
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Coprocessors
之前我們的filter都是在客戶端定義,然后傳到服務端去執行的,這個Coprocessors是在服務端定義,在客戶端調用,然后在服務端執行,他有點兒想我們熟悉的存儲過程,傳一些參數進去,然后進行我們事先定義好的操作,我們常常用它來做一些比如二次索引啊,統計函數什么的,它也和自定義filter一樣,需要事先定好,然后在hbase-env.sh中的HBASE_CLASSPATH中指明,就像我的上一篇中的寫的那樣。 Coprocessors分兩種,observer和endpoint。 (1)observer就像觸發器一樣,當某個事件發生的時候,它就出發。 已經有一些內置的接口讓我們去實現,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他們是干嘛的。 (2)endpoint可以認為是自定義函數,可以把這個理解為關系數據庫的存儲過程。 所有的Coprocessor都是實現自Coprocessor 接口,它分SYSTEM和USER,前者的優先級比后者的優先級高,先執行。 它有兩個方法,start和stop方法,兩個方法都有一個相同的上下文對象CoprocessorEnvironment。 void start(CoprocessorEnvironment env) throws IOException;? void stop(CoprocessorEnvironment env) throws IOException; 這是CoprocessorEnvironment的方法。Working with Tables 對表進行操作的時候,必須先調用getTable方法活得HTable,不可以自己定義一個HTable,目前貌似沒有禁止,但是將來會禁止。 并且在對表操作的時候,不能對行加鎖。 Coprocessor Loading Coprocessor加載需要在配置文件里面全局加載,比如在hbase-site.xml中設置。 <property><name>hbase.coprocessor.region.classes</name><value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value> </property> <property><name>hbase.coprocessor.master.classes</name><value>coprocessor.MasterObserverExample</value> </property> <property><name>hbase.coprocessor.wal.classes</name><value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value> </property> 我們自定義的時間可以注冊到三個配置項上,分別是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes, hbase.coprocessor.wal.classes上,他們分別負責region,master,wal,注冊到region的要特別注意小心,因為它是針對所有表的。 <property><name>hbase.coprocessor.region.classes</name><value>coprocessor.RegionObserverExample</value> </property> 注冊到這三個觸發器上,可以監控到幾乎所有我們的操作上面,非常恐怖。。可以說是想要什么就有什么,詳細的代碼大家自己去摸索。 EndPoint的可以用來定義聚合函數,我們可以調用CoprocessorProtocol中的方法來實現我們的需求。 調用coprocessorProxy() 傳一個單獨的row key,這是在單獨一個region上操作的。 要在所有region上面操作,我們要調用coprocessorExec()方法 傳一個開始row key 和結束row key。 Demo 說了那么多廢話,我都不好意思再說了,來個例子吧,統計行數的。 public interface RowCountProtocol extends CoprocessorProtocol {long getRowCount() throws IOException;long getRowCount(Filter filter) throws IOException;long getKeyValueCount() throws IOException; }
public class RowCountEndpoint extends BaseEndpointCoprocessor implementsRowCountProtocol {private long getCount(Filter filter, boolean countKeyValues)throws IOException {Scan scan = new Scan();scan.setMaxVersions(1);if (filter != null) {scan.setFilter(filter);}RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment();// use an internal scanner to perform scanning.InternalScanner scanner = environment.getRegion().getScanner(scan);int result = 0;try {List<KeyValue> curVals = new ArrayList<KeyValue>();boolean done = false;do {curVals.clear();done = scanner.next(curVals);result += countKeyValues ? curVals.size() : 1;} while (done);} finally {scanner.close();}return result;}@Overridepublic long getRowCount() throws IOException {return getRowCount(new FirstKeyOnlyFilter());}@Overridepublic long getRowCount(Filter filter) throws IOException {return getCount(filter, false);}@Overridepublic long getKeyValueCount() throws IOException {return getCount(null, true);} }寫完之后,注冊一下吧。 <property><name>hbase.coprocessor.region.classes</name><value>coprocessor.RowCountEndpoint</value> </property>
JAVA 客戶端調用
在服務端定義之后,我們怎么在客戶端用java代碼調用呢,看下面的例子你就明白啦! public class EndPointExample {public static void main(String[] args) throws IOException {Configuration conf = HBaseConfiguration.create();HTable table = new HTable(conf, "testtable");try {Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null,new Batch.Call<RowCountProtocol, Long>() {@Overridepublic Long call(RowCountProtocol counter)throws IOException {return counter.getRowCount();}});long total = 0;for (Map.Entry<byte[], Long> entry : results.entrySet()) {total += entry.getValue().longValue();System.out.println("Region: " + Bytes.toString(entry.getKey())+ ", Count: " + entry.getValue());}System.out.println("Total Count: " + total);} catch (Throwable throwable) {throwable.printStackTrace();}}} 通過table的coprocessorExec方法調用,然后調用RowCountProtocol接口的getRowCount()方法。 然后遍歷每個Region返回的結果,合起來就是最終的結果,打印結果如下。 Region: testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f., Count: 2 Region: testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87., Count: 3 Total Count: 5在上面的例子當中,我們是用Batch.Call()方法來調用接口當中的方法,我們可以用另外一個方法來簡化上述代碼,來看例子。 Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount"); Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);采用Batch.Call方法調用同時調用多個方法
Map<byte[], Pair<Long, Long>> results =table.coprocessorExec( RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Pair<Long, Long>>() {public Pair<Long, Long> call(RowCountProtocol counter) throws IOException {return new Pair(counter.getRowCount(),counter.getKeyValueCount());} }); long totalRows = 0; long totalKeyValues = 0; for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) {totalRows +=entry.getValue().getFirst().longValue();totalKeyValues +=entry.getValue().getSecond().longValue();System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue()); } System.out.println("Total Row Count: " + totalRows); System.out.println("Total KeyValue Count: " +totalKeyValues);調用coprocessorProxy()在單個region上執行
RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4")); long rowsInRegion = protocol.getRowCount(); System.out.println("Region Row Count: " +rowsInRegion);上面這個例子是查找row4行所在region的數據條數,這個可以幫助我們統計每個region上面的數據分布。總結
以上是生活随笔為你收集整理的Hbase 学习(三)Coprocessors的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java中由类名和方法名字符串实现其调用
- 下一篇: centos 安装 py pyhs2