A Real-Time E-Commerce Dashboard Case Study Based on Kafka + Flink + Redis
Preface
Alibaba's Double 11 sales dashboard has become something of a special attraction. Real-time dashboards are being adopted by more and more companies to surface key metrics as they happen, and in practice you never compute just one or two dimensions. Because Flink is a true streaming engine, it is a better fit for dashboard workloads than Spark Streaming. This article abstracts a simple model from the author's day-to-day work and briefly walks through the computation flow (most of it is, of course, source code).
Data Format and Ingestion
A simplified sub-order message body looks like this:
```json
{
  "userId": 234567,
  "orderId": 2902306918400,
  "subOrderId": 2902306918401,
  "siteId": 10219,
  "siteName": "site_blabla",
  "cityId": 101,
  "cityName": "北京市",
  "warehouseId": 636,
  "merchandiseId": 187699,
  "price": 299,
  "quantity": 2,
  "orderStatus": 1,
  "isNewOrder": 0,
  "timestamp": 1572963672217
}
```

Because an order may contain multiple kinds of merchandise, it is split into sub-orders; each JSON message represents one sub-order. We want to compute the following metrics per calendar day and refresh them on the dashboard once per second:
- Per-site (the site ID is siteId) total order count, sub-order count, sales quantity, and GMV;
- The current top-N merchandise (the merchandise ID is merchandiseId) by sales quantity.
Since the dashboard's overriding requirement is real-time freshness, waiting for late data is not realistic, so we use processing time as the time characteristic and checkpoint at a 1-minute interval.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
```

Then subscribe to the Kafka order topic as the data source.
```java
Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
DataStream<String> sourceStream = env
    .addSource(new FlinkKafkaConsumer011<>(
        ORDER_EXT_TOPIC_NAME,      // topic
        new SimpleStringSchema(),  // deserializer
        consumerProps              // consumer properties
    ))
    .setParallelism(PARTITION_COUNT)
    .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
    .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);
```

Assigning operator IDs to stateful operators (by calling the uid() method) is a good habit: it ensures that a Flink job restarted from a savepoint restores its state to the right operators. To be on the safe side, the Flink documentation also recommends explicitly setting an ID on every operator; see: https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
Next, convert the JSON data into POJOs, using FastJSON as the JSON framework.
```java
DataStream<SubOrderDetail> orderStream = sourceStream
    .map(message -> JSON.parseObject(message, SubOrderDetail.class))
    .name("map_sub_order_detail")
    .uid("map_sub_order_detail");
```

The JSON is already a pre-processed, standardized format, so the POJO class SubOrderDetail can be greatly simplified with Lombok. If any JSON fields were irregular, we would have to hand-write getters and setters and declare the mapping with the @JSONField annotation.
```java
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
    private static final long serialVersionUID = 1L;

    private long userId;
    private long orderId;
    private long subOrderId;
    private long siteId;
    private String siteName;
    private long cityId;
    private String cityName;
    private long warehouseId;
    private long merchandiseId;
    private long price;
    private long quantity;
    private int orderStatus;
    private int isNewOrder;
    private long timestamp;
}
```

Computing Site Metrics
Key the sub-order stream by site ID, open a 1-day tumbling window, and attach a ContinuousProcessingTimeTrigger that fires every second. Mind the time-zone issue with processing time; it is a perennial gotcha.
```java
WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
    .keyBy("siteId")
    .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));
```

Next, write an aggregate function.
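Why Time.hours(-8)? Flink aligns window boundaries to multiples of the window size counted from the epoch, so a plain 1-day window would roll over at UTC midnight rather than at UTC+8 (Beijing) midnight. A minimal sketch of the alignment arithmetic, mirroring the formula Flink applies internally (TimeWindow.getWindowStartWithOffset):

```java
public class WindowOffsetDemo {
    // Same alignment formula Flink uses internally in TimeWindow.getWindowStartWithOffset
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long ts = 1572963672217L;            // the sample message's timestamp: 2019-11-05 22:21:12 UTC+8
        long day = 24 * 60 * 60 * 1000L;     // Time.days(1)
        long offset = -8 * 60 * 60 * 1000L;  // Time.hours(-8)
        long start = getWindowStartWithOffset(ts, offset, day);
        System.out.println(start);           // 1572883200000 = 2019-11-05 00:00:00 UTC+8
    }
}
```

With offset 0 the same message would land in a window starting at 2019-11-05 00:00:00 UTC, i.e. 08:00 Beijing time, which is not what a "calendar day" dashboard wants.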
```java
DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
    .aggregate(new OrderAndGmvAggregateFunc())
    .name("aggregate_site_order_gmv")
    .uid("aggregate_site_order_gmv");

public static final class OrderAndGmvAggregateFunc
        implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
    private static final long serialVersionUID = 1L;

    @Override
    public OrderAccumulator createAccumulator() {
        return new OrderAccumulator();
    }

    @Override
    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
        if (acc.getSiteId() == 0) {
            acc.setSiteId(record.getSiteId());
            acc.setSiteName(record.getSiteName());
        }
        acc.addOrderId(record.getOrderId());
        acc.addSubOrderSum(1);
        acc.addQuantitySum(record.getQuantity());
        acc.addGmv(record.getPrice() * record.getQuantity());
        return acc;
    }

    @Override
    public OrderAccumulator getResult(OrderAccumulator acc) {
        return acc;
    }

    @Override
    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
        if (acc1.getSiteId() == 0) {
            acc1.setSiteId(acc2.getSiteId());
            acc1.setSiteName(acc2.getSiteName());
        }
        acc1.addOrderIds(acc2.getOrderIds());
        acc1.addSubOrderSum(acc2.getSubOrderSum());
        acc1.addQuantitySum(acc2.getQuantitySum());
        acc1.addGmv(acc2.getGmv());
        return acc1;
    }
}
```

The accumulator class OrderAccumulator is straightforward; its structure is clear enough from the source code, so there is no need to dwell on it. The only point worth noting is that order IDs can repeat across sub-orders, so they are kept in a HashSet named orderIds. A HashSet is fine at our current data scale; for truly massive data, consider switching to HyperLogLog.
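The deduplication point can be shown with plain collections. OrderAccumulator's source is not included in this article, so the fields below are assumptions inferred from how the aggregate function uses it; the key idea is that the distinct order count is the size of the orderIds set, while the sub-order count is a plain counter:

```java
import java.util.HashSet;
import java.util.Set;

public class OrderAccumulatorSketch {
    // Hypothetical minimal shape of the accumulator; field names inferred from usage above
    final Set<Long> orderIds = new HashSet<>(); // dedup: one order can span several sub-orders
    long subOrderSum;
    long quantitySum;
    long gmv;

    void add(long orderId, long quantity, long price) {
        orderIds.add(orderId);           // duplicate orderIds collapse here
        subOrderSum += 1;                // every sub-order counts once
        quantitySum += quantity;
        gmv += price * quantity;
    }

    public static void main(String[] args) {
        OrderAccumulatorSketch acc = new OrderAccumulatorSketch();
        acc.add(2902306918400L, 2, 299); // two sub-orders belonging to the same order
        acc.add(2902306918400L, 1, 199);
        // 1 distinct order, 2 sub-orders, GMV = 299*2 + 199*1 = 797
        System.out.println(acc.orderIds.size() + " " + acc.subOrderSum + " " + acc.gmv);
    }
}
```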
Next comes the output to Redis for the presentation layer to query. There is a catch here: in any given second only a few sites actually see new data, yet ContinuousProcessingTimeTrigger emits the aggregates of every key in the window on each firing. That is a lot of wasted work and extra pressure on Redis. So we chain a ProcessFunction after the aggregation, as follows.
```java
DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
    .keyBy(0)
    .process(new OutputOrderGmvProcessFunc(),
        TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
    .name("process_site_gmv_changed")
    .uid("process_site_gmv_changed");

public static final class OutputOrderGmvProcessFunc
        extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;

    private MapState<Long, OrderAccumulator> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "state_site_order_gmv",
            Long.class,
            OrderAccumulator.class));
    }

    @Override
    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
        long key = value.getSiteId();
        OrderAccumulator cachedValue = state.get(key);

        if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
            JSONObject result = new JSONObject();
            result.put("site_id", value.getSiteId());
            result.put("site_name", value.getSiteName());
            result.put("quantity", value.getQuantitySum());
            result.put("orderCount", value.getOrderIds().size());
            result.put("subOrderCount", value.getSubOrderSum());
            result.put("gmv", value.getGmv());
            out.collect(new Tuple2<>(key, result.toJSONString()));
            state.put(key, value);
        }
    }

    @Override
    public void close() throws Exception {
        state.clear();
        super.close();
    }
}
```

It is simple enough: a MapState caches the current aggregates of every site. Since the source data is sub-order-grained, an element is emitted only when its site ID is not yet in the MapState or the cached sub-order count differs from the current one; in other words, only results that have actually changed get through.
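Stripped of the Flink machinery, the suppression logic boils down to comparing each element against the last value emitted for its key. A self-contained sketch with a HashMap standing in for MapState:

```java
import java.util.HashMap;
import java.util.Map;

public class ChangeFilterSketch {
    // siteId -> last emitted subOrderSum (plays the role of the MapState above)
    private final Map<Long, Long> lastSubOrderSum = new HashMap<>();

    // Returns true only when the aggregate for this site changed and is worth writing to Redis
    boolean shouldEmit(long siteId, long subOrderSum) {
        Long cached = lastSubOrderSum.get(siteId);
        if (cached == null || cached != subOrderSum) {
            lastSubOrderSum.put(siteId, subOrderSum);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ChangeFilterSketch f = new ChangeFilterSketch();
        System.out.println(f.shouldEmit(10219L, 5)); // true: first sighting of this site
        System.out.println(f.shouldEmit(10219L, 5)); // false: nothing changed, skip the write
        System.out.println(f.shouldEmit(10219L, 6)); // true: a new sub-order arrived
    }
}
```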
Finally we can safely attach the Redis sink; the results are stored in a hash structure.
```java
// Construct a FlinkJedisPoolConfig appropriate for your environment
FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
siteResultStream
    .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
    .name("sink_redis_site_gmv")
    .uid("sink_redis_site_gmv")
    .setParallelism(1);

public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, String> data) {
        return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, String> data) {
        return data.f1;
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
        return Optional.of(HASH_NAME_PREFIX
            + new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT)
            + "SITES");
    }
}
```

Top N Merchandise
We can directly reuse the orderStream produced earlier; the approach is much the same as the GMV statistics above. A 1-second tumbling window is enough here.
```java
WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
    .keyBy("merchandiseId")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));

DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
    .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
    .name("aggregate_merch_sales")
    .uid("aggregate_merch_sales")
    .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));
```

The aggregate and window functions are even simpler; the final output is a 2-tuple of merchandise ID and sales quantity.
```java
public static final class MerchandiseSalesAggregateFunc
        implements AggregateFunction<SubOrderDetail, Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(SubOrderDetail value, Long acc) {
        return acc + value.getQuantity();
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}

public static final class MerchandiseSalesWindowFunc
        implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(Tuple key,
                      TimeWindow window,
                      Iterable<Long> accs,
                      Collector<Tuple2<Long, Long>> out) throws Exception {
        long merchId = ((Tuple1<Long>) key).f0;
        long acc = accs.iterator().next();
        out.collect(new Tuple2<>(merchId, acc));
    }
}
```

Since the data ends up in Redis anyway, there is no need to compute Top N on the Flink side at all: just use a Redis sorted set (zset), with the merchandise ID as the member and the sales quantity as the score. Simple and convenient. However, the flink-redis-connector project does not ship a ZINCRBY implementation by default (one more gripe); we can add it ourselves, following the same steps as the SETEX command described in an earlier article, so I won't repeat them here. The RedisMapper looks like this.
```java
public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, Long> data) {
        return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, Long> data) {
        return String.valueOf(data.f1);
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
        return Optional.of(ZSET_NAME_PREFIX
            + new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT)
            + ":" + "MERCHANDISE");
    }
}
```

On the query side, the backend can fetch any rank range with the ZREVRANGE command. As long as the data volume stays manageable and a Redis instance is already at hand, this approach works as a general-purpose implementation for all kinds of Top N requirements.
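To make the division of labor concrete: Flink only issues per-window sales deltas, while Redis keeps the cumulative ranking. That zset contract can be simulated in plain Java (a sketch only, not the actual Redis client; ZINCRBY accumulates a member's score and ZREVRANGE reads members by descending score):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZsetTopNSketch {
    private final Map<Long, Long> scores = new HashMap<>(); // merchandiseId -> cumulative sales

    // What the sink effectively does once per second for each merchandise key
    void zincrby(long merchId, long delta) {
        scores.merge(merchId, delta, Long::sum);
    }

    // What the dashboard backend does with ZREVRANGE: top-n member IDs by score, descending
    List<Long> zrevrange(int n) {
        List<Map.Entry<Long, Long>> entries = new ArrayList<>(scores.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<Long> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }

    public static void main(String[] args) {
        ZsetTopNSketch z = new ZsetTopNSketch();
        z.zincrby(187699L, 2); // deltas from three 1-second windows
        z.zincrby(187700L, 5);
        z.zincrby(187699L, 4); // 187699 now totals 6 and takes rank 1
        System.out.println(z.zrevrange(2)); // [187699, 187700]
    }
}
```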
The End
The actual dashboard is confidential, so no screenshots here. Below is the execution plan shown by the Flink Web UI at submission time (in reality there are more statistics tasks, not just three sinks). By reusing the source data, many more metrics can be computed within the same Flink job.