vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO
導(dǎo)讀
在流計算中,如果以事件流為主,關(guān)聯(lián)一些維度信息,就需要根據(jù)每個事件中的關(guān)鍵信息去數(shù)據(jù)庫執(zhí)行一次查詢。正常的思路可能是通過mapFunction以阻塞的方式查詢數(shù)據(jù)庫,等待數(shù)據(jù)結(jié)果返回,然后執(zhí)行下一個步驟。如果數(shù)據(jù)庫查詢時間很長,那有可能會阻塞流計算的整體流程。因此可以考慮異步的方式請求數(shù)據(jù)庫,當(dāng)數(shù)據(jù)返回時,該事件再繼續(xù)執(zhí)行下面的操作。這樣提升了流計算的并發(fā)度,但是也增加了數(shù)據(jù)庫的訪問以及網(wǎng)絡(luò)帶寬的壓力。
1 Flink中的異步IO
在Flink中提供了一種異步IO的模式,不需要使用map函數(shù)阻塞式的加載數(shù)據(jù),而是使用異步方法同時處理大量請求。不過這就需要數(shù)據(jù)庫支持異步請求,如果不支持異步請求也可以手動維護(hù)線程池調(diào)用,只不過效率上沒有原生的異步client更高效。比如Mysql可以通過Vertx支持異步查詢,HBase2.x也支持異步查詢。
一般要實(shí)現(xiàn)Flink的異步查詢需要自定義幾個方法:
class MyAsyncReq extends RichAsyncFunction<IN,OUT>{@Overridepublic void open(..) throws Exception {}@Overridepublic void close() throws Exception {}@Overridepublic void asyncInvoke(..) throws Exception {} }其中open中需要定義連接或者連接池,close中進(jìn)行釋放,asyncInvoke執(zhí)行異步查詢。
AsyncDataStream.unorderedWait( stream, new MyAsyncReq(), 1000, TimeUnit.MILLISECONDS, 100);使用的使用執(zhí)行下面的方法即可,
stream為主要的事件流,
myasyncreq是異步IO類,
1000為異步請求的超時時間,
100是同時進(jìn)行異步請求的最大數(shù)量
另外,由于是異步請求,所以可能請求結(jié)束后順序與原來的順序就不一致了。使用unordered時會以異步請求結(jié)束的時間為準(zhǔn),ordered會以事件時間為準(zhǔn)。
2 基于Vertx實(shí)現(xiàn)的Mysql異步IO
如果外部數(shù)據(jù)源是Mysql,一般的jdbc連接都是同步機(jī)制的,看浪尖大大的文章,推薦了一個異步JDBC組件——Vertx,下面就以Vertx為例作為異步IO的Client。
maven引入除flink之外其他的jar:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.13</version> </dependency> <dependency><groupId>io.vertx</groupId><artifactId>vertx-jdbc-client</artifactId><version>3.8.3</version> </dependency> <dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>3.8.3</version> </dependency>先在open中創(chuàng)建SQLClient,它內(nèi)部維護(hù)了自己的異步請求服務(wù);然后再close中關(guān)閉client;在asyncInvoke中調(diào)用獲取connection,執(zhí)行查詢,并釋放連接。
public class JDBCAsyncFunction extends RichAsyncFunction<Click, Store> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://xx:3306/base").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "x").put("password", "x");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(Click input, ResultFuture<Store> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();connection.query("select id, name from t where id = " + input.getId(), res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}List<Store> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {Store s = new Store();s.setId(json.getInteger("id"));s.setName(json.getString("name"));stores.add(s);}connection.close();resultFuture.complete(stores);});});} }注意,一定要在query的返回調(diào)用方法中手動釋放connection,不然馬上就會報連接池耗盡的異常。使用時就沒什么區(qū)別了:
AsyncDataStream .unorderedWait(clicks,new JDBCPoolFunction(), 100,TimeUnit.SECONDS,10) .print();3 參考
1 vertx:
https://vertx.io/docs/vertx-jdbc-client/java/
2 設(shè)計思想?yún)⒖?#xff1a;
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
總結(jié)
以上是生活随笔為你收集整理的vertx web连接超时 阻塞_Flink之基于Vertx的Mysql异步IO的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Atititi tesseract使用总
- 下一篇: python单例模式解析_Python下