2021年大数据Flink(四十一):Flink实现订单自动好评
目錄
Flink實現訂單自動好評
需求
數據
編碼步驟
1.env
2.source
3.transformation
4.sink
5.execute
參考代碼
參考效果
實現代碼:
Flink實現訂單自動好評
需求
在電商領域會有這么一個場景,如果用戶買了商品,在訂單完成之后,一定時間之內沒有做出評價,系統自動給與五星好評,我們今天主要使用Flink的定時器來簡單實現這一功能。
數據
自定義source模擬生成一些訂單數據.
在這里,我們生了一個最簡單的二元組Tuple3,包含用戶id,訂單id和訂單完成時間三個字段.
/*** 自定義source實時產生訂單數據Tuple3<用戶id,訂單id, 訂單生成時間>*/
public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {private boolean flag = true;@Overridepublic void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {Random random = new Random();while (flag) {String userId = random.nextInt(5) + "";String orderId = UUID.randomUUID().toString();long currentTimeMillis = System.currentTimeMillis();ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));Thread.sleep(500);}}@Overridepublic void cancel() {flag = false;}
}
???????編碼步驟
1.env
2.source
3.transformation
設置經過interval毫秒用戶未對訂單做出評價,自動給與好評.為了演示方便,設置5s的時間
long interval = 5000L;
分組后使用自定義KeyedProcessFunction完成定時判斷超時訂單并自動好評
dataStream.keyBy(0).process(new TimerProcessFuntion(interval));
3.1定義MapState類型的狀態,key是訂單號,value是訂單完成時間
3.2創建MapState
MapStateDescriptor<String, Long> mapStateDesc =
????????????new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
????????????mapState = getRuntimeContext().getMapState(mapStateDesc);
3.3注冊定時器
mapState.put(value.f0, value.f1);
ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
3.4定時器被觸發時執行并輸出結果
4.sink
5.execute
參考代碼
package cn.lanson.action;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;/*** Author lanson* Desc*/
public class OrderAutomaticFavorableComments {public static void main(String[] args) throws Exception {//TODO 1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//TODO 2.source//Tuple3<用戶id,訂單id,訂單生成時間>DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());//TODO 3.transformation//設置經過interval毫秒用戶未對訂單做出評價,自動給與好評.為了演示方便,設置5s的時間long interval = 5000L;//5s//分組后使用自定義KeyedProcessFunction完成定時判斷超時訂單并自動好評orderDS.keyBy(t -> t.f0).process(new TimerProcessFunction(interval));//TODO 4.sink//TODO 5.executeenv.execute();}/*** 自定義source實時產生訂單數據Tuple3<用戶id,訂單id, 訂單生成時間>*/public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {private boolean flag = true;@Overridepublic void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {Random random = new Random();while (flag) {String userId = random.nextInt(5) + "";String orderId = UUID.randomUUID().toString();long currentTimeMillis = System.currentTimeMillis();ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));Thread.sleep(500);}}@Overridepublic void cancel() {flag = false;}}/*** 自定義ProcessFunction完成訂單自動好評* 進來一條數據應該在interval時間后進行判斷該訂單是否超時是否需要自動好評* abstract class KeyedProcessFunction<K, I, O>*/private static class TimerProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {private long interval;//訂單超時時間 傳進來的是5000ms/5spublic TimerProcessFunction(long interval) {this.interval = interval;}//-0.準備一個State來存儲訂單id和訂單生成時間private MapState<String, Long> mapState = null;//-1.初始化@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);mapState = getRuntimeContext().getMapState(mapStateDescriptor);}//-2.處理每一條數據并存入狀態并注冊定時器@Overridepublic void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {//Tuple3<用戶id,訂單id, 訂單生成時間> value里面是當前進來的數據里面有訂單生成時間//把訂單數據保存到狀態中mapState.put(value.f1, value.f2);//xxx,2020-11-11 00:00:00 || xx,2020-11-11 00:00:01//該訂單在value.f2 + interval時過期/到期,這時如果沒有評價的話需要系統給與默認好評//注冊一個定時器在value.f2 + interval時檢查是否需要默認好評ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);//2020-11-11 00:00:05 ?|| 2020-11-11 00:00:06}//-3.執行定時任務@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {//檢查歷史訂單數據(在狀態中存儲著)//遍歷取出狀態中的訂單數據Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();while (iterator.hasNext()) {Map.Entry<String, Long> map = iterator.next();String orderId = map.getKey();Long orderTime = map.getValue();//先判斷是否好評--實際中應該去調用訂單評價系統看是否好評了,我們這里寫個方法模擬一下if (!isFavorable(orderId)) {//該訂單沒有給好評//判斷是否超時--不用考慮進來的數據是否過期,統一判斷是否超時更保險!if (System.currentTimeMillis() - orderTime >= interval) {System.out.println("orderId:" + orderId + "該訂單已經超時未評價,系統自動給與好評!....");//移除狀態中的數據,避免后續重復判斷iterator.remove();mapState.remove(orderId);}} else {System.out.println("orderId:" + orderId + "該訂單已經評價....");//移除狀態中的數據,避免后續重復判斷iterator.remove();mapState.remove(orderId);}}}//自定義一個方法模擬訂單系統返回該訂單是否已經好評public boolean isFavorable(String orderId) {return orderId.hashCode() % 2 == 0;}}
}
???????參考效果
實現代碼:
package cn.lanson.action;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;/*** Author lanson* Desc* 在電商領域會有這么一個場景,如果用戶買了商品,在訂單完成之后,一定時間之內沒有做出評價,系統自動給與五星好評(或者下單之后在一定時間內沒有付款, 就觸發站內信/短信提醒/取消...)* 我們今天主要使用Flink的定時器來簡單實現這一功能。* 注意: 這個需求不使用大數據的技術,就是用Web的定時器也可以做* 課后可以用你熟悉的編程語言/工具/框架去實現*/
public class OrderAutomaticFavorite {public static void main(String[] args) throws Exception {//TODO 1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 2.source//Tuple3<用戶id,訂單id,訂單生成時間>DataStream<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());//TODO 3.transformation//設置經過interval毫秒用戶未對訂單做出評價就自動給予好評,為了方便測試,設置5000ms/5s(實際中可以長一點)long interval = 5000L;//實現這個功能原本不需要分組,但是為了后面使用keyedState狀態,所以這里分下組orderDS.keyBy(t->t.f0).process(new MyKeyedProcessFunction(interval));//TODO 4.sink//TODO 5.executeenv.execute();}/*** public abstract class KeyedProcessFunction<K, I, O>*/public static class MyKeyedProcessFunction extends KeyedProcessFunction<String,Tuple3<String, String, Long>,Object> {//準備一個MapState存儲訂單信息<訂單號,訂單時間>private MapState<String,Long> mapState = null;private long interval = 0L;public MyKeyedProcessFunction(long interval) {this.interval = interval;}@Overridepublic void open(Configuration parameters) throws Exception {//創建狀態描述器MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>("mapState", String.class, Long.class);mapState = getRuntimeContext().getMapState(descriptor);}//處理進來的每個元素/訂單,然后注冊定時器,到時候判斷是否進行了好評@Overridepublic void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {//把訂單信息存入狀態中方便后續使用mapState.put(value.f1,value.f2);//注冊定時器在interval時間后執行/在value.f2 + interval時間時執行ctx.timerService().registerProcessingTimeTimer(value.f2 + interval);}//實現定時器執行方法@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {//定時器觸發的時候需要檢查狀態中的訂單是否已經好評了Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();while (iterator.hasNext()){Map.Entry<String, Long> entry = iterator.next();String orderId = entry.getKey();Long orderTime = entry.getValue();//判斷該訂單是否已經評價--實際中需要調用外部訂單系統的接口,我們自己簡單起見直接調用模擬的方法if(isEvaluate(orderId)){//已經評價過了System.out.println("該訂單:"+orderId+"用戶已評價");//移除當前訂單iterator.remove();//迭代器可以直接移除元素//保險一定狀態中也移除mapState.remove(orderId);}else{//沒有評價//注意:一個key(用戶)有很多訂單,有的可能超時,有的可能還未超時//所以需要判斷是否超時if(System.currentTimeMillis() - orderTime >= interval){//超時且未評價,需要系統給予自動好評System.out.println("該訂單:"+orderId+"已超時未評價,系統給予自動好評");//移除當前訂單iterator.remove();//迭代器可以直接移除元素//保險一定狀態中也移除mapState.remove(orderId);}/*else{//未超時,不用管}*/}}}//模擬訂單系統,傳入訂單id,返回該訂單是否已經評價public boolean isEvaluate(String orderId){//下面這行代碼會隨機返回訂單是否已經評價return new Random().nextInt(10) % 2 == 0;}}/*** 自定義source實時產生訂單數據Tuple3<用戶id,訂單id,訂單生成時間>*/public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {private boolean flag = true;@Overridepublic void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {Random random = new Random();while (flag) {String userId = random.nextInt(5) + "";String orderId = UUID.randomUUID().toString();long currentTimeMillis = System.currentTimeMillis();ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));Thread.sleep(500);}}@Overridepublic void cancel() {flag = false;}}
}
總結
以上是生活随笔為你收集整理的2021年大数据Flink(四十一):Flink实现订单自动好评的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(四十):
- 下一篇: 2021年大数据Flink(四十二):