Flink 实时计算 - 维表 Join 解读
Flink 實時計算 - 維表 Join 解讀
前言
Flink 1.9 版本可以說是一個具有里程碑意義的版本,其內部合入了很多 Blink Table/SQL 方面的功能,同時也開始增強 Flink 在批處理方面的能力,真的是向批流統一的終極方向開始前進。Flink 1.9 版本在 8.22 號也終于發布了。本文主要介紹學習 Flink SQL 維表 Join,維表 Join 對于SQL 任務來說,一般是一個很正常的功能,本文給出代碼層面的實現,和大家分享用戶如何自定義 Flink 維表。
1. 什么是維表
維表作為 SQL 任務中一種常見表的類型,其本質就是關聯表數據的額外數據屬性,通常在 Join 語句中進行使用。比如源數據有人的身份證號,人名,你現在想要得到人的家庭地址,那么可以通過身份證號去關聯人的身份證信息,就可以得到更全的數據。
維表可以是靜態的數據,也可以是動態的數據(比如定時更新的數據),一般會通過特定的主鍵來進行關聯。它可以在 Mysql 中進行存儲,也可以在 Nosql 數據庫中進行存儲,比如 HBase等。
2. Flink 中的維表
Flink 1.9 中維表功能來源于新加入的Blink中的功能,如果你要使用該功能,那就需要自己引入 Blink 的 Planner,而不是引用社區的 Planner。由于新合入的 Blink 相關功能,使得 Flink 1.9 實現維表功能很簡單,只要自定義實現 LookupableTableSource 接口,同時實現里面的方法就可以進行,下面來分析一下 LookupableTableSource的代碼:
public interface LookupableTableSource<T> extends TableSource<T> {TableFunction<T> getLookupFunction(String[] lookupKeys);AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);boolean isAsyncEnabled(); }isAsyncEnabled 方法主要表示該表是否支持異步訪問外部數據源獲取數據,當返回 true 時,那么在注冊到 TableEnvironment 后,使用時會返回異步函數進行調用,當返回 false 時,則使同步訪問函數。
可以看到 LookupableTableSource 這個接口中有三個方法
getLookupFunction 方法返回一個同步訪問外部數據系統的函數,什么意思呢,就是你通過 Key 去查詢外部數據庫,需要等到返回數據后才繼續處理數據,這會對系統處理的吞吐率有影響。
getAsyncLookupFunction 方法則是返回一個異步的函數,異步訪問外部數據系統,獲取數據,這能極大的提升系統吞吐率。具體是否要實現異步函數方法,這需要用戶自己判定是否需要對異步訪問的支持,如果同步方法的吞吐率已經滿足要求,那可以先不用考慮異步的實現情況。
2.2 同步訪問函數getLookupFunction
getLookupFunction 會返回同步方法,這里你需要自定義 TableFuntion 進行實現,TableFunction 本質是 UDTF,輸入一條數據可能返回多條數據,也可能返回一條數據。用戶自定義 TableFunction 格式如下:
public class MyLookupFunction extends TableFunction<Row> {@Overridepublic void open(FunctionContext context) throws Exception {super.open(context);}public void eval(Object... paramas) {} }open 方法在進行初始化算子實例的進行調用,異步外部數據源的client要在類中定義為 transient,然后在 open 方法中進行初始化,這樣每個任務實例都會有一個外部數據源的 client。防止同一個 client 多個任務實例調用,出現線程不安全情況。
eval 則是 TableFunction 最重要的方法,它用于關聯外部數據。當程序有一個輸入元素時,就會調用eval一次,用戶可以將產生的數據使用 collect() 進行發送下游。paramas 的值為用戶輸入元素的值,比如在 Join 的時候,使用 A.id = B.id and A.name = b.name, B 是維表,A 是用戶數據表,paramas 則代表 A.id,A.name 的值。
2.3 異步訪問函數
getAsyncLookupFunction 會返回異步訪問外部數據源的函數,如果你想使用異步函數,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。
使用異步函數訪問外部數據系統,一般是外部系統有異步訪問客戶端,如果沒有的話,可以自己使用線程池異步訪問外部系統。至于為什么使用異步訪問函數,無非就是為了提高程序的吞吐量,不需要每條記錄訪問返回數據后,才去處理下一條記錄。
異步函數格式如下:
public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {@Overridepublic void open(FunctionContext context) throws Exception {super.open(context);}public void eval(CompletableFuture<Collection<Row>> future, Object... params) {} }維表異步訪問函數總體和同步函數實現類似,這里說一下注意點:
外部數據源異步客戶端初始化。如果是線程安全的(多個客戶端一起使用),你可以不加 transient 關鍵字,初始化一次。否則,你需要加上 transient,不對其進行初始化,而在 open 方法中,為每個 Task 實例初始化一個。
eval 方法中多了一個 CompletableFuture,當異步訪問完成時,需要調用其方法進行處理.
為了減少每條數據都去訪問外部數據系統,提高數據的吞吐量,一般我們會在同步函數和異步函數中加入緩存,如果以前某個關鍵字訪問過外部數據系統,我們將其值放入到緩存中,在緩存沒有失效之前,如果該關鍵字再次進行處理時,直接先訪問緩存,有就直接返回,沒有再去訪問外部數據系統,然后在進行緩存,進一步提升我們實時程序處理的吞吐量。
一般緩存類型有以下幾種類型:
3. 總結
Flink 在 1.9 版本開源出維表功能,用戶可以結合自己的具體需求,自定義的去開發維表。Flink 1.9 版本在Flink SQL方面的開源出很多功能,用戶可以自己選擇具體 Planner進行使用,社區的Planner、Blink的 Planner。希望 Flink 在未來越來越好。
參考:https://blog.csdn.net/u012554509/article/details/100533749
總結
以上是生活随笔為你收集整理的Flink 实时计算 - 维表 Join 解读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SQL 中 left join、righ
- 下一篇: GDB入门:A GDB Tutorial