Web3j监听功能代码研究
Web3j監聽功能代碼研究
高洪濤 2021-03-19
?
本周深入研究了web3j工具包實現以太坊的監聽功能,實現了交易監聽和代幣監聽的方法,對監聽過程中的常見問題進行了處理,本文就是對這部分開發經驗的總結。
1 web3j 版本
Web3官網:https://www.web3labs.com/web3j-sdk
Docs: https://docs.web3j.io/latest/quickstart/
?
我使用了3個版本的web3j, 3.6、4.5.5、4.8.4,分別進行說明。
1.1 Web3j 3.6版本
3.6版本可以實現各種監聽。來源已經搞不清楚了,是一個文件夾,包含有許多jar文件。
?
這個文件夾打包鏈接:?
使用時需要添加到編譯路徑中。
?
1.2 4.5.5版本
我下載了4.5.5版本工具包,只有一個文件,里面包含了各種jar包:console-4.5.5-all.jar
?
經過測試,該包可以正常的完成查詢和交易,但是無法監聽,提示缺少rxjava相關jar,我就沒有再折騰,放棄了。
?
1.3 4.8.4版本
該版本可以實現監聽。從官網自動下載,maven工程中添加依賴:
<dependency>
????? ? <groupId>org.web3j</groupId>
????? ? <artifactId>core</artifactId>
????? ? <version>4.8.4</version>
?? ? </dependency>
要實現監聽,還需要添加另一個依賴:
<dependency>
????? <groupId>io.reactivex.rxjava3</groupId>
????? <artifactId>rxjava</artifactId>
????? <version>3.0.11</version>
?? </dependency>
使用到了json需要添加依賴:
<!-- JSONObject對象依賴的jar包 開始 -->
????? <dependency>
????????? <groupId>commons-beanutils</groupId>
????????? <artifactId>commons-beanutils</artifactId>
????????? <version>1.9.3</version>
????? </dependency>
????? <dependency>
????????? <groupId>commons-collections</groupId>
????????? <artifactId>commons-collections</artifactId>
????????? <version>3.2.1</version>
????? </dependency>
????? <dependency>
????????? <groupId>commons-lang</groupId>
????????? <artifactId>commons-lang</artifactId>
????????? <version>2.6</version>
????? </dependency>
????? <dependency>
????????? <groupId>commons-logging</groupId>
????????? <artifactId>commons-logging</artifactId>
????????? <version>1.1.1</version>
????? </dependency>
????? <dependency>
????????? <groupId>net.sf.ezmorph</groupId>
????????? <artifactId>ezmorph</artifactId>
????????? <version>1.0.6</version>
????? </dependency>
????? <dependency>
????????? <groupId>net.sf.json-lib</groupId>
????????? <artifactId>json-lib</artifactId>
????????? <version>2.2.3</version>
????????? <classifier>jdk15</classifier>
????????? <!-- jdk版本 -->
????? </dependency>
?? ?<!-- Json依賴架包下載結束 -->
2 3.6版本監聽
?
我認為監聽有3種類型,分別是:
代幣監聽:監聽ERC20代幣交易,從startBlock區塊開始監聽token轉賬事件
重放交易:監聽過往交易,需要指定開始和結束區塊號
交易監聽:從當前區塊開始監聽交易
其中交易監聽收到的交易事件最多,包含了代幣交易。代幣監聽優點是直接過濾指定的代幣轉賬事件,用起來方便。重放交易是查詢歷史交易記錄,可以針對某段時間查詢交易。
?
2.1 代幣監聽
一般步驟:
?
說明: 再檢查交易地址是否是自己需要的這一步,一般做法是采用地址字符串比較,這樣非常費時間,我把關注的地址保存在hashMap中,查找時直接調用htAddress.containsKey(fromAddress),這樣速度最快。
?
public List<String> contracts;? //代幣合約地址列表,可以存放多個地址
public Subscription tokenSubscription;?? //token事件訂閱對象
public Subscription ethMissSubscription; //ETH交易空檔事件訂閱對象
public Subscription ethSubscription;???? //ETH交易事件訂閱對象
/*啟動監聽, 從startBlock區塊開始監聽token轉賬事件
?????????? 代幣監聽會出現的問題: 如果啟動區塊距離當前區塊稍遠,非常可能的情況是中間出現的交易太多,監視代碼內部出現空指針異常。
?????????? 如果監聽啟動時接近當前區塊問題出現概率小。
?? ?*/
??? public void startTransferListen_Token(BigInteger startBlock) {
????? // 要監聽的合約事件
????? Event event = new Event("Transfer",
???????????? Arrays.asList(
?????????????????? new TypeReference<Address>() {},
?????????????????? new TypeReference<Address>() {},
?????????????????? new TypeReference<Uint>(){}));
????? //過濾器
????? EthFilter filter = new EthFilter(
???????????? DefaultBlockParameter.valueOf(startBlock),
???????????? DefaultBlockParameterName.LATEST,
???????????? contracts);
????? filter.addSingleTopic(EventEncoder.encode(event));
?????
??????? //注冊監聽,解析日志中的事件?????
????? block_TokenSub = startBlock.intValue();
??????????? tokenSubscription = web3j.ethLogObservable(filter).subscribe(log -> {
??????????? ?
??????? ???? block_TokenSub = log.getBlockNumber().intValue();
?
??????? ???? String token = log.getAddress();? //這是Token合約地址??????????? ??
?? ??????????? String txHash = log.getTransactionHash();
?? ??????????? List<String> topics = log.getTopics();? // 提取轉賬記錄???????????????
?? ??????????? String fromAddress = "0x"+topics.get(1).substring(26);
?? ????????? ??String toAddress = "0x"+topics.get(2).substring(26);
?? ???????????
?? ??????????? System.out.println("? ---token ="+token+",? txHash ="+txHash);
?? ???????????
?? ??????????? //檢查發送地址、接收地址是否屬于系統用戶, 不是系統用戶就不予處理
?? ??????????? if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {???????????????
?? ??????????? ? String value1 = log.getData();
?? ??????????? ? BigInteger big = new BigInteger(value1.substring(2), 16);
?? ??????????? ? BigDecimal value = Convert.fromWei(big.toString(), Convert.Unit.ETHER);
?? //??????????????????? System.out.println("value="+value);
?? ??????????? ? String timestamp = "";
?? ??????????? ?
?? ??????????? ? try {
?? ??????????? ???? EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(log.getBlockNumber()), false).send();
?? ??????????? ???? timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());
?? ??????????? ? } catch (IOException e) {
?? ??????????? ???? System.out.println("Block timestamp get failure,block number is {}" + log.getBlockNumber());
?? ??????????? ???? System.out.println("Block timestamp get failure,{}"+? e.getMessage());
?? ??????????? ? }
?? ??????????? ?
?? ??????????? ? //執行關鍵的回調函數
?? ??????????? callBack_Token(token,txHash,fromAddress,toAddress,value,timestamp);
?? ??????????? }
?? ??????? }, error->{
?? ??????? ? System.out.println(" ### tokenSubscription?? error= "+ error);
?? ??????? ? error.printStackTrace();
?? ??????? });
???????? System.out.println("tokenSubscription ="+tokenSubscription);
???????? System.out.println(tokenSubscription.isUnsubscribed());
??? }
?
2.2 重放交易
重放交易功能很重要,尤其涉及充幣業務時,如果充幣運行服務器停機維護,那么在此期間的代幣充值就無法知曉造成遺漏損失。解決方法時充幣運行服務器實時記錄自己監聽的區塊高度,記錄在數據庫中,下次啟動時查找這個區塊到最新區塊之間的交易。
說明: 當指定的區塊交易重放完畢,該監聽就自動終止。ethMissSubscription.isUnsubscribed()返回值就是false。
//啟動監聽以太坊上的過往交易
??? public void startReplayListen_ETH(BigInteger startBlockNum) {
??? ?? System.out.println("? startReplayListen_ETH:? startBlockNum="+startBlockNum);
??? ?? //回放空檔期間的交易
??? ?? BigInteger currentBlockNum=null;
????? try {
????????? //獲取當前區塊號
????????? currentBlockNum = web3j.ethBlockNumber().send().getBlockNumber();
????????? System.out.println("? 000 currentBlockNum= "+currentBlockNum.intValue());
????????? if(startBlockNum.compareTo(currentBlockNum) > 0) {
???????????? return;? //測試曾經出現 currentBlockNum得到錯誤數字,比startBlockNum還小,這時不能啟動監聽
????????? }
????? } catch (IOException e) {
????????? // TODO Auto-generated catch block
????????? System.out.println("? 111 getBlockNumber() Error: ");
????????? e.printStackTrace();
????????? return;?? //出現異常不能啟動監聽
????? }????
?????
?????
??????? //創建開始與結束區塊, 重放這段時間內的交易,防止遺漏
??????? DefaultBlockParameter startBlock = new DefaultBlockParameterNumber(startBlockNum);
??????? DefaultBlockParameter endBlock = new DefaultBlockParameterNumber(currentBlockNum);
??????? System.out.println("[ startTransferListen_ETH:? miss? startBlock="+startBlockNum+", endBlock="+currentBlockNum+"]");
???????
??????? block_EthMissSub = startBlockNum.intValue();
??????? ethMissSubscription = web3j.replayTransactionsObservable(startBlock, endBlock)
???????? ???.subscribe(tx -> {
??????????? ? //更新檢查過的區塊高度
??????????? ? block_EthMissSub = tx.getBlockNumber().intValue();??????????????? ??
??????????? ? System.out.println("? ---replayPastTransactionsFlowable??? block_EthMissSub = "+block_EthMissSub);
??????????? ?
??????????? ? String fromAddress = tx.getFrom();
??????????? ? String toAddress?? = tx.getTo();
// ??????????????? ?System.out.println("toAddress="+toAddress);
??????????? ? if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {? //發現了指定地址上的交易
??????????? ???? ?String txHash = tx.getHash();
??????????????? ?
???????????????????? BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);
???????????????????? String timestamp = "";
???????????????????? try {
???????????????????????? EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send();
???????????????????????? timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());
???????????????????? } catch (IOException e) {
???????????????????? ?? ?System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber());
???????????????????????? System.out.println("Block timestamp get failure,{}"+? e.getMessage());
???????????????????? }
?
???????????????????? // 監聽以太坊上是否有系統生成地址的交易
???????????????????? callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);
??????????? ? }
??????????? ? ?
??????????? }, error->{
??????????? ? System.out.println("?? ### replayPastTransactionsFlowable? error= "+ error);
??????????? ? error.printStackTrace();
??????????? });
???????
??? }
?
2.3 交易監聽
這種方式監聽每一筆交易,以太坊上交易量太大,只能自己過濾出關注的交易進行處理。要盡可能的快速處理。可以考慮線程池模型進行處理。
//啟動監聽以太坊上的交易
??? public void startTransactionListen_ETH() {??? ?
??? ??
??????? //監聽當前區塊以后的交易
??????? ethSubscription = web3j.transactionObservable().subscribe(tx -> {
?? ??????? ? //更新檢查過的區塊高度
?? ??????? ? block_EthSub = tx.getBlockNumber().intValue();?
?? ??????? ? System.out.println("? ---transactionFlowable? block_EthSub = "+block_EthSub);
?? ??????? ?
?? ??????? ? String txHash = tx.getHash();??????? ?
?? ??????? ? String fromAddress = tx.getFrom();
?? ??????? ? String toAddress = tx.getTo();
?? ??????? ? if(htAddress.containsKey(fromAddress) || htAddress.containsKey(toAddress)) {? //發現了指定地址上的交易
??????????????? BigDecimal value = Convert.fromWei(tx.getValue().toString(), Convert.Unit.ETHER);
???????????? ??? String timestamp = "";
??????????????? try {
??????????????? ??? EthBlock ethBlock = web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf(tx.getBlockNumber()), false).send();
??????????????? ??? timestamp = String.valueOf(ethBlock.getBlock().getTimestamp());
??????????????? } catch (IOException e) {
?????????????????? ?System.out.println("Block timestamp get failure,block number is {}" + tx.getBlockNumber());
?????????????????? ?System.out.println("Block timestamp get failure,{}"+? e.getMessage());
??????????????? }
????????????
??????????????? // 監聽以太坊上是否有系統生成地址的交易
???????????? ??? callBack_ETH(txHash,fromAddress,toAddress,value,timestamp);
???????????? ?}
?? ??????? }, error->{
?? ??????? ? System.out.println("?? ### transactionFlowable? error= "+ error);
?? ??????? ? error.printStackTrace();
?? ??????? });
??? }
?
最后回調函數示例:
//token轉賬事件的處理函數
??? public void? callBack_Token(String token, String txHash, String from, String to, BigDecimal value, String timestamp) {
??? ?? System.out.println("----callBack_Token:");
??? ?? System.out.println("??? token = "+token);
??? ?? System.out.println("??? txHash = "+token);
??? ?? System.out.println("??? from = "+from);
??? ?? System.out.println("??? to = "+to);
??? ?? System.out.println("??? value = "+value.doubleValue());
??? ??
??? }
?
3 4.8.4版本監聽
版本升級后原來的監聽函數改變了,用法如下:
public Disposable? tokenSubscription;?? //token事件訂閱對象, 如果監視啟動成功,isDisposed()返回false;否則監視失敗返回true
?? public Disposable? ethMissSubscription; //ETH交易空檔事件訂閱對象
?? public Disposable? ethSubscription;???? //ETH交易事件訂閱對象
?
tokenSubscription = web3j.ethLogFlowable(filter)
??????? ? .subscribe(log -> {……});
?
??????????
?
ethMissSubscription = web3j.replayPastTransactionsFlowable(startBlock, endBlock)
??????????? .subscribe(tx -> {……});
?
ethSubscription = web3j.transactionFlowable()
??????? ? .subscribe(tx -> {……});
?
判斷監聽對象是否運行:
tokenSubscription.isDisposed()
?
原來通過監聽對象取消監聽:
ethSubscription.cancel();
現在沒有這個方法啦, 就是不能主動停止監聽啦。
?
4 常見問題
4.1 監聽無法啟動
指定監聽開始區塊高度后,出現啟動監聽失敗,監聽對象為false。原因未知,我多次實踐經驗:
開始區塊距離最新區塊越遠越容易失敗;
一個開始區塊啟動監視成功,以后該區塊重新監聽也大概率成功,小概率失敗;
即使監聽成功,持續運行期間內部常常出現空指針異常,可能導致監視停止運行;
對于監聽成功啟動后出現的停止運行問題,我的做法是另開一個線程專門檢查監聽對象的狀態,一旦發現停止運行就立即重新啟動監聽,該方法有效。
?
-----End-----
總結
以上是生活随笔為你收集整理的Web3j监听功能代码研究的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        