如何在 PyFlink 1.10 中自定义 Python UDF?
我們知道 PyFlink 是在 Apache Flink 1.9 版新增的,那么在 Apache Flink 1.10 中 Python UDF 功能支持的速度是否能夠滿足用戶的急切需求呢?
Python UDF 的發(fā)展趨勢(shì)
直觀的判斷,PyFlink Python UDF 的功能也可以如上圖一樣能夠迅速?gòu)挠酌缱兂纱髽?#xff0c;為啥有此判斷,請(qǐng)繼續(xù)往下看…
Flink on Beam
我們都知道有 Beam on Flink 的場(chǎng)景,就是 Beam 支持多種 Runner,也就是說 Beam SDK 編寫的 Job 可以運(yùn)行在 Flink 之上。如下圖所示:
上面這圖是 Beam Portability Framework 的架構(gòu)圖,他描述了 Beam 如何支持多語(yǔ)言,如何支持多 Runner,單獨(dú)說 Apache Flink 的時(shí)候我們就可以說是 Beam on Flink,那么怎么解釋 Flink on Beam 呢?
在 Apache Flink 1.10 中我們所說的 Flink on Beam 更精確的說是 PyFlink on Beam Portability Framework。我們看一下簡(jiǎn)單的架構(gòu)圖,如下:
Beam Portability Framework 是一個(gè)成熟的多語(yǔ)言支持框架,框架高度抽象了語(yǔ)言之間的通信協(xié)議(gRPC),定義了數(shù)據(jù)的傳輸格式(Protobuf),并且根據(jù)通用流計(jì)算框架所需要的組件,抽象個(gè)各種服務(wù),比如 DataService,StateService,MetricsService 等。在這樣一個(gè)成熟的框架下,PyFlink 可以快速的構(gòu)建自己的 Python 算子,同時(shí)重用 Apache Beam Portability Framework 中現(xiàn)有 SDK harness 組件,可以支持多種 Python 運(yùn)行模式,如:Process,Docker,etc.,這使得 PyFlink 對(duì) Python UDF 的支持變得非常容易,在 Apache Flink 1.10 中的功能也非常的穩(wěn)定和完整。那么為啥說是 Apache Flink 和 Apache Beam 共同打造呢,是因?yàn)槲野l(fā)現(xiàn)目前 Apache Beam Portability Framework 的框架也存在很多優(yōu)化的空間,所以我在 Beam 社區(qū)進(jìn)行了優(yōu)化討論,并且在 Beam 社區(qū)也貢獻(xiàn)了?20+ 的優(yōu)化補(bǔ)丁。
概要了解了 Apache Flink 1.10 中 Python UDF 的架構(gòu)之后,我們還是切入的代碼部分,看看如何開發(fā)和使用 Python UDF。
如何定義 Python UDF
在 Apache Flink 1.10 中我們有多種方式進(jìn)行 UDF 的定義,比如:
- Extend ScalarFunction, e.g.:
- Lambda Functio
- Named Function
- Callable Function
我們發(fā)現(xiàn)上面定義函數(shù)除了第一個(gè)擴(kuò)展 ScalaFunction 的方式是 PyFlink 特有的,其他方式都是 Python 語(yǔ)言本身就支持的,也就是說,在 Apache Flink 1.10 中 PyFlink 允許以任何 Python 語(yǔ)言所支持的方式定義 UDF。
如何使用 Python UDF
那么定義完 UDF 我們應(yīng)該怎樣使用呢?Apache Flink 1.10 中提供了 2 種 Decorators,如下:
- Decorators - udf(), e.g. :
- Decorators - @udf, e.g. :
然后在使用之前進(jìn)行注冊(cè),如下:
st_env.register_function("hash_code", hash_code_mean)接下來(lái)就可以在 Table API/SQL 中進(jìn)行使用了,如下:
my_table.select("hash_code_mean(a, b)").insert_into("Results")目前為止,我們已經(jīng)完成了 Python UDF 的定義,聲明和注冊(cè)了。接下來(lái)我們還是看一個(gè)完整的示例吧:)
案例描述
- 需求
假設(shè)蘋果公司要統(tǒng)計(jì)該公司產(chǎn)品在雙 11 期間各城市的銷售數(shù)量和銷售金額分布情況。
- 數(shù)據(jù)格式
每一筆訂單是一個(gè)字符串,字段用逗號(hào)分隔, 例如:
ItemName, OrderCount, Price, City ------------------------------------------- iPhone 11, 30, 5499, Beijing\n iPhone 11 Pro,20,8699,Guangzhou\n案例分析
根據(jù)案例的需求和數(shù)據(jù)結(jié)構(gòu)分析,我們需要對(duì)原始字符串進(jìn)行結(jié)構(gòu)化解析,那么需要一個(gè)按“,”號(hào)分隔的 UDF(split) 和一個(gè)能夠?qū)⒏鱾€(gè)列信息展平的 DUF(get)。同時(shí)我們需要根據(jù)城市進(jìn)行分組統(tǒng)計(jì)。
核心實(shí)現(xiàn)
UDF 定義
- Split UDF
- Get UDF
注冊(cè) UDF
- 注冊(cè) Split UDF
- 注冊(cè) Get UDF
核心實(shí)現(xiàn)邏輯
如下代碼我們發(fā)現(xiàn)核心實(shí)現(xiàn)邏輯非常簡(jiǎn)單,只需要對(duì)數(shù)據(jù)進(jìn)行解析和對(duì)數(shù)據(jù)進(jìn)行集合計(jì)算:
t_env.from_table_source(SocketTableSource(port=9999))\ .alias("line")\ .select("split(line) as str_array")\ .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price")\ .select("city, count, count * unit_price as total_price")\ .group_by("city")\ .select("city, sum(count) as sales_volume, sum(total_price) as sales")\.insert_into("sink") t_env.execute("Sales Statistic")上面的代碼我們假設(shè)是一個(gè) Socket 的 Source,Sink 是一個(gè) Chart Sink,那么最終運(yùn)行效果圖,如下:
我總是認(rèn)為在博客中只是文本描述而不能讓讀者真正的在自己的機(jī)器上運(yùn)行起來(lái)的博客,不是好博客,所以接下來(lái)我們看看按照我們下面的操作,是否能在你的機(jī)器上也運(yùn)行起來(lái)?:)
環(huán)境
因?yàn)槟壳?PyFlink 還沒有部署到 PyPI 上面,在 Apache Flink 1.10 發(fā)布之前,我們需要通過構(gòu)建 Flink 的 master 分支源碼來(lái)構(gòu)建運(yùn)行我們 Python UDF 的 PyFlink 版本。
源代碼編譯
在進(jìn)行編譯代碼之前,我們需要你已經(jīng)安裝了?JDK8?和?Maven3x。
- 下載解壓
- 修改環(huán)境變量(~/.bashrc)
除了 JDK 和 MAVEN 完整的環(huán)境依賴性如下:
- JDK 1.8+ (1.8.0_211)
- Maven 3.x (3.2.5)
- Scala 2.11+ (2.12.0)
- Python 3.6+ (3.7.3)
- Git 2.20+ (2.20.1)
- Pip3 19+ (19.1.1)
我們看到基礎(chǔ)環(huán)境安裝比較簡(jiǎn)單,我這里就不每一個(gè)都貼出來(lái)了。如果大家有問題歡迎郵件或者博客留言。
- 下載 Flink 源代碼:
- 編譯
- 構(gòu)建 PyFlink 發(fā)布包
- 安裝 PyFlink(PyFlink 1.10 需要 Python3.6+)
也可以查看一下,我們核心需要 apache-beam 和 apache-flink,如下命令:
jincheng:flink-python jincheng.sunjc$ pip3 list Package Version ----------------------------- --------- alabaster 0.7.12 apache-beam 2.15.0 apache-flink 1.10.dev0 atomicwrites 1.3.0如上信息證明你我們所需的 Python 依賴已經(jīng)沒問題了,接下來(lái)回過頭來(lái)在看看如何進(jìn)行業(yè)務(wù)需求的開發(fā)。
PyFlinlk 的 Job 結(jié)構(gòu)
一個(gè)完成的 PyFlink 的 Job 需要有外部數(shù)據(jù)源的定義,有業(yè)務(wù)邏輯的定義和最終計(jì)算結(jié)果輸出的定義。也就是 Source connector, Transformations, Sink connector,接下來(lái)我們根據(jù)這個(gè)三個(gè)部分進(jìn)行介紹來(lái)完成我們的需求。
Source Connector
我們需要實(shí)現(xiàn)一個(gè) Socket Connector,首先要實(shí)現(xiàn)一個(gè) StreamTableSource, 核心代碼是實(shí)現(xiàn) getDataStream,代碼如下:
@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment env) {return env.socketTextStream(hostname, port, lineDelimiter, MAX_RETRY).flatMap(new Spliter(fieldNames.length, fieldDelimiter, appendProctime)).returns(getReturnType());}上面代碼利用了 StreamExecutionEnvironment 中現(xiàn)有 socketTextStream 方法接收數(shù)據(jù),然后將業(yè)務(wù)訂單數(shù)據(jù)傳個(gè)一個(gè) FlatMapFunction, FlatMapFunction 主要實(shí)現(xiàn)將數(shù)據(jù)類型封裝為 Row,詳細(xì)代碼查閱?Spliter。
同時(shí),我們還需要在 Python 封裝一個(gè) SocketTableSource,詳情查閱?socket_table_source.py。
Sink Connector
我們預(yù)期要得到的一個(gè)效果是能夠?qū)⒔Y(jié)果數(shù)據(jù)進(jìn)行圖形化展示,簡(jiǎn)單的思路是將數(shù)據(jù)寫到一個(gè)本地的文件,然后在寫一個(gè) HTML 頁(yè)面,使其能夠自動(dòng)更新結(jié)果文件,并展示結(jié)果。所以我們還需要自定義一個(gè) Sink 來(lái)完成該功能,我們的需求計(jì)算結(jié)果是會(huì)不斷的更新的,也就是涉及到 Retraction(如果大家不理解這個(gè)概念,可以查閱我以前的博客),目前在 Flink 里面還沒有默認(rèn)支持 Retract 的 Sink,所以我們需要自定義一個(gè) RetractSink,比如我們實(shí)現(xiàn)一下 CsvRetractTableSink。
CsvRetractTableSink 的核心邏輯是緩沖計(jì)算結(jié)果,每次更新進(jìn)行一次全量(這是個(gè)純 demo,不能用于生產(chǎn)環(huán)境)文件輸出。源代碼查閱?CsvRetractTableSink。
同時(shí)我們還需要利用 Python 進(jìn)行封裝,詳見 chart_table_sink.py。
在 chart_table_sink.py 我們封裝了一個(gè) http server,這樣我們可以在瀏覽器中查閱我們的統(tǒng)計(jì)結(jié)果。
業(yè)務(wù)邏輯
完成自定義的 Source 和 Sink 之后我們終于可以進(jìn)行業(yè)務(wù)邏輯的開發(fā)了,其實(shí)整個(gè)過程自定義 Source 和 Sink 是最麻煩的,核心計(jì)算邏輯似乎要簡(jiǎn)單的多。
- 設(shè)置 Python 版本(很重要)
如果你本地環(huán)境 python 命令版本是 2.x,那么需要對(duì) Python 版本進(jìn)行設(shè)置,如下:
t_env.get_config().set_python_executable("python3")PyFlink 1.10 之后支持 Python 3.6+ 版本。
- 讀取數(shù)據(jù)源
PyFlink 讀取數(shù)據(jù)源非常簡(jiǎn)單,如下:
... ... t_env.from_table_source(SocketTableSource(port=9999)).alias("line")上面這一行代碼定義了監(jiān)聽端口 9999 的數(shù)據(jù)源,同時(shí)結(jié)構(gòu)化 Table 只有一個(gè)名為 line 的列。
- 解析原始數(shù)據(jù)
我們需要對(duì)上面列進(jìn)行分析,為了演示 Python UDF,我們?cè)?SocketTableSource中并沒有對(duì)數(shù)據(jù)進(jìn)行預(yù)處理,所以我們利用上面 UDF 定義 一節(jié)定義的 UDF,來(lái)對(duì)原始數(shù)據(jù)進(jìn)行預(yù)處理。
... ... .select("split(line) as str_array") .select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price") .select("city, count, count * unit_price as total_price")- 統(tǒng)計(jì)分析
核心的統(tǒng)計(jì)邏輯是根據(jù) city 進(jìn)行分組,然后對(duì) 銷售數(shù)量和銷售金額進(jìn)行求和,如下:
... ... .group_by("city") .select("city, sum(count) as sales_volume, sum(total_price) as sales")\- 計(jì)算結(jié)果輸出
計(jì)算結(jié)果寫入到我們自定義的 Sink 中,如下:
... ... .insert_into("sink")- 完整的代碼(blog_demo.py)
上面代碼中大家會(huì)發(fā)現(xiàn)一個(gè)陌生的部分,就是 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo 是哪里來(lái)的呢?其實(shí)就是包含了上面我們介紹的 自定義 Source/Sink(Java&Python)。下面我們來(lái)介紹如何增加這個(gè) pyflink.demo 模塊。
安裝 pyflink.demo
為了大家方便我把自定義 Source/Sink(Java&Python)的源代碼放到了這里 ,大家可以進(jìn)行如下操作:
- 下載源碼
- 編譯源碼
- 構(gòu)建發(fā)布包
- 安裝 Pyflink.demo
出現(xiàn)上面信息證明已經(jīng)將 PyFlink.demo 模塊成功安裝。接下來(lái)我們可以運(yùn)行我們的示例了 :)
運(yùn)行示例
示例的代碼在上面下載的源代碼里面已經(jīng)包含了,為了簡(jiǎn)單,我們利用 PyCharm 打開enjoyment.code/myPyFlink。同時(shí)在 Terminal 啟動(dòng)一個(gè)端口:
nc -l 6666啟動(dòng) blog_demo,如果一切順利,啟動(dòng)之后,控制臺(tái)會(huì)輸出一個(gè) web 地址,如下所示:
我們打開這個(gè)頁(yè)面,開始是一個(gè)空白頁(yè)面,如下:
我們嘗試將下面的數(shù)據(jù),一條,一條的發(fā)送給 Source Connector:
iPhone 11,30,5499,Beijing iPhone 11 Pro,20,8699,Guangzhou MacBook Pro,10,9999,Beijing AirPods Pro,50,1999,Beijing MacBook Pro,10,11499,Shanghai iPhone 11,30,5999,Shanghai iPhone 11 Pro,20,9999,Shenzhen MacBook Pro,10,13899,Hangzhou iPhone 11,10,6799,Beijing MacBook Pro,10,18999,Beijing iPhone 11 Pro,10,11799,Shenzhen MacBook Pro,10,22199,Shanghai AirPods Pro,40,1999,Shanghai當(dāng)輸入第一條訂單 iPhone 11,30,5499,Beijing,之后,頁(yè)面變化如下:
隨之訂單數(shù)據(jù)的不斷輸入,統(tǒng)計(jì)圖不斷變化。一個(gè)完整的 GIF 演示如下:
小結(jié)
本篇從架構(gòu)到 UDF 接口定義,再到具體的實(shí)例,向大家介紹了在 Apache Flink 1.10 發(fā)布之后,如何利用 PyFlink 進(jìn)行業(yè)務(wù)開發(fā),其中 用戶自定義 Source 和 Sink部分比較復(fù)雜,這也是目前社區(qū)需要進(jìn)行改進(jìn)的部分(Java/Scala)。真正的核心邏輯部分其實(shí)比較簡(jiǎn)單,為了大家按照本篇進(jìn)行實(shí)戰(zhàn)操作有些成就感,所以我增加了自定義 Source/Sink 和圖形化部分。但如果大家想簡(jiǎn)化實(shí)例的實(shí)現(xiàn)也可以利用 Kafka 作為 Source 和 Sink,這樣就可以省去自定義的部分,做起來(lái)也會(huì)簡(jiǎn)單一些。
雙12來(lái)襲!500元淘寶紅包、iPhone11等你拿。
https://www.aliyun.com/1212/2019/home?utm_content=g_1000092611
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的如何在 PyFlink 1.10 中自定义 Python UDF?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用Flink取代Spark Stream
- 下一篇: AnalyticDB for MySQL