如何将实时计算 Flink 与自身环境打通?
作者 | 張鵬(七器),阿里巴巴開發工程師
本篇內容將向大家介紹如何將實時計算 Flink 與其他系統打通。介紹內容包含四個部分,分別是:
1、Jar的存儲與使用;
2、實時計算 Flink 如何與一些典型數據源進行交互;
3、如何將VVP平臺上 Flink的指標打入Metrics外部系統;
4、如何將VVP平臺上運行的 Flink作業日志打入到外部系統。
一、運行作業的Jar如何存儲在OSS上
在VVP平臺有兩種方法可以上傳作業的jar。
方法一,借助VVP提供的資源上傳功能,可以直接使用這個功能對Jar進行上傳目前該功能支持200兆以內的Jar包上傳。使用時,直接在創建作業的時候選擇上傳的jar包就可以了,演示如下:
● 進入到VVP平臺,點擊左側資源上傳功能,然后在打開頁面點擊右上角的上傳資源,選擇要上傳的Jar包,完成上傳;
● 上傳成功后,點擊左側創建作業,完善作業名等信息。在Jar URI欄,下拉選擇剛剛上傳的Jar包,點擊確定完成創建作業,然后啟動即可使用。
方法二,直接在OSS的控制臺上面,將要使用的Jar上傳上去,然后使用OSS是提供的Jar鏈接來行使用。使用的時候也比較簡單,直接使用OSS提供的Jar鏈接,演示如下:
● 打開OSS控制臺,選擇在創建VVP時候使用的Bucket,再選擇目錄,點擊上傳文件,上傳時可以將它的權限設置為公共讀,點擊上傳文件即完成;
● 使用時,OSS控制臺上點擊已上傳包右側的“詳情”,獲取該Jar包的URL鏈接。
● 創建作業時,將jar包的URL的鏈接填入Jar URI,如下圖所示:
需要注意,OSS詳情頁面提供的鏈接是公網訪問的,開通的VVP并不能直接訪問公網,所以在創建作業使用HTTPS的時候,需要使用VPC訪問的endpoint(例如:https://vvp-training.oss-cn-shanghai-internal.aliyuncs.com/artifacts/namespaces/vvp-training/WordCount.jar),這樣才能正常的啟動作業。
如果想用公網獲取一個HTTPS的鏈接,怎么操作呢?可以首先對VVP進行公網打通,打通的操作流程可以參考阿里云幫助文檔中的《Flink 全托管集群如何訪問公網》(https://help.aliyun.com/document_detail/174840.html),簡單來說步驟如下:
● 首先,創建一個NAT網關。創建時選擇“組合購買ERP”,然后選擇區域并補充名稱等信息,然后綁定彈性公網IP,完成創建;
● 其次,創建SNAT條目。創建好NAT之后,點擊“創建SNAT條目”,在彈窗選擇交換機并補充名稱信息,完成創建。
完成上述兩個步驟,該VVP實例就已經打通公網,在創建Deployment時就可以直接使用https公網可訪問的jar包了。
二、在VVP平臺上 Flink 如何與典型數據源進行交互
這部介紹如何通過SQL以及connectors與外部的一些數據存儲系統進行交互,以SLS,Kafka作為數據源讀寫數據為例。
(實操演示)點擊SQL編輯器,創建一個Datagen Table,它是用于數據的隨機生成的,然后點擊運行。然后再點擊生成一個SLS Table,補充所需參數信息,然后點擊創建完成。
創建完成后,寫入SQL語句,比如insert into sls select id, name from datagen,然后另存后點擊運行,創建Deployment并啟動。
當作業成功運行后,在SLS上查詢數據。如下圖所示,說明datagen已經生成數據并成功寫入SLS。
類似的,我們可以按照上面的步驟從SLS讀數據然后寫入Kafka:
● 在vvp的sql編輯器頁面創建一個Kafka table
● 用SQL語法從SLS讀取數據寫入Kafka中并啟動
● 作業運行成功后,即開始從SLS讀數據寫入Kafka中
三、如何將VVP平臺上 Flink的指標打入外部Metrics系統
接下介紹如果想把運行作業的指標放入到一些系統當中去,并進行指標觀測。VVP提供了兩種方法:
方法一,VVP默認的將 Flink 作業指標打入到arms,不需要額外的處理,直接運行作業之后,就能通過指標按鈕看到,如下圖所示:
方法二,如果自己有指標系統,想把 Flink 的作業指標打入到自己的系統里,主要有兩點:首先保證VVP上作業與自己指標系統網絡的連通性;其次在 Flink conf 中配置好相應的metrics reporter。如下圖所示,在創建作業過程中,進行metric配置(metrics reporters配置參考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html):
例:使用premetheus的pushGateway方式,所以reporter class就選擇org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter。按上圖所示配置pushGateway的port和host,Metric reporter就配置完成了。作業啟動成功后在配置好的grafana大盤上查看指標,如下例所示。
四、如何將Flink作業日志打入到外部系統
如果在作業運行中,突然運行失敗,我們想要查看運行失敗作業的日志,就需要把 Flink 作業的日志保存下來。在VVP平臺為這個目的提供了兩種方案,將Logs寫入OSS中或SLS中,簡單來說,在創建作業的時候, 在Log配置項里面配置一些Log參數。
配置參考文檔:https://help.aliyun.com/document_detail/173646.html
方法一,將日志寫入OSS中。在創建作業的時候,在高級配置中的Log配置里,選擇使用用戶自定義,然后將(幫助文檔)里面的配置放在自定義的配置中去,再將一些參數換成OSS的必要參數就可以了。
需要查看日志時,可以通過幫助文檔的指導,找到日志存放的文件,然后點擊下載查看。
方法二,將日志寫入SLS中。與方法一類似,只是LOG配置項稍有差異;下載和查看方法與方法一一致。
原文鏈接:https://developer.aliyun.com/article/781174?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的如何将实时计算 Flink 与自身环境打通?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ClickHouse:人群圈选业务的大杀
- 下一篇: 连续三年蝉联第一,Flink 荣膺全球最