2021年大数据Flink(四十):Flink模拟双十一实时大屏统计
目錄
Flink模擬雙十一實時大屏統計
需求
數據
編碼步驟:
1.env
2.source
3.transformation
4.使用上面聚合的結果,實現業務需求:
5.execute
參考代碼
實現代碼 (基于上面參考代碼重新寫一套)
實現效果
Flink模擬雙十一實時大屏統計
需求
在大數據的實時處理中,實時的大屏展示已經成了一個很重要的展示項,比如最有名的雙十一大屏實時銷售總價展示。除了這個,還有一些其他場景的應用,比如我們在我們的后臺系統實時的展示我們網站當前的pv、uv等等,其實做法都是類似的。
今天我們就做一個最簡單的模擬電商統計大屏的小例子,
需求如下:
1.實時計算出當天零點截止到當前時間的銷售總額
2.計算出各個分類的銷售top3
3.每秒鐘更新一次統計結果
數據
首先我們通過自定義source 模擬訂單的生成,生成了一個Tuple2,第一個元素是分類,第二個元素表示這個分類下產生的訂單金額,金額我們通過隨機生成.
/*** 自定義數據源實時產生訂單數據Tuple2<分類, 金額>*/
public static class MySource implements SourceFunction<Tuple2<String, Double>>{private boolean flag = true;private String[] categorys = {"女裝", "男裝","圖書", "家電","洗護", "美妝","運動", "游戲","戶外", "家具","樂器", "辦公"};private Random random = new Random();@Overridepublic void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {while (flag){//隨機生成分類和金額int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]String category = categorys[index];//獲取的隨機分類double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之間的隨機數,*100之后表示[0~100)ctx.collect(Tuple2.of(category,price));Thread.sleep(20);}}@Overridepublic void cancel() {flag = false;}
}
編碼步驟:
1.env
2.source
3.transformation
3.1定義大小為一天的窗口,第二個參數表示中國使用的UTC+08:00時區比UTC時間早
keyBy(t->t.f0)
window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
3.2定義一個1s的觸發器
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
3.3聚合結果.aggregate(new PriceAggregate(), new WindowResult());
3.4看一下聚合的結果
CategoryPojo(category=男裝, totalPrice=17225.26, dateTime=2020-10-20 08:04:12)
4.使用上面聚合的結果,實現業務需求:
tempAggResult.keyBy(CategoryPojo::getDateTime)
//每秒鐘更新一次統計結果
????????????????.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) ??
//在ProcessWindowFunction中實現該復雜業務邏輯
????????????? .process(new WindowResultProcess());
4.1實時計算出當天零點截止到當前時間的銷售總額
4.2.計算出各個分類的銷售top3
4.3.每秒鐘更新一次統計結果
5.execute
???????參考代碼
package cn.it.action;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Random;
import java.util.stream.Collectors;/*** Author lanson* Desc* 1.實時計算出當天零點截止到當前時間的銷售總額 11月11日 00:00:00 ~ 23:59:59* 2.計算出各個分類的銷售top3* 3.每秒鐘更新一次統計結果*/
public class DoubleElevenBigScreem {public static void main(String[] args) throws Exception {//TODO 1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);//方便觀察//TODO 2.sourceDataStream<Tuple2<String, Double>> orderDS = env.addSource(new MySource());//TODO 3.transformation--初步聚合:每隔1s聚合一下截止到當前時間的各個分類的銷售總金額DataStream<CategoryPojo> tempAggResult = orderDS//分組.keyBy(t -> t.f0)//如果直接使用之前學習的窗口按照下面的寫法表示://表示每隔1天計算一次//.window(TumblingProcessingTimeWindows.of(Time.days(1)));//表示每隔1s計算最近一天的數據,但是11月11日?00:01:00運行計算的是: 11月10日?00:01:00~11月11日?00:01:00 ---不對!//.window(SlidingProcessingTimeWindows.of(Time.days(1),Time.seconds(1)));//*例如中國使用UTC+08:00,您需要一天大小的時間窗口,//*窗口從當地時間的00:00:00開始,您可以使用{@code of(時間.天(1),時間.hours(-8))}.//下面的代碼表示從當天的00:00:00開始計算當天的數據,缺一個觸發時機/觸發間隔//3.1定義大小為一天的窗口,第二個參數表示中國使用的UTC+08:00時區比UTC時間早.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))//3.2自定義觸發時機/觸發間隔.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))//.sum()//簡單聚合//3.3自定義聚合和結果收集//aggregate(AggregateFunction<T, ACC, V> aggFunction,WindowFunction<V, R, K, W> windowFunction).aggregate(new PriceAggregate(), new WindowResult());//aggregate支持復雜的自定義聚合//3.4看一下聚合的結果tempAggResult.print("初步聚合的各個分類的銷售總額");//初步聚合的各個分類的銷售總額> DoubleElevenBigScreem.CategoryPojo(category=游戲, totalPrice=563.8662504982619, dateTime=2021-01-19 10:31:40)//初步聚合的各個分類的銷售總額> DoubleElevenBigScreem.CategoryPojo(category=辦公, totalPrice=876.5216500403918, dateTime=2021-01-19 10:31:40)//TODO 4.sink-使用上面初步聚合的結果(每隔1s聚合一下截止到當前時間的各個分類的銷售總金額),實現業務需求:tempAggResult.keyBy(CategoryPojo::getDateTime).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//每隔1s進行最終的聚合并輸出結果//.sum//簡單聚合//.apply().process(new FinalResultWindowProcess());//在ProcessWindowFunction中實現該復雜業務邏輯//TODO 5.executeenv.execute();}/*** 自定義數據源實時產生訂單數據Tuple2<分類, 金額>*/public static class MySource implements SourceFunction<Tuple2<String, Double>> {private boolean flag = true;private String[] categorys = {"女裝", "男裝", "圖書", "家電", "洗護", "美妝", "運動", "游戲", "戶外", "家具", "樂器", "辦公"};private Random random = new Random();@Overridepublic void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {while (flag) {//隨機生成分類和金額int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]String category = categorys[index];//獲取的隨機分類double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之間的隨機小數,*100之后表示[0~100)的隨機小數ctx.collect(Tuple2.of(category, price));Thread.sleep(20);}}@Overridepublic void cancel() {flag = false;}}/*** 自定義聚合函數,指定聚合規則* AggregateFunction<IN, ACC, OUT>*/private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {//初始化累加器@Overridepublic Double createAccumulator() {return 0D;//D表示double,L表示Long}//把數據累加到累加器上@Overridepublic Double add(Tuple2<String, Double> value, Double accumulator) {return value.f1 + accumulator;}//獲取累加結果@Overridepublic Double getResult(Double accumulator) {return accumulator;}//合并各個subtask的結果@Overridepublic Double merge(Double a, Double b) {return a + b;}}/*** 自定義窗口函數,指定窗口數據收集規則* WindowFunction<IN, OUT, KEY, W extends Window>*/private static class WindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");@Override//void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out)public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {long currentTimeMillis = System.currentTimeMillis();String dateTime = df.format(currentTimeMillis);Double totalPrice = input.iterator().next();out.collect(new CategoryPojo(category,totalPrice,dateTime));}}/*** 用于存儲聚合的結果*/@Data@AllArgsConstructor@NoArgsConstructorpublic static class CategoryPojo {private String category;//分類名稱private double totalPrice;//該分類總銷售額private String dateTime;// 截止到當前時間的時間,本來應該是EventTime,但是我們這里簡化了直接用當前系統時間即可}/*** 自定義窗口完成銷售總額統計和分類銷售額top3統計并輸出* abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>*/private static class FinalResultWindowProcess extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {//注意://下面的key/dateTime表示當前這1s的時間//elements:表示截止到當前這1s的各個分類的銷售數據@Overridepublic void process(String dateTime, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {//1.實時計算出當天零點截止到當前時間的銷售總額?11月11日?00:00:00 ~ 23:59:59double total = 0D;//用來記錄銷售總額//2.計算出各個分類的銷售top3:如: "女裝": 10000 "男裝": 9000 "圖書":8000//注意:這里只需要求top3,也就是只需要排前3名就行了,其他的不用管!當然你也可以每次對進來的所有數據進行排序,但是浪費!//所以這里直接使用小頂堆完成top3排序://70//80//90//如果進來一個比堆頂元素還有小的,直接不要//如果進來一個比堆頂元素大,如85,直接把堆頂元素刪掉,把85加進去并繼續按照小頂堆規則排序,小的在上面,大的在下面//80//85//90//創建一個小頂堆//https://blog.csdn.net/hefenglian/article/details/81807527Queue<CategoryPojo> queue = new PriorityQueue<>(3,//初識容量//正常的排序,就是小的在前,大的在后,也就是c1>c2的時候返回1,也就是升序,也就是小頂堆(c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);for (CategoryPojo element : elements) {double price = element.getTotalPrice();total += price;if(queue.size()< 3){queue.add(element);//或offer入隊}else{if(price >= queue.peek().getTotalPrice()){//peek表示取出堆頂元素但不刪除//queue.remove(queue.peek());queue.poll();//移除堆頂元素queue.add(element);//或offer入隊}}}//代碼走到這里那么queue存放的就是分類的銷售額top3,但是是升序.需要改為逆序然后輸出List<String> top3List = queue.stream().sorted((c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? -1 : 1).map(c -> "分類:" + c.getCategory() + " 金額:" + c.getTotalPrice()).collect(Collectors.toList());//3.每秒鐘更新一次統計結果-也就是直接輸出double roundResult = new BigDecimal(total).setScale(2, RoundingMode.HALF_UP).doubleValue();//四舍五入保留2位小數System.out.println("時間: "+dateTime +" 總金額?:" + roundResult);System.out.println("top3: \n" + StringUtils.join(top3List,"\n"));}}
}
實現代碼 (基于上面參考代碼重新寫一套)
package cn.lanson.action;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.PriorityQueue;
import java.util.Random;/*** Author lanson* Desc今天我們就做一個最簡單的模擬電商統計大屏的小例子,* 需求如下:* 1.實時計算出當天零點截止到當前時間的銷售總額* 2.計算出各個分類的銷售額最大的top3* 3.每秒鐘更新一次統計結果*/
public class DoubleElevenBigScreen {public static void main(String[] args) throws Exception {//TODO 1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 2.source//訂單數據Tuple2<分類, 金額>DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());//TODO 3.transformation//-1.每秒預聚合各個分類的銷售總額:從當天0點開始截止到目前為止的各個分類的銷售總額SingleOutputStreamOperator<CategoryPojo> aggregateResult = orderDS.keyBy(t -> t.f0)//注意:下面的窗口表示每隔1天計算最近1天的數據,//.window(TumblingProcessingTimeWindows.of(Time.days(1)));//注意:下面的窗口表示每隔1s計算最近1天的數據(如果現在是1點,那么計算的是昨天1點到現在1點的1天的數據,而不是當天0點開始截止到目前為止的數據)//.window(SlidingProcessingTimeWindows.of(Time.days(1), Time.seconds(1)));//注意:中國使用UTC+08:00,您需要一天大小的時間窗口,//窗口從當地時間的每00:00:00開始,您可以使用{@code of(time.days(1),time.hours(-8))}.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))//注意:下面表示每秒觸發計算.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))//聚合(可以使用之前學習的簡單聚合:sum/reduce/或自定義聚合:apply或使用aggregate聚合(可以指定如何聚合及如何收集聚合結果)).aggregate(new MyAggregate(), new MyWindow());//輸出查看下預聚合的結果//aggregateResult.print();//按照分類將訂單金額進行聚合://分類名稱 金額 時間//男裝 100 2021-11-11 11:11:11//女裝 100 2021-11-11 11:11:11//男裝 200 2021-11-11 11:11:12//女裝 200 2021-11-11 11:11:12//-2.計算所有分類的銷售總額和分類銷售額最大Top3//要是每秒更新/計算所有分類目前的銷售總額和分類銷售額Top3//aggregateResult.keyBy(CategoryPojo::getDateTime)aggregateResult.keyBy(c -> c.getDateTime())//先按照時間對數據分組,因為后續要每秒更新/計算銷售總額和分類銷售額Top3.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//.apply(new WindowFunction<CategoryPojo, Object, String, TimeWindow>() {}).process(new MyProcessWindowFunction());//TODO 4.sink//TODO 5.executeenv.execute();}/*** abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>*/public static class MyProcessWindowFunction extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {@Overridepublic void process(String key, Context context, Iterable<CategoryPojo> categoryPojos, Collector<Object> out) throws Exception {Double totalAmount = 0d;//用來記錄銷售總額//用大小頂堆來計算TopN//用大頂堆(大的數據在堆頂,取最小的TopN時使用)還是小頂堆(小的數據在堆頂,取最大的TopN時使用)?//2--堆頂//3//4--堆底//5進來,比堆頂大,堆頂元素移除,5下沉//3//4//5//1進來,比堆頂小,原來不變//3//4//5//100進來,比堆頂大,堆頂元素移除,100下沉//4//5//100//注意:Java里面提供了一個優先級隊列PriorityQueue實現了大小頂堆的功能//https://blog.csdn.net/hefenglian/article/details/81807527//所以創建一個長度為3的PriorityQueue,并指定比較規則(正常的比較規則:元素1>=元素2 ? 1:-1,就是小頂堆)PriorityQueue<CategoryPojo> queue = new PriorityQueue<>(3, (c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);for (CategoryPojo categoryPojo : categoryPojos) {//--1.計算截止到目前為止的所有分類的銷售總額totalAmount += categoryPojo.getTotalPrice();//--2.分類銷售額最大Top3if (queue.size() < 3) {queue.add(categoryPojo);} else {//queue.size() >= 3//查看堆頂元素,但不移除CategoryPojo peek = queue.peek();if (categoryPojo.getTotalPrice() > peek.getTotalPrice()) {//進來的元素比堆頂大//堆頂元素移除,進來的元素下沉//queue.remove(peek);queue.poll();queue.add(categoryPojo);}/*else{//進來的元素比堆頂小,不用變}*/}}//--3.直接在這里輸出System.out.println("================================================================================================================================");System.out.println("----當前時間:----");System.out.println(key);System.out.println("----銷售總額:----");System.out.println(new BigDecimal(totalAmount).setScale(2, RoundingMode.HALF_UP));System.out.println("----銷售額Top3分類:----");queue.stream().map(c -> {c.setTotalPrice(new BigDecimal(c.getTotalPrice()).setScale(2, RoundingMode.HALF_UP).doubleValue());return c;}).sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1).forEach(System.out::println);}}/*** interface AggregateFunction<IN, ACC, OUT>* 自定義聚合函數,實現各個分類銷售額的預聚合/累加*/public static class MyAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {//初始化累加器@Overridepublic Double createAccumulator() {return 0d;}//將數據聚合/累加到累加器上@Overridepublic Double add(Tuple2<String, Double> value, Double accumulator) {return value.f1 + accumulator;}//獲取累加結果@Overridepublic Double getResult(Double accumulator) {return accumulator;}//合并結果(和歷史結果合并)@Overridepublic Double merge(Double a, Double b) {return a + b;}}/*** interface WindowFunction<IN, OUT, KEY, W extends Window>* 自定義窗口函數,實現窗口聚合數據的收集*/public static class MyWindow implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");@Overridepublic void apply(String key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {double totalPrice = 0d;for (Double price : input) {totalPrice += price;}//System.out.println(df.format(window.getStart())+"-----"+df.format(window.getEnd()));CategoryPojo categoryPojo = new CategoryPojo();categoryPojo.setCategory(key);categoryPojo.setDateTime(df.format(System.currentTimeMillis()));categoryPojo.setTotalPrice(totalPrice);out.collect(categoryPojo);}}/*** 用于存儲聚合的結果*/@Data@AllArgsConstructor@NoArgsConstructorpublic static class CategoryPojo {private String category;//分類名稱private double totalPrice;//該分類總銷售額private String dateTime;// 截止到當前時間的時間,本來應該是EventTime,但是我們這里簡化了直接用當前系統時間即可}/*** 自定義數據源實時產生訂單數據Tuple2<分類, 金額>*/public static class MySource implements SourceFunction<Tuple2<String, Double>> {private boolean flag = true;private String[] categorys = {"女裝", "男裝", "圖書", "家電", "洗護", "美妝", "運動", "游戲", "戶外", "家具", "樂器", "辦公"};private Random random = new Random();@Overridepublic void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {while (flag) {//隨機生成分類和金額int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]String category = categorys[index];//獲取的隨機分類double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之間的隨機數,*100之后表示[0~100)ctx.collect(Tuple2.of(category, price));Thread.sleep(20);}}@Overridepublic void cancel() {flag = false;}}
}
實現效果
總結
以上是生活随笔為你收集整理的2021年大数据Flink(四十):Flink模拟双十一实时大屏统计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(三十九):
- 下一篇: 2021年大数据Flink(四十一):