插入數據前判斷download_type類型是否在200-300范圍中,如果在直接將用戶id插入到另外一個表中,并做關聯
@Overridepublic void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final Durability durability)throws IOException {LOG.warn("###########################################");// 獲取列簇為FAMAILLY_NAME,列名為DOWNLOAD_TYPE的數據List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(DOWNLOAD_TYPE));//判斷列名為DOWNLOAD_TYPE是否包含數據,不包含直接return,退出處理if (cells == null || cells.size() == 0) {LOG.warn("download_type 字段不存在退出過濾");return;}// 列名為DOWNLOAD_TYPE已包含數據,進行處理byte[] aValue = null;for (Cell cell : cells) {try {//DOWNLOAD_TYPE轉換為數字aValue = CellUtil.cloneValue(cell);Integer valueOf = Integer.valueOf(Bytes.toString(aValue));if(valueOf>=200 &&valueOf<=300) {//如果DOWNLOAD_TYPE范圍在200-300之間,獲取用戶UID信息List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),Bytes.toBytes(UID));//判斷用戶是否包含UIDif (list == null || list.size() == 0) {LOG.warn("用戶BIZ不存在,或者為空");return;}//獲取用戶UIDCell cell2 = list.get(0);LOG.warn("UID--->"+Bytes.toString(CellUtil.cloneValue(cell2)));//將用戶UID設置為rowkey,原數據的rowkey當作列名,download_type當作列值Put put2 = new Put(CellUtil.cloneValue(cell2));put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), put.getRow(), aValue);//插入數據到table中table.put(put2);table.close();}else {LOG.warn("Download type is not in range.");}} catch (Exception e1) {LOG.error("異常------->>>>>> "+e1.getMessage());return ;}}}
preGetOp-處理返回結果只對get有效
@Overridepublic void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,final Get get, final List<Cell> results) throws IOException { //判斷查詢的rowkey是否等于testif(Bytes.equals(get.getRow(),Bytes.toBytes("test"))) { //新增返回數據fn:time,值為當前時間戳給客戶端KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("fn"),Bytes.toBytes("time"),1535555555555l ,Bytes.toBytes(String.valueOf(System.currentTimeMillis())));results.add(kv);}}
preScannerOpen-在客戶端打開新掃描儀之前過濾,此方法會覆蓋原有filter
@Overridepublic RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,final RegionScanner s) throws IOException {//查詢rowkey等于test的行進行過濾(顯示不等test的數據),會覆蓋原有filterFilter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("test")));scan.setFilter(filter);return s;}
postScannerNext,對返回結果進行過濾
@Overridepublic boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,final List<Result> results, final int limit, final boolean hasMore) throws IOException {Result result = null;//獲取返回結果,如果rowkey等于test則過濾掉Iterator<Result> iterator = results.iterator();while (iterator.hasNext()) {result = iterator.next();if (Bytes.equals(result.getRow(), Bytes.toBytes("test2"))) {iterator.remove();break;}}return hasMore;}