apache beam入门之初次使用
beam入門寶典之初次使用
咱們不多廢話,先直接來如何簡單使用beam框架。
這里我不使用常見的wordCount做例子,而是一個大寫轉小寫的例子,語言選用java語言
這個例子里我們會初步學到:
首先我們要新建1個maven工程,然后在pom.xml中加入如下依賴:
<dependency><groupId>org.apache.beam</groupId><artifactId>beam-runners-direct-java</artifactId><version>${beam.version}</version> </dependency>beam.version版本選擇beam官網上最新,筆者編寫此文時使用的版本是2.13.0
接著我們新建1個HowToCreateAndShowData類,然后開始例子
建立管道
任何beam程序,都需要先建立1個管道選項option,再建立1個初始管道
// 建立選項 PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); // 建立管道 Pipeline pipeline = Pipeline.create(pipelineOptions);關于選項option和pipeline的更多用法,后面的章節會繼續介紹
手動生成數據
我們有了pipeline之后,就要往里面塞入數據
beam里提供了手動輸入數據的方式,如下:
我們調用pipeline的apply方法來輸入1個Create對象,里面的元素就是我們的輸入元素
并且返回1個PCollection的對象,我們稱之為數據集。
<String>指的是數據集中數據的類型
如何轉換
要實現轉換,需要先編寫1個DoFn的子類,并實現processElement方法,代碼和講解如下:
// 把字符串轉成小寫的轉換方法類 // DoFn<String,String>中的第一個String是輸入的類型,第二個String是輸出的類型 static class StrToLowerCaseFn extends DoFn<String, String> { /** * processElement,過程元素處理方法,類似于spark、mr中的map操作 * 必須加上@ProcessElement注解,并實現processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 轉成大寫String outputStr = inputStr.toLowerCase();// 輸出結果context.output(outputStr);} }接著將這個計算方法,用數據集.apply(ParDo.of(計算類))的方式組裝到剛才的pcStart中
// 組裝小寫轉換 PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));如何輸出
輸出的話,我們可以按照上面的方法再編寫1個DoFn子類,用于將數據集中輸入的元素打印到控制臺
// 打印結果方法類 // 因為不需要再往下輸出,所以 static class PrintStrFn extends DoFn<String, Void> { /** * processElement,過程元素處理方法,類似于spark、mr中的map操作 * 必須加上@ProcessElement注解,并實現processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 輸出System.out.println(inputStr);} }然后組裝
// 組裝輸出操作 pcMid.apply(ParDo.of(new PrintStrFn()));運行
剛才的3次apply結束后,其實轉換都還沒有開始,僅僅只是組裝計算拓撲的1個流程。
真正開始計算需要調用下面的代碼:
執行main方法,輸出如下結果:
image.png
完整代碼
/*** The howToCreateAndShowData** */ public class HowToCreateAndShowData {public static void main(String[] args) {PipelineOptions pipelineOptions = PipelineOptionsFactory.create();Pipeline pipeline = Pipeline.create(pipelineOptions);// 生成初始的輸入數據// 相當于往管道里塞入了3個自己寫的字符串元素PCollection<String> pcStart = pipeline.apply(Create.of("HELLO!","THIS IS BEAM DEMO!","HAPPY STUDY!"));// 組裝小寫轉換PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));// 組裝輸出操作pcMid.apply(ParDo.of(new PrintStrFn()));// 運行結果pipeline.run().waitUntilFinish();}// 把字符串轉成小寫的轉換方法類// DoFn<String,String>中的第一個String是輸入的類型,第二個String是輸出的類型static class StrToLowerCaseFn extends DoFn<String, String> {/*** processElement,過程元素處理方法,類似于spark、mr中的map操作* 必須加上@ProcessElement注解,并實現processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 轉成大寫String outputStr = inputStr.toLowerCase();// 輸出結果context.output(outputStr);}}// 打印結果方法類// 因為不需要再往下輸出,所以static class PrintStrFn extends DoFn<String, Void> {/*** processElement,過程元素處理方法,類似于spark、mr中的map操作* 必須加上@ProcessElement注解,并實現processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 輸出System.out.println(inputStr);}} }總結
以上是生活随笔為你收集整理的apache beam入门之初次使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 皮克斯开源_皮克斯的故事讲述规则适合网页
- 下一篇: BIEE使用体验 – 关于时间格式