Big Data with Flink, 2021 (Part 45): Further Reading on Dual-Stream Joins
Contents
Further Reading: Dual-Stream Join
Introduction
Window Join
Interval Join
Code Demo 1
Code Demo 2
Key Points
Further Reading: Dual-Stream Join
Introduction
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html
https://zhuanlan.zhihu.com/p/340560908
https://blog.csdn.net/andyonlines/article/details/108173259
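
Dual-stream joins come up frequently in Flink interviews. Covering the following points is usually enough:
- Joins fall into two broad categories: Window Join and Interval Join.
- Window Join is further divided by window type into three kinds:
Tumbling Window Join, Sliding Window Join, and Session Window Join.
All window-based joins rely on the window mechanism: data is first buffered in window state, and the join is executed when the window fires.
- Interval join also stores data in state before processing it. The difference is that the data in state expires, and cleanup is triggered by the data itself.
Currently, the result of a stream join is the Cartesian product of the matching elements.
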
Window Join
- Tumbling Window Join
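When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, elements of one stream that have no elements from the other stream in their tumbling window are not emitted!
As shown in the figure, we define a tumbling window with a size of 2 milliseconds, which results in windows of the form [0,1], [2,3], ... The figure shows the pairwise combinations of all elements in each window that are passed to the JoinFunction. Note that nothing is emitted in tumbling window [6,7], because the green stream contains no elements to join with the orange elements ⑥ and ⑦.
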
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    // 2 ms tumbling event-time windows
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
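
To make the pairing rule concrete, here is a minimal plain-Java sketch. It does not use the Flink API; it only imitates the semantics described above. It assumes that all elements share one key, that each element's value is also its timestamp in milliseconds, and that timestamps are non-negative. The class name, method names, and sample values are hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java imitation of tumbling-window inner-join semantics (not Flink code). */
final class TumblingJoinSketch {

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Pairs every orange element with every green element that falls into the same window. */
    static List<String> join(long[] orange, long[] green, long size) {
        List<String> out = new ArrayList<>();
        for (long o : orange) {
            for (long g : green) {
                if (windowStart(o, size) == windowStart(g, size)) {
                    out.add(o + "," + g);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 1, 2, 3, 4, 5, 6, 7}; // assumed sample data
        long[] green = {0, 1, 3, 4};              // assumed sample data
        // Elements 6 and 7 have no green partner in window [6,7], so they produce no output.
        for (String pair : join(orange, green, 2)) {
            System.out.println(pair);
        }
    }
}
```

With this sample data the sketch produces eight pairs, and the orange elements 6 and 7 never appear in the output, which is the inner-join behavior described above.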
- Sliding Window Join
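When performing a sliding window join, all elements with a common key and a common sliding window are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Elements of one stream that have no elements from the other stream in the current sliding window are not emitted! Note that some elements may be joined in one sliding window but not in another!
In this example we use sliding windows with a size of 2 milliseconds that slide by 1 millisecond, resulting in the sliding windows [-1,0], [0,1], [1,2], [2,3], ... The joined elements below the x-axis are the ones passed to the JoinFunction for each sliding window. Here you can also see, for example, that orange ② is joined with green ③ in window [2,3] but is not joined with anything in window [1,2].
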
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
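
The reason an element can be joined in one sliding window but not in another is that each element belongs to several overlapping windows. The following plain-Java sketch, which does not use the Flink API, computes the windows an element falls into and the windows two elements share. It assumes each element's value is its timestamp in milliseconds; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of sliding-window assignment (not Flink code). */
final class SlidingWindowSketch {

    /** Start timestamps of all sliding windows that contain the given timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window that starts at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Window starts shared by two elements; the pair is joined once per shared window. */
    static List<Long> sharedWindowStarts(long a, long b, long size, long slide) {
        List<Long> shared = new ArrayList<>(windowStarts(a, size, slide));
        shared.retainAll(windowStarts(b, size, slide));
        return shared;
    }

    public static void main(String[] args) {
        // Size 2 ms, slide 1 ms: element 2 lies in the windows starting at 2 and 1,
        // that is, [2,3] and [1,2] in the notation used above.
        System.out.println(windowStarts(2, 2, 1));
        // Orange 2 and green 3 only share the window starting at 2, that is, [2,3].
        System.out.println(sharedWindowStarts(2, 3, 2, 1));
    }
}
```

A consequence worth keeping in mind: two elements with the same timestamp share every window they fall into, so in this model the same pair is produced once per shared window.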
- Session Window Join
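
When performing a session window join, all elements with the same key that, when "combined", fulfill the session criterion are joined as pairwise combinations and passed to a JoinFunction or FlatJoinFunction. Again, this performs an inner join, so if a session window contains elements from only one stream, no output is emitted!
Here we define a session window join in which each session is separated by a gap of at least 1 ms. There are three sessions. In the first two, the joined elements from both streams are passed to the JoinFunction. In the third session there are no elements in the green stream, so ⑧ and ⑨ are not joined!
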
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    // a session closes after a gap of 1 ms
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply(new JoinFunction<Integer, Integer, String>() {
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });
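
The session behavior can be imitated in plain Java as well. The sketch below does not use the Flink API. It assumes one shared key, treats each value as its timestamp in milliseconds, and, as a simplifying assumption of this sketch, starts a new session whenever the distance to the previous element is larger than the gap. The class name, method names, and sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java imitation of session-window inner-join semantics (not Flink code). */
final class SessionJoinSketch {

    /**
     * Splits sorted timestamps into sessions, each returned as {first, last}.
     * Assumption of this sketch: a new session starts when the distance to the
     * previous element exceeds the gap.
     */
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            if (!result.isEmpty() && t - result.get(result.size() - 1)[1] <= gap) {
                result.get(result.size() - 1)[1] = t; // extend the current session
            } else {
                result.add(new long[] {t, t});        // open a new session
            }
        }
        return result;
    }

    /** Pairs orange and green elements that fall into the same session. */
    static List<String> join(long[] orange, long[] green, long gap) {
        long[] all = new long[orange.length + green.length];
        System.arraycopy(orange, 0, all, 0, orange.length);
        System.arraycopy(green, 0, all, orange.length, green.length);
        Arrays.sort(all);

        List<String> out = new ArrayList<>();
        for (long[] s : sessions(all, gap)) {
            for (long o : orange) {
                for (long g : green) {
                    boolean bothInSession = o >= s[0] && o <= s[1] && g >= s[0] && g <= s[1];
                    if (bothInSession) {
                        out.add(o + "," + g);
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {1, 2, 5, 6, 8, 9}; // assumed sample data
        long[] green = {0, 4, 5};           // assumed sample data
        // The third session holds only 8 and 9 from the orange stream, so it emits nothing.
        for (String pair : join(orange, green, 1)) {
            System.out.println(pair);
        }
    }
}
```

With this sample data the elements fall into three sessions, the first two produce six pairs in total, and the orange elements 8 and 9 produce none.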

Interval Join
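The Window Join covered above must perform the join inside a window. What if there is no window?
An interval join also joins two streams (stream A and stream B) on a common key,
and requires the timestamp of the stream B element to lie within a time interval relative to the timestamp of the stream A element.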
b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
or
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
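
In other words:
the timestamp of the stream B element ≥ the timestamp of the stream A element + lower bound, and the timestamp of the stream B element ≤ the timestamp of the stream A element + upper bound.

In the example above, we join the two streams "orange" and "green" with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. By default these bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can be applied to change this behavior.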
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    // lower bound -2 ms, upper bound +1 ms, both inclusive by default
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
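
The interval condition itself is a simple predicate. The following plain-Java sketch, which does not use the Flink API, evaluates it with optional exclusive bounds. It assumes one shared key and treats each value as its timestamp in milliseconds; the class name, method names, and sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the interval-join condition (not Flink code). */
final class IntervalJoinSketch {

    /** True if bTs lies in [aTs + lower, aTs + upper], with optionally exclusive bounds. */
    static boolean matches(long aTs, long bTs, long lower, long upper,
                           boolean lowerExclusive, boolean upperExclusive) {
        long lo = aTs + lower;
        long hi = aTs + upper;
        boolean aboveLower = lowerExclusive ? bTs > lo : bTs >= lo;
        boolean belowUpper = upperExclusive ? bTs < hi : bTs <= hi;
        return aboveLower && belowUpper;
    }

    /** Pairs each orange element with every green element inside its interval (inclusive bounds). */
    static List<String> join(long[] orange, long[] green, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (long o : orange) {
            for (long g : green) {
                if (matches(o, g, lower, upper, false, false)) {
                    out.add(o + "," + g);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 2, 3, 4, 5, 7}; // assumed sample data
        long[] green = {0, 1, 6, 7};        // assumed sample data
        // Lower bound -2 ms, upper bound +1 ms, both inclusive.
        for (String pair : join(orange, green, -2, 1)) {
            System.out.println(pair);
        }
    }
}
```

With inclusive bounds the sample data yields eight pairs, and orange 4 matches nothing because no green timestamp lies in [2,5]. Making the upper bound exclusive drops matches that sit exactly on it: matches(5, 6, -2, 1, false, true) is false, while the inclusive variant is true.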
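
Code Demo 1
- Requirements
Let's work through an example:
We use two custom sources to simulate data: one source emits order items and the other emits goods data. We associate the two with a window join.

- Approach
1. A Window Join first uses where and equalTo to specify the key to join on. Here we join the elements of the two streams on GoodsId, using the getGoodsId method references shown in the code.
2. Set a 5-second tumbling window. Elements of the two streams are joined within this 5-second window.
3. In the apply method, join the two elements of different types and produce an element of a new type.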
package cn.lanson.extend;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc
 */
public class JoinDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the goods stream
        DataStream<Goods> goodsDS = env.addSource(new GoodsSource())
                .assignTimestampsAndWatermarks(new GoodsWatermark());

        // Build the order item stream
        DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource())
                .assignTimestampsAndWatermarks(new OrderItemWatermark());

        // Join the two streams
        DataStream<FactOrderItem> factOrderItemDS = orderItemDS.join(goodsDS)
                // Join condition: GoodsId of the first stream (orderItemDS) == GoodsId of the second stream (goodsDS)
                .where(OrderItem::getGoodsId)
                .equalTo(Goods::getGoodsId)
                // Specify the window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Process the join result
                .apply((OrderItem item, Goods goods) -> {
                    FactOrderItem factOrderItem = new FactOrderItem();
                    factOrderItem.setGoodsId(goods.getGoodsId());
                    factOrderItem.setGoodsName(goods.getGoodsName());
                    factOrderItem.setCount(new BigDecimal(item.getCount()));
                    factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(item.getCount())));
                    return factOrderItem;
                });

        factOrderItemDS.print();

        env.execute("Tumbling Window JOIN");
    }

    // Goods class
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Order item class
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Join result
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Goods stream source (comparable to a dimension table)
    public static class GoodsSource extends RichSourceFunction<Goods> {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            // Emit the full goods list once per second
            while (!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Order item stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                // Also emit an item whose goodsId ("111") matches no goods record
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Watermark strategy; for learning and testing it simply uses the system time
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    // Watermark strategy; for learning and testing it simply uses the system time
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
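
Code Demo 2
1. Use keyBy to join the two streams on the same key.
2. An interval join must specify which time range of stream B elements stream A joins with. Here the lower bound is -1 second and the upper bound is 0, and the upper bound is meant to be open (exclusive). In other words, an element of stream A is matched with the elements of stream B from the preceding second. Note that the .upperBoundExclusive() call is commented out in the code, so as written the upper bound is inclusive.
3. In process, combine the two elements that share the same key and load them into a new FactOrderItem object.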
package cn.lanson.extend;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc
 */
public class JoinDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the goods stream
        DataStream<Goods> goodsDS = env.addSource(new GoodsSource())
                .assignTimestampsAndWatermarks(new GoodsWatermark());

        // Build the order item stream
        DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource())
                .assignTimestampsAndWatermarks(new OrderItemWatermark());

        // Join the two streams
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS
                .keyBy(OrderItem::getGoodsId)
                .intervalJoin(goodsDS.keyBy(Goods::getGoodsId))
                .between(Time.seconds(-1), Time.seconds(0))
                // Uncomment to make the upper bound exclusive
                //.upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();

        env.execute("Interval JOIN");
    }

    // Goods class
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Order item class
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Join result
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    // Goods stream source (comparable to a dimension table)
    public static class GoodsSource extends RichSourceFunction<Goods> {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            // Emit the full goods list once per second
            while (!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Order item stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                // Also emit an item whose goodsId ("111") matches no goods record
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // Watermark strategy; for learning and testing it simply uses the system time
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    // Watermark strategy; for learning and testing it simply uses the system time
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
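Key Points
Note: the project later in this series involves dual-stream joins.
The following points are frequently asked in interviews.
Dual-stream joins come up frequently in Flink interviews. Covering the following points is usually enough:
1. Joins fall into two broad categories: Window Join and Interval Join.
2. Window Join is further divided by window type into three kinds: Tumbling, Sliding, and Session Window Join.
3. All window-based joins rely on the window mechanism: data is first buffered in window state, and the join is executed when the window fires.
4. Interval join also stores data in state before processing it. The difference is that the data in state expires, and cleanup is triggered by the data itself.
See the official documentation for examples and explanations: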
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html