2021 Big Data Flink (42): BroadcastState
BroadcastState
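Introduction to BroadcastState
During development, when you need to send (broadcast) a low-throughput stream of events, such as configurations or rules, to all downstream tasks, you can use Broadcast State. Broadcast State was introduced in Flink 1.5.
The downstream tasks receive these configurations or rules, store them as BroadcastState, and apply them to the computation on another data stream.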
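- Example scenarios
- Dynamically updating computation rules: if the event stream must be processed according to the latest rules, the rules can be broadcast to the downstream tasks as broadcast state.
- Adding extra fields in real time: if events need to be enriched with users' basic information in real time, that information can be broadcast to the downstream tasks as broadcast state.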
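As a rough illustration of the first scenario, the plain-Java sketch below (no Flink dependency, so it runs standalone) models a rule that is replaced while events keep flowing. The class `ThresholdFilterSketch` and its methods are hypothetical: the field `threshold` plays the role of the broadcast state, `onRule` stands in for the broadcast side, and `onEvent` for the event side.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free model of "dynamically updating computation rules":
// the current rule plays the role of the broadcast state.
class ThresholdFilterSketch {
    // null until the first rule arrives (like an empty broadcast state)
    private Integer threshold = null;
    private final List<String> output = new ArrayList<>();

    // Broadcast side: a new rule replaces the old one.
    void onRule(int newThreshold) {
        threshold = newThreshold;
    }

    // Event side: only reads the rule; events that arrive before any rule are dropped.
    void onEvent(String userId, int amount) {
        if (threshold != null && amount > threshold) {
            output.add(userId + ":" + amount);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ThresholdFilterSketch f = new ThresholdFilterSketch();
        f.onEvent("user_1", 50);   // dropped: no rule yet
        f.onRule(100);
        f.onEvent("user_2", 150);  // kept: 150 > 100
        f.onEvent("user_3", 80);   // dropped: 80 <= 100
        f.onRule(50);              // rule updated at runtime
        f.onEvent("user_4", 80);   // kept: 80 > 50
        System.out.println(f.output()); // [user_2:150, user_4:80]
    }
}
```

The same event (`amount = 80`) is dropped under the old rule and kept under the new one, without restarting anything.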
- API overview
First, create a keyed or non-keyed DataStream;
then create a BroadcastStream;
finally, connect the DataStream to the BroadcastStream (by calling connect).
This broadcasts the BroadcastState to every downstream task of the DataStream.
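The sketch below is a simplified, framework-free model of what "broadcast to every downstream task" means, assuming a parallelism of 3: each task instance holds its own copy of the state, a broadcast element is applied to all copies, and a regular element is handled by a single task that reads its local copy. `BroadcastFanOutSketch` is a hypothetical name, and this is not how Flink is implemented internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model of broadcast fan-out (hypothetical names, not Flink internals).
class BroadcastFanOutSketch {
    // One parallel task instance with its own copy of the broadcast state.
    static class Task {
        final Map<String, String> broadcastState = new HashMap<>();
    }

    final List<Task> tasks = new ArrayList<>();

    BroadcastFanOutSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new Task());
        }
    }

    // A broadcast element reaches every task instance.
    void broadcast(String key, String value) {
        for (Task t : tasks) {
            t.broadcastState.put(key, value);
        }
    }

    // A regular element is handled by exactly one task, which reads its local copy.
    String process(int taskIndex, String key) {
        return tasks.get(taskIndex).broadcastState.get(key);
    }

    public static void main(String[] args) {
        BroadcastFanOutSketch job = new BroadcastFanOutSketch(3);
        job.broadcast("user_1", "Zhang San");
        // Whichever task receives the event, the lookup succeeds.
        for (int i = 0; i < 3; i++) {
            System.out.println("task " + i + " -> " + job.process(i, "user_1"));
        }
    }
}
```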
1. If the DataStream is a keyed stream, then after connecting it to the BroadcastStream you must use a KeyedBroadcastProcessFunction as the process function. The KeyedBroadcastProcessFunction API looks like this:
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
The generic type parameters above mean the following:
- KS: the type of the key used when keyBy was called while building the stream from the upstream Source operator;
- IN1: the type of the records in the non-broadcast DataStream;
- IN2: the type of the records in the BroadcastStream;
- OUT: the type of the output records produced by the processElement() and processBroadcastElement() methods of KeyedBroadcastProcessFunction.
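To make the role of KS concrete, here is a plain-Java sketch of how a keyed stream partitions events by key, so every event with the same key reaches the same task (while the broadcast state, as described above, is replicated to all tasks). It assumes `Math.floorMod(key.hashCode(), parallelism)` as the routing rule purely for illustration; Flink's real keyBy assignment goes through key groups and will place keys differently. `KeyedRoutingSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Framework-free model of keyBy routing (simplified; not Flink's key-group assignment).
class KeyedRoutingSketch {
    // Hypothetical routing rule: same key -> same task index.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys (values of type KS) by the task that would receive them.
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> byTask = new TreeMap<>();
        for (String key : keys) {
            byTask.computeIfAbsent(route(key, parallelism), t -> new ArrayList<>()).add(key);
        }
        return byTask;
    }

    public static void main(String[] args) {
        List<String> userIds = List.of("user_1", "user_2", "user_1", "user_3", "user_2");
        // Every occurrence of a given userId ends up in the same task's list.
        System.out.println(partition(userIds, 2));
    }
}
```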
2. If the DataStream is a non-keyed stream, then after connecting it to the BroadcastStream you must use a BroadcastProcessFunction as the process function. The BroadcastProcessFunction API looks like this:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}
The generic parameters have the same meanings as the last three parameters of KeyedBroadcastProcessFunction; because keyBy is not called to partition the original stream, the KS parameter is not needed. The hands-on example below shows in detail how to use BroadcastProcessFunction.
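The difference between `Context` and `ReadOnlyContext` in the two signatures can be modeled without Flink. In the sketch below (hypothetical names), the broadcast side gets a writable map while the element side gets a `Collections.unmodifiableMap` view, so any write from the element side throws. In Flink the restriction is expressed through types instead: `processElement` receives a `ReadOnlyContext`, whose `getBroadcastState` returns a `ReadOnlyBroadcastState` that has no `put` method. The point of the rule is that every task must keep an identical copy of the broadcast state, while `processElement` only sees that task's share of the data.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Framework-free model of the two sides of a BroadcastProcessFunction (hypothetical names).
class ReadOnlySideSketch {
    private final Map<String, Integer> state = new HashMap<>();

    // Broadcast side (processBroadcastElement): may modify the state.
    Map<String, Integer> writableView() {
        return state;
    }

    // Element side (processElement): read-only view; mutations throw UnsupportedOperationException.
    Map<String, Integer> readOnlyView() {
        return Collections.unmodifiableMap(state);
    }

    public static void main(String[] args) {
        ReadOnlySideSketch s = new ReadOnlySideSketch();
        s.writableView().put("user_2", 20);
        System.out.println(s.readOnlyView().get("user_2")); // 20
        try {
            s.readOnlyView().put("user_2", 99);
        } catch (UnsupportedOperationException e) {
            System.out.println("element side cannot modify broadcast state");
        }
    }
}
```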
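- Caveats
1) Broadcast State is a Map type, i.e. it stores key-value pairs.
2) Broadcast State can be modified only on the broadcast side, i.e. in the processBroadcastElement method of BroadcastProcessFunction or KeyedBroadcastProcessFunction. On the non-broadcast side, i.e. in the processElement method of BroadcastProcessFunction or KeyedBroadcastProcessFunction, it is read-only.
3) The order of elements in Broadcast State may differ from task to task, so be careful with any processing that depends on order.
4) When a checkpoint is taken, every task checkpoints its copy of the broadcast state.
5) Broadcast State is kept in memory at runtime; it currently cannot be stored in the RocksDB state backend.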
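Caveat 3 is easiest to see with two tasks that receive the same two broadcast updates in opposite orders. In the plain-Java sketch below, a naive last-write-wins update leaves the two tasks with different values, while an update that carries a version number and keeps only the newer version ends with the same value on both tasks. The version number is a hypothetical field for illustration; the config stream in this article does not have one, and `BroadcastOrderSketch` is not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free model of caveat 3: two tasks see the same broadcast updates in different orders.
class BroadcastOrderSketch {
    // Naive update: last write wins, so the result depends on arrival order.
    static void naivePut(Map<String, Integer> state, String key, int value) {
        state.put(key, value);
    }

    // Versioned update (hypothetical version field): keep the value only if its version is newer.
    // state maps key -> {version, value}.
    static void versionedPut(Map<String, int[]> state, String key, int version, int value) {
        int[] current = state.get(key);
        if (current == null || version > current[0]) {
            state.put(key, new int[]{version, value});
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> naiveA = new HashMap<>(), naiveB = new HashMap<>();
        naivePut(naiveA, "user_3", 30); naivePut(naiveA, "user_3", 35); // task A: v1 then v2
        naivePut(naiveB, "user_3", 35); naivePut(naiveB, "user_3", 30); // task B: v2 then v1
        System.out.println(naiveA.get("user_3") + " vs " + naiveB.get("user_3")); // 35 vs 30

        Map<String, int[]> verA = new HashMap<>(), verB = new HashMap<>();
        versionedPut(verA, "user_3", 1, 30); versionedPut(verA, "user_3", 2, 35);
        versionedPut(verB, "user_3", 2, 35); versionedPut(verB, "user_3", 1, 30);
        System.out.println(verA.get("user_3")[1] + " vs " + verB.get("user_3")[1]); // 35 vs 35
    }
}
```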
Requirement: dynamic configuration updates
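In real time, keep only the events of users that appear in the configuration, and enrich those events with the users' basic information.
Event stream: each event records that a user browsed or clicked a product at a certain time, in the following format: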
{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
{"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
Configuration data: the users' details, stored in MySQL as follows:
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
  `userID` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `userName` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `userAge` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`userID`) USING BTREE
) ENGINE = MyISAM CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of user_info
-- ----------------------------
INSERT INTO `user_info` VALUES ('user_1', '張三', 10);
INSERT INTO `user_info` VALUES ('user_2', '李四', 20);
INSERT INTO `user_info` VALUES ('user_3', '王五', 30);
INSERT INTO `user_info` VALUES ('user_4', '趙六', 40);
SET FOREIGN_KEY_CHECKS = 1;
Output:
(user_3,2019-08-17 12:19:47,browse,1,王五,30)
(user_2,2019-08-17 12:19:48,click,1,李四,20)
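The core of this requirement, looking up each event's userID in the latest config and appending name and age, can be sketched in plain Java. `EnrichSketch` and `UserInfo` are hypothetical names, the user names are ASCII placeholders, and the output string only mimics the tuple format shown above. Unlike a bare `map.get(userId).f0`, the sketch skips events whose user is missing from the config instead of throwing a `NullPointerException`.

```java
import java.util.Map;
import java.util.Optional;

// Framework-free sketch of the enrichment step (hypothetical names).
class EnrichSketch {
    static final class UserInfo {
        final String name;
        final int age;

        UserInfo(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Returns the enriched record, or Optional.empty() if there is no config yet
    // or the user is not in the config (i.e. the event is filtered out).
    static Optional<String> enrich(String userId, String eventTime, String eventType, int productId,
                                   Map<String, UserInfo> config) {
        if (config == null) {
            return Optional.empty(); // no config received yet
        }
        UserInfo info = config.get(userId);
        if (info == null) {
            return Optional.empty(); // user not in config: filtered out
        }
        // Field order as in the output above: (userID,eventTime,eventType,productID,name,age)
        return Optional.of("(" + userId + "," + eventTime + "," + eventType + "," + productId
                + "," + info.name + "," + info.age + ")");
    }

    public static void main(String[] args) {
        Map<String, UserInfo> config = Map.of(
                "user_2", new UserInfo("Li Si", 20),
                "user_3", new UserInfo("Wang Wu", 30));
        // (user_2,2019-08-17 12:19:48,click,1,Li Si,20)
        System.out.println(enrich("user_2", "2019-08-17 12:19:48", "click", 1, config).orElse("skipped"));
        // skipped
        System.out.println(enrich("user_9", "2019-08-17 12:19:49", "browse", 2, config).orElse("skipped"));
    }
}
```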
Coding steps
1.env
2.source
- -1. Build the real-time event stream (custom random source)
<userID, eventTime, eventType, productID>
- -2. Build the config stream (from MySQL)
<userID, <name, age>>
3.transformation
- -1. Define the state descriptor
MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
- -2. Broadcast the config stream
BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
- -3. Connect the event stream with the broadcast stream
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS = eventDS.connect(broadcastDS);
- -4. Process the connected stream: enrich the user info in the event stream from the config stream
4.sink
5.execute
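The descriptor in step 3 uses a `Void` key, so the broadcast state holds a single entry whose value is the whole config snapshot; the implementations below update it with `clear()` followed by `put(null, value)`. The plain-Java model here (hypothetical `SnapshotReplaceSketch`, with a `HashMap` standing in for the broadcast state) shows the effect of that choice: a user deleted from MySQL also disappears from the state on the next refresh, which would not happen if new rows were merged into the old map.

```java
import java.util.HashMap;
import java.util.Map;

// Framework-free model of the "replace the whole snapshot" update (hypothetical names).
class SnapshotReplaceSketch {
    // Single entry under the null key, like MapStateDescriptor<Void, Map<...>>.
    private final Map<Void, Map<String, Integer>> state = new HashMap<>();

    // Mirrors broadcastState.clear(); broadcastState.put(null, value);
    void replace(Map<String, Integer> snapshot) {
        state.clear();
        state.put(null, snapshot);
    }

    // Mirrors broadcastState.get(null) followed by a lookup; null if no snapshot or unknown user.
    Integer lookup(String userId) {
        Map<String, Integer> snapshot = state.get(null);
        return snapshot == null ? null : snapshot.get(userId);
    }

    public static void main(String[] args) {
        SnapshotReplaceSketch s = new SnapshotReplaceSketch();
        s.replace(Map.of("user_1", 10, "user_4", 40));
        s.replace(Map.of("user_1", 11)); // next refresh: user_4 deleted, user_1 updated
        System.out.println(s.lookup("user_1")); // 11
        System.out.println(s.lookup("user_4")); // null
    }
}
```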
Reference implementation
package cn.lanson.action;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Author Lansonli
 * Desc
 * Requirement:
 * Use Flink's BroadcastState to join the event stream with the config stream
 * (which is broadcast as state), and support dynamic config updates!
 */
public class BroadcastStateConfigUpdate {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.source
        //-1.Real-time event stream from a custom random source: produced continuously, potentially high volume
        //<userID, eventTime, eventType, productID>
        DataStreamSource<Tuple4<String, String, String, Integer>> eventDS = env.addSource(new MySource());
        //-2.Config stream: periodically queries the latest data from MySQL, small volume
        //<userID, <name, age>>
        DataStreamSource<Map<String, Tuple2<String, Integer>>> configDS = env.addSource(new MySQLSource());

        //3.transformation
        //-1.Define the state descriptor used to broadcast the config stream as state
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
                new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        //-2.Broadcast the config stream with the descriptor, turning it into a broadcast stream
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configDS.broadcast(descriptor);
        //-3.Connect the event stream with the broadcast stream
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =
                eventDS.connect(broadcastDS);
        //-4.Process the connected stream: enrich the events with user info from the config
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = connectDS
                //BroadcastProcessFunction<IN1, IN2, OUT>
                .process(new BroadcastProcessFunction<
                        //<userID, eventTime, eventType, productID> //event stream
                        Tuple4<String, String, String, Integer>,
                        //<userID, <name, age>> //broadcast stream
                        Map<String, Tuple2<String, Integer>>,
                        //<userID, eventTime, eventType, productID, name, age> //output records
                        Tuple6<String, String, String, Integer, String, Integer>>() {
                    //Process elements of the event stream
                    @Override
                    public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        //Get the userId from the event
                        String userId = value.f0;
                        //Get the (read-only) broadcast state via the descriptor
                        ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                        //Get the map<userID, <name, age>>; it is null until the first config arrives
                        Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                        if (map != null) {
                            //Look up <name, age> by userId
                            Tuple2<String, Integer> tuple2 = map.get(userId);
                            //Skip users missing from the config instead of throwing a NullPointerException
                            if (tuple2 != null) {
                                String userName = tuple2.f0;
                                Integer userAge = tuple2.f1;
                                out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, userName, userAge));
                            }
                        }
                    }

                    //Process elements of the broadcast stream
                    @Override
                    public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        //value is the latest map fetched periodically by MySQLSource
                        //Get the current broadcast state via the descriptor
                        BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                        //Clear the old state
                        broadcastState.clear();
                        //Store the latest broadcast data (update the state)
                        broadcastState.put(null, value);
                    }
                });

        //4.sink
        result.print();

        //5.execute
        env.execute();
    }

    /**
     * <userID, eventTime, eventType, productID>
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        //volatile: cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * <userID, <name, age>>
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        private volatile boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag) {
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                //try-with-resources closes each ResultSet once it has been read
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String userID = rs.getString("userID");
                        String userName = rs.getString("userName");
                        int userAge = rs.getInt("userAge");
                        //Map<String, Tuple2<String, Integer>>
                        map.put(userID, Tuple2.of(userName, userAge));
                    }
                }
                ctx.collect(map);
                Thread.sleep(5000);//refresh the user config every 5s!
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            //close in reverse order of creation
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
Implementation code
package cn.lanson.action;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Author Lansonli
 * Desc Requirement:
 * 1. Real-time log event stream: <userID, eventTime, eventType, productID>, i.e. which user performed which action on which product at what time
 * 2. User info stream (config/rule stream): <userID, <name, age>>, the users' details
 * 3. Broadcast the smaller info stream (config/rule stream) as state to every node so the user info in the
 *    real-time log events can be enriched! In essence: broadcast state that must also support updates
 */
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //TODO 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 2.source
        DataStream<Tuple4<String, String, String, Integer>> logDS = env.addSource(new MySource());
        DataStream<Map<String, Tuple2<String, Integer>>> userInfoDS = env.addSource(new MySQLSource());

        //TODO 3.transformation
        //--1.Define the state descriptor (userInfoDS is broadcast as state; the key can be something else, or unused via Void/null)
        //The nested type below is more complex than necessary; it only demonstrates how to declare a nested type
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor =
                new MapStateDescriptor<>("user", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        //--2.Broadcast userInfoDS as state using the descriptor
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = userInfoDS.broadcast(descriptor);
        //--3.Connect the real-time log event stream with the broadcast stream
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>> connectDS =
                logDS.connect(broadcastDS);
        //--4.Process the data in the connected stream
        SingleOutputStreamOperator<Object> resultDS = connectDS.process(new BroadcastProcessFunction<
                Tuple4<String, String, String, Integer>, //log event stream: <userID, eventTime, eventType, productID>
                Map<String, Tuple2<String, Integer>>,    //user info stream (config/rule stream): <userID, <name, age>>
                Object>() {
            //Process elements
            @Override
            public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Object> out) throws Exception {
                String userId = value.f0;
                //Get the state
                ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                Map<String, Tuple2<String, Integer>> userMap = broadcastState.get(null);
                if (userMap != null) {
                    Tuple2<String, Integer> user = userMap.get(userId);
                    //Skip users missing from the config to avoid a NullPointerException
                    if (user != null) {
                        String name = user.f0;
                        Integer age = user.f1;
                        //Output: <userID, name, age, eventTime, productID, eventType>
                        out.collect(Tuple6.of(userId, name, age, value.f1, value.f3, value.f2));
                    }
                }
            }

            //Process the broadcast state
            @Override
            public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Object> out) throws Exception {
                BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(descriptor);
                //Clear the old broadcast data, then store the new data
                broadcastState.clear();
                broadcastState.put(null, value);
            }
        });

        //TODO 4.sink
        resultDS.print();

        //TODO 5.execute
        env.execute();
    }

    /**
     * 1. Real-time log event stream: which user performed which action on which product at what time
     * <userID, eventTime, eventType, productID>
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        //volatile: cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id, eventTime, eventType, productId));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * 2. User info stream (config/rule stream): the users' details
     * <userID, <name, age>>
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        private volatile boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag) {
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                //try-with-resources closes each ResultSet once it has been read
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String userID = rs.getString("userID");
                        String userName = rs.getString("userName");
                        int userAge = rs.getInt("userAge");
                        //Map<String, Tuple2<String, Integer>>
                        map.put(userID, Tuple2.of(userName, userAge));
                    }
                }
                ctx.collect(map);
                Thread.sleep(5000);//refresh the user config every 5s!
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            //close in reverse order of creation
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}