Flink Async I/O Use Case: Joining a Stream Table with a Dimension Table
Introduction
A dimension table is a data-warehouse concept: a collection of dimension attributes, such as a time dimension or a location dimension. It can be stored in MySQL, Cassandra, Redis, and so on, or even sit behind a custom API.
A stream table is streaming data, for example from Kafka.
The idea is to take the join key from each stream-table record and query the dimension table asynchronously.
An example
Stream table: Kafka, with three columns id1, id2, id3.
Dimension table: MySQL, with columns id, age, name.
SQL: select id1, id2, id3, age, name from kafka join mysql on id1 = id;
The join result is id1, id2, id3, age, name: the stream table's columns plus the MySQL dimension table's columns.
For each record, the stream side hands id1 to the dimension side, which effectively runs select * from mysql where id = id1.
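To make that lookup concrete, here is a minimal sketch of what the MySQL side could look like with Flink's RichAsyncFunction. Everything specific in it is assumed for illustration (the JDBC URL, the credentials, the dim_table name); JDBC has no asynchronous API, so the blocking query is handed to a private executor, a common workaround when the client library cannot issue truly async calls.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical sketch: enrich (id1, id2, id3) with (age, name) from MySQL.
public class AsyncMysqlRequest extends RichAsyncFunction<Tuple3<String, String, String>, Tuple5<String, String, String, Integer, String>> {

    private transient Connection connection;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumed connection details; a real job would use a connection pool.
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim_db", "user", "password");
        // Single-threaded executor so the shared JDBC connection is never used concurrently.
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void close() throws Exception {
        executor.shutdown();
        connection.close();
    }

    @Override
    public void asyncInvoke(Tuple3<String, String, String> input, ResultFuture<Tuple5<String, String, String, Integer, String>> resultFuture) {
        executor.submit(() -> {
            // Equivalent of: select age, name from dim_table where id = id1
            try (PreparedStatement stmt = connection.prepareStatement("SELECT age, name FROM dim_table WHERE id = ?")) {
                stmt.setString(1, input.f0);
                try (ResultSet rs = stmt.executeQuery()) {
                    int age = 0;
                    String name = "";
                    if (rs.next()) {
                        age = rs.getInt("age");
                        name = rs.getString("name");
                    }
                    resultFuture.complete(Collections.singleton(Tuple5.of(input.f0, input.f1, input.f2, age, name)));
                }
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }
}

A production job would typically add a bigger executor plus a per-query connection, and a cache for hot keys, but the shape of the lookup is the same.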
Hands-on
Stream table: CSV text data containing uid and phone.
Dimension table: Elasticsearch data containing uid and username.
The goal is to join them into records of uid, username, phone.
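As a purely hypothetical illustration (no sample values appear in the original), the inputs might look like this. The text file, tab-separated:

1001	13800000001
1002	13800000002

And two documents in the renyuanku Elasticsearch index:

{"uid": "1001", "username": "zhangsan"}
{"uid": "1002", "username": "lisi"}

The join would then emit (1001, zhangsan, 13800000001) and (1002, lisi, 13800000002).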
Step 1: read the stream data from the text file
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the stream table: each line is "uid\tphone"
        DataStreamSource<String> dataStreamSource = env.readTextFile("/mytextFile.txt");

        // Parse each line into (uid, phone)
        SingleOutputStreamOperator<Tuple2<String, String>> map = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] splits = s.split("\t");
                        String uid = splits[0];
                        String phone = splits[1];
                        return new Tuple2<>(uid, phone);
                    }
                });

        // The async enrichment step is wired in below.
        env.execute("Test");
    }
}

Asynchronously fetch the dimension data from Elasticsearch
import java.util.Collections;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import com.alibaba.fastjson.JSONObject;

public class AsyncEsDataRequest extends RichAsyncFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient RestHighLevelClient restHighLevelClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the Elasticsearch client once per task
        HttpHost httpHost = new HttpHost("swarm-manager", 9200, "http");
        restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHost));
    }

    @Override
    public void close() throws Exception {
        restHighLevelClient.close();
    }

    @Override
    public void asyncInvoke(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) throws Exception {
        search(input, resultFuture);
    }

    // Query the dimension index asynchronously
    private void search(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) {
        final String uid = input.f0;
        final String phone = input.f1;

        SearchRequest searchRequest = new SearchRequest("renyuanku");
        QueryBuilder builder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("uid", uid));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(builder);
        searchRequest.source(sourceBuilder);

        ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
            // Success: enrich the record with username from the first hit
            @Override
            public void onResponse(SearchResponse searchResponse) {
                String username = "";
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                if (searchHits.length > 0) {
                    JSONObject jsonObject = JSONObject.parseObject(searchHits[0].getSourceAsString());
                    username = jsonObject.getString("username");
                }
                resultFuture.complete(Collections.singleton(Tuple3.of(uid, username, phone)));
            }

            // Failure: emit the record without enrichment so the stream keeps moving
            @Override
            public void onFailure(Exception e) {
                System.out.println(e.getMessage());
                resultFuture.complete(Collections.singleton(Tuple3.of(uid, "", phone)));
            }
        };

        restHighLevelClient.searchAsync(searchRequest, listener);
    }
}

Connect the two and write the result to a file
SingleOutputStreamOperator<Tuple3<String, String, String>> renyuanku =
        AsyncDataStream.unorderedWait(map, new AsyncEsDataRequest(), 2, TimeUnit.SECONDS, 100);
renyuanku.writeAsText("E:/test/renyuanku.txt").setParallelism(1);

With that, the two sources are merged: each (uid, phone) record comes out enriched as (uid, username, phone).
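The three trailing arguments of unorderedWait are the lookup timeout (2 seconds here), its time unit, and the maximum number of in-flight requests (100). unorderedWait emits each record as soon as its lookup completes, so output order can differ from input order. If downstream operators need the original order, AsyncDataStream also offers orderedWait with the same signature, at the cost of extra buffering and latency:

SingleOutputStreamOperator<Tuple3<String, String, String>> ordered =
        AsyncDataStream.orderedWait(map, new AsyncEsDataRequest(), 2, TimeUnit.SECONDS, 100);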
Summary

With AsyncDataStream and a RichAsyncFunction, a Flink stream can be enriched record by record from an external dimension store such as Elasticsearch or MySQL without blocking the pipeline on each lookup.