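Flink Async I/O for Accessing External Data (MySQL Edition)
Follow-up to the previous post: [Translation] Flink Async I/O for Accessing External Data
I recently read an expert's blog post and was reminded that Async I/O is one of the major features Blink contributed to the community: it lets a job fetch external data asynchronously. I wanted to implement it myself so that I would not have to go looking for an example when a project needs it.
My first idea was to write a Scala demo that reads data from HBase, following the demo on the official site: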
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

This failed: I could not get the parts marked in red above to work.
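1. I could not find a usable implementation class for Future.
2. unorderedWait kept reporting errors.
The examples in the Flink source code also include a Scala case: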
def main(args: Array[String]) {
    val timeout = 10000L

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val input = env.addSource(new SimpleSource())

    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
        (input, collector: ResultFuture[Int]) =>
            Future {
                collector.complete(Seq(input))
            } (ExecutionContext.global)
    }

    asyncMapped.print()

    env.execute("Async I/O job")
}

That is the main part. As a beginner I was stuck here; what I wanted was to extend RichAsyncFunction so that the connection could be initialized in the open method.
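I went through a lot of blog posts online. Most of them just translate the principles from the official site, and none of the examples could actually be run, which was frustrating.
So the Scala attempt failed.
I switched to Java. Yesterday I asked in a group chat, and an expert gave me a Java version: https://github.com/perkinls/flink-local-train/blob/c8b4efe33620352aea0100adef4fae2a068a3b65/src/main/scala/com/lp/test/asyncio/AsyncIoSideTableJoinMysqlJava.java I have not read it yet, because I could already follow the Java example on the official site.
Below is the source code of the MySQL version (the HBase version has not been tested yet; HBase on my machine is down):
The business logic is as follows:
Receive data from Kafka, convert it to a user object, call the async function, use user.id to look up the corresponding phone, put it back into the user object, and print the result. Main class: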
import com.alibaba.fastjson.JSON;
import com.venn.common.Common;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.concurrent.TimeUnit;

public class AsyncMysqlRequest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<ObjectNode> source = new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

        // Receive Kafka data and convert it to AsyncUser objects.
        // AsyncUser is the POJO the function class is typed on; this assumes it offers
        // the same (id, username, password) constructor as the User class used originally.
        DataStream<AsyncUser> input = env.addSource(source).map(value -> {
            String id = value.get("id").asText();
            String username = value.get("username").asText();
            String password = value.get("password").asText();
            return new AsyncUser(id, username, password);
        });

        // Async I/O lookup against MySQL: timeout 1 s, capacity 10
        // (once more than 10 requests are in flight, the upstream operator is back-pressured).
        // The unit must be MILLISECONDS: 1000 MICROSECONDS would be only 1 ms.
        DataStream<AsyncUser> async = AsyncDataStream.unorderedWait(input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MILLISECONDS, 10);

        async.map(user -> JSON.toJSON(user).toString()).print();

        env.execute("asyncForMysql");
    }
}

Function class:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<AsyncUser, AsyncUser> {

    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    private transient MysqlClient client;
    private transient ExecutorService executorService;

    /**
     * Initialize the client and the thread pool in open.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        client = new MysqlClient();
        executorService = Executors.newFixedThreadPool(30);
    }

    /**
     * Use asyncUser.getId to fetch the phone of asyncUser asynchronously.
     *
     * @param asyncUser
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
        // Hand the blocking query to the thread pool so that asyncInvoke returns immediately
        executorService.submit(() -> {
            // submit query
            System.out.println("submit query : " + asyncUser.getId() + "-1-" + System.currentTimeMillis());
            AsyncUser tmp = client.query1(asyncUser);
            // Always complete the resultFuture; otherwise every record ends in a timeout
            resultFuture.complete(Collections.singletonList(tmp));
        });
    }

    @Override
    public void timeout(AsyncUser input, ResultFuture<AsyncUser> resultFuture) throws Exception {
        logger.warn("Async function for mysql timeout");
        List<AsyncUser> list = new ArrayList<>();
        input.setPhone("timeout");
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        super.close();
        // Release the worker threads created in open
        if (executorService != null) {
            executorService.shutdown();
        }
    }
}

MysqlClient:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class MysqlClient {

    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";
    private static final String QUERY = "select phone from async.async_test where id = ?";
    private static Connection conn;

    static {
        try {
            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * execute query
     *
     * @param user
     * @return
     */
    public AsyncUser query1(AsyncUser user) {
        // artificial delay before the query
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String phone = "0000";
        // query1 is called concurrently from the thread pool, so each call prepares its own
        // statement instead of sharing one PreparedStatement and its parameter state
        try (PreparedStatement ps = conn.prepareStatement(QUERY)) {
            ps.setString(1, user.getId());
            try (ResultSet rs = ps.executeQuery()) {
                if (rs.next()) {
                    phone = rs.getString(1);
                }
            }
            System.out.println("execute query : " + user.getId() + "-2-" + "phone : " + phone + "-" + System.currentTimeMillis());
        } catch (SQLException e) {
            e.printStackTrace();
        }
        user.setPhone(phone);
        return user;
    }

    // test code
    public static void main(String[] args) {
        MysqlClient mysqlClient = new MysqlClient();

        AsyncUser asyncUser = new AsyncUser();
        asyncUser.setId("526");
        long start = System.currentTimeMillis();
        asyncUser = mysqlClient.query1(asyncUser);

        System.out.println("end : " + (System.currentTimeMillis() - start));
        System.out.println(asyncUser.toString());
    }
}
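The function class and MysqlClient above follow one pattern: a blocking lookup is handed to a thread pool, the result is delivered through a callback, and a fallback value is used when the lookup is too slow. As a rough illustration of that pattern using only the JDK (no Flink or JDBC), here is a minimal sketch. The names AsyncLookupSketch, PHONES, blockingQuery and asyncQuery are hypothetical, the in-memory map stands in for the MySQL table with two sample values taken from the log in this post, and completeOnTimeout requires Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncLookupSketch {

    // Hypothetical stand-in for the MySQL table: id -> phone
    static final Map<String, String> PHONES = Map.of("1", "12345678910", "4", "12345678913");

    // Blocking lookup that plays the role of MysqlClient.query1
    static String blockingQuery(String id, long delayMillis) {
        try {
            Thread.sleep(delayMillis); // simulated query latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return PHONES.getOrDefault(id, "0000");
    }

    // Run the blocking call on the pool; fall back to "timeout" if it takes too long
    static CompletableFuture<String> asyncQuery(ExecutorService pool, String id,
                                                long delayMillis, long timeoutMillis) {
        return CompletableFuture
                .supplyAsync(() -> blockingQuery(id, delayMillis), pool)
                .completeOnTimeout("timeout", timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Fast lookup: finishes well inside the timeout
            System.out.println(asyncQuery(pool, "1", 10, 1000).join());
            // Slow lookup: the fallback value is delivered instead
            System.out.println(asyncQuery(pool, "1", 500, 50).join());
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In the Flink function the same two paths correspond to resultFuture.complete(...) inside the submitted task and to the timeout(...) override; the caller thread never waits for the query itself.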
Function class (bad example: asyncInvoke blocks on the database query, so it is effectively synchronous):
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {

    // connection settings
    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";

    java.sql.Connection conn;
    PreparedStatement ps;
    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    /**
     * Initialize the connection in open.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        Class.forName(driverName);
        conn = DriverManager.getConnection(jdbcUrl, username, password);
        ps = conn.prepareStatement("select phone from async.async_test where id = ?");
    }

    /**
     * Use user.getId to fetch the phone of user.
     *
     * @param user
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {
        // Query by user id. This runs on the calling thread and blocks until
        // the database answers, which is why this version is not asynchronous.
        ps.setString(1, user.getId());
        ResultSet rs = ps.executeQuery();
        String phone = null;
        if (rs.next()) {
            phone = rs.getString(1);
        }
        user.setPhone(phone);
        List<User> list = new ArrayList<>();
        list.add(user);
        // hand the result back to the result queue
        resultFuture.complete(list);
    }

    @Override
    public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
        logger.info("Async function for mysql timeout");
        List<User> list = new ArrayList<>();
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        super.close();
        conn.close();
    }
}

The test data is as follows:
{"id" : 1, "username" : "venn", "password" : 1561709530935}
{"id" : 2, "username" : "venn", "password" : 1561709536029}
{"id" : 3, "username" : "venn", "password" : 1561709541033}
{"id" : 4, "username" : "venn", "password" : 1561709546037}
{"id" : 5, "username" : "venn", "password" : 1561709551040}
{"id" : 6, "username" : "venn", "password" : 1561709556044}
{"id" : 7, "username" : "venn", "password" : 1561709561048}

The execution result is as follows:
submit query : 1-1-1562763486845
submit query : 2-1-1562763486846
submit query : 3-1-1562763486846
submit query : 4-1-1562763486849
submit query : 5-1-1562763486849
submit query : 6-1-1562763486859
submit query : 7-1-1562763486913
submit query : 8-1-1562763486967
submit query : 9-1-1562763487021
execute query : 1-2-phone : 12345678910-1562763487316
1> {"password":"1562763486506","phone":"12345678910","id":"1","username":"venn"}
submit query : 10-1-1562763487408
submit query : 11-1-1562763487408
execute query : 9-2-phone : 1562661110630-1562763487633
1> {"password":"1562763487017","phone":"1562661110630","id":"9","username":"venn"}
# The asynchrony is visible here: queries have been submitted up to id 11, but only ids 1 and 9 have been executed and returned (unorderedWait call)
submit query : 12-1-1562763487634
execute query : 8-2-phone : 1562661110627-1562763487932
1> {"password":"1562763486963","phone":"1562661110627","id":"8","username":"venn"}
submit query : 13-1-1562763487933
execute query : 7-2-phone : 1562661110624-1562763488228
1> {"password":"1562763486909","phone":"1562661110624","id":"7","username":"venn"}
submit query : 14-1-1562763488230
execute query : 6-2-phone : 1562661110622-1562763488526
1> {"password":"1562763486855","phone":"1562661110622","id":"6","username":"venn"}
submit query : 15-1-1562763488527
execute query : 4-2-phone : 12345678913-1562763488832
1> {"password":"1562763486748","phone":"12345678913","id":"4","username":"venn"}
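In the log above the results come back in the order 1, 9, 8, 7, 6, 4, which unorderedWait allows because it emits each result as soon as its request finishes, while orderedWait keeps the order of the input stream. The difference can be illustrated with a small JDK-only sketch. This models only the emission semantics and is not how Flink implements them; the class EmitOrderSketch, its methods, and the completion order used in main are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class EmitOrderSketch {

    // Unordered mode: a result is emitted the moment its request completes
    static List<Integer> unordered(int n, int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        List<CompletableFuture<Integer>> requests = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            CompletableFuture<Integer> request = new CompletableFuture<>();
            request.thenAccept(emitted::add); // callback fires on completion
            requests.add(request);
        }
        for (int id : completionOrder) {
            requests.get(id - 1).complete(id);
        }
        return emitted;
    }

    // Ordered mode: a finished result waits until all earlier requests were emitted
    static List<Integer> ordered(int n, int[] completionOrder) {
        List<Integer> emitted = new ArrayList<>();
        List<CompletableFuture<Integer>> requests = new ArrayList<>();
        Deque<CompletableFuture<Integer>> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            CompletableFuture<Integer> request = new CompletableFuture<>();
            requests.add(request);
            queue.add(request);
        }
        for (int id : completionOrder) {
            requests.get(id - 1).complete(id);
            // Drain from the head only while the oldest pending request is done
            while (!queue.isEmpty() && queue.peek().isDone()) {
                emitted.add(queue.poll().join());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical completion order for requests 1..5
        int[] completionOrder = {1, 5, 4, 2, 3};
        System.out.println(unordered(5, completionOrder)); // [1, 5, 4, 2, 3]
        System.out.println(ordered(5, completionOrder));   // [1, 2, 3, 4, 5]
    }
}
```

The ordered variant has to hold back finished results behind a slow earlier request, which is the extra latency one accepts in exchange for preserving the stream order.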
HBase, Redis, and other stores can be implemented in a similar way.
Reposted from: https://www.cnblogs.com/Springmoon-venn/p/11103558.html