使用var,Lombok和Fluxtion轻松处理事件
介紹
在本文中,我將結合使用Lombok和Fluxtion這兩種產(chǎn)品,以演示工具如何在減少代碼編寫和交付時間的同時提高代碼的可讀性。 使用Java 10中的var可以進一步改善這種情況。 產(chǎn)品和var都在構建時使用推斷來加速開發(fā)。
Fluxtion的精神是最大程度地減少浪費,我們這里的目標是刪除樣板代碼,減少代碼噪聲并簡化集成任務。 我們希望花費盡可能少的開發(fā)時間,同時仍然提供能夠每秒處理數(shù)百萬條消息的高效且高性能的解決方案。
使用所描述的技術,我將Fluxtion / Lombok實現(xiàn)與使用Akka流的scala示例進行了比較,Java版本需要較少的代碼,并且更易于構建。
家政服務,對未承認Richard Warburton的道歉
Opsian ,在我的第一個博客中 。
代碼信噪比
在編寫代碼時,我們要解決兩個主要任務:
- 將業(yè)務需求轉換為程序邏輯
- 將邏輯與部署環(huán)境接口
理想情況下,我們希望將所有時間都花在第一個而不是第二個上。 另外,應減少編寫的代碼總量。 平衡抽象同時又要賦予開發(fā)人員權力并不容易,抽象太大了,我們就失去了表現(xiàn)力。 我希望能夠與本文采用的方法取得良好的平衡。
想象一下,編寫一些占用50行的稅收計算邏輯,但是編寫數(shù)據(jù)庫,Web服務器,編組,日志記錄等代碼需要1000行。 盡管演示了技術能力,但在純技術實施細節(jié)中并沒有商業(yè)價值。 從另一個角度來看,我們可以將業(yè)務邏輯視為一種信號,將基礎架構代碼視為一種噪聲。 我們編寫的解決方案可以相對于有用的業(yè)務邏輯用信噪比來衡量。
維基百科將信噪比定義為:
信噪比(縮寫為SNR或S / N)是用于
將所需信號的電平與背景噪聲的電平進行比較的科學和工程學 。 SNR定義為信號功率與噪聲功率之比,通常以分貝表示。 比率大于1:1(大于0 dB)表示信號多于噪聲。
在大多數(shù)系統(tǒng)中,希望以高SNR比為目標,從編程的角度來看,高SNR的一些優(yōu)點是:
- 減少編寫代碼
- 易于理解和維護的業(yè)務邏輯
- 學習曲線更短
- 調試/故障查找更簡單,出錯更少
- 更有效的發(fā)展
在Java中,多年來,從重量級的j2ee容器到更簡單的框架(如spark和spring boot) ,我們一直感受到提高代碼SNR的壓力。 語言本身通過引入諸如lambda,流,方法引用和var變量聲明之類的更改來適應了這一轉變。
結合通量和Lombok
在該示例之前,先快速入門有關通量和Lombok。
助焊劑
Fluxtion是用Java編寫的可嵌入流事件處理引擎。 開發(fā)人員以聲明式和命令式的混合形式描述處理過程,以便Fluxtion可以生成決策引擎。 該引擎已序列化為Java代碼,并且可以嵌入任何Java應用程序中。 該應用程序將事件輸入引擎以進行流處理。
引擎生成可以在應用程序中內聯(lián)發(fā)生,也可以通過maven插件在生成過程中進行。
Lombok底漆
Lombok是一種實用程序,可以自動為Java類編寫樣板代碼,從而節(jié)省開發(fā)人員時間并減少代碼噪音。 作為注釋處理工具執(zhí)行時,Lombok會生成表示已注釋類的樣板代碼的字節(jié)代碼。 Lombok功能集不完整包括:
- 屬性的自動bean樣式獲取器和設置器
- 哈希代碼和為屬性生成的等于
- 自動toString方法
- 所有類屬性的自動構造函數(shù)
只需將Lombok添加到您的Maven構建中,您的ide應該就可以使用,或者與netbeans和intellij一起使用。
流最大溫度示例
讓我們看一下常見的Fluxtion使用模式。 訂閱事件流,從事件中提取值,對該值執(zhí)行計算,過濾并將結果壓入用戶對象。 在這個簡單的示例中,我們滿足以下要求:
- 傾聽溫度事件
- 提取溫度
- 保持最高溫度
- 當出現(xiàn)新的最高溫度時,將溫度推入用戶定義的實例中
克隆的回購從GitHub和使用本文的標記版本。 該項目在這里 。
git clone --branch? article_lombok_july2019 https://github.com/gregv12/articles.gitcd articles/2019/june/lombok/
滿足處理要求的助焊劑代碼:
select(TempEvent::getTemp).map(max()).notifyOnChange(true).push(new MyTempProcessor()::setMaxTemp);這提供了較高的代碼SNR和較低的行數(shù),所有代碼都是針對業(yè)務邏輯的。 要實現(xiàn)此功能,需要使用方法引用和類型推斷。 方法引用允許Fluxtion推斷所需的行為,要構建的功能,源和目標類型以及如何在執(zhí)行圖中將數(shù)據(jù)從一個節(jié)點傳遞到另一個節(jié)點。 方法參考為我們提供了一種表達任意邏輯的令人愉快的類型安全方式。 該工具采用的推論消除了開發(fā)人員的負擔,以明確表達每個處理步驟,從而為我們提供了一個低代碼環(huán)境。
在生成Fluxtion之后,序列化的流事件處理器為
在此 ,以Java代碼表示。 示例的測試在這里 。
輸出:
08:08:42.921 [main] INFO c.f.generator.compiler.SepCompiler - generated sep: D:\projects\fluxtion\articles\2019\june\lombok\target\generated-sources\fluxtion\com\fluxtion\articles\lombok\temperature\generated\lombok\TempMonitor.java new max temp:10.0 new max temp:17.0 new max temp:24.0 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.79 sec處理圖形圖像:
仔細觀察上面示例中的第一行select(TempEvent :: getTemp) ,我們可以檢查Fluxtion所做的推斷。 這里暗示的邏輯是:
- 為TempEvent類型的事件創(chuàng)建訂閱,
- 添加一個從傳入事件中提取getTemp值的節(jié)點
- 使臨時值可用作節(jié)點的Number屬性
- 當接收到溫度事件時,通知孩子改變溫度值。
映射,notifyOnChange和push函數(shù)是添加到執(zhí)行鏈中的步驟。 有關詳細信息,請參見Fluxtion流模塊的包裝器接口。 由于SNR高,因此很容易理解它們的目的和效果,但出于完整性考慮:
- map(max())從上一個節(jié)點(溫度)提取number屬性。 接收到新值時,將該值應用于有狀態(tài)max函數(shù)。 將當前最大值存儲在具有Number屬性的節(jié)點中。 接收事件時,通知任何子節(jié)點當前最大值。
- notifyOnChange一個有狀態(tài)函數(shù),在監(jiān)視的值已更新且與先前的值不同時觸發(fā)。 僅新的最大值傳播到子節(jié)點。
- push(new MyTempProcessor():: setMaxTemp)將用戶節(jié)點MyTempProcessor添加到執(zhí)行鏈中。 當由新的最大溫度觸發(fā)時,將節(jié)點的值推入MyTempProcessor的setMaxTemp中。 對原始類型執(zhí)行所有類型轉換,而不會產(chǎn)生垃圾。
要在TempEvent上使用方法引用,我們首先需要定義一個getter / setter樣式訪問器方法對。 當然,ide可以生成所需的方法,但是SNR在生成后仍會下降。 將其擴展到更大的域,問題成倍增加。 Lombok可以在這里為我們提供幫助,刪除不必要的代碼并恢復SNR。
在Lombok之前:
public class InlineNoLombok {public EventHandler handler() throws Exception {return sepInstance(c-> select(TempEvent::getTemp).map(max()).notifyOnChange(true).push(new MyTempProcessor()::setMaxTemp),"com.fluxtion.articles.lombok.temperature.generated.nolombok", "TempMonitor");}public static class TempEvent extends Event {private double temp;public TempEvent(double temp) {this.temp = temp;}public double getTemp() {return temp;}public void setTemp(double temp) {this.temp = temp;}}}Lombok之后:
添加單個@Data批注將刪除getter / setter,而@AllArgsConstructor則刪除構造函數(shù):
public class InlineLombok {public EventHandler handler() throws Exception {return sepInstance(c-> select(TempEvent::getTemp).map(max()).notifyOnChange(true).push(new MyTempProcessor()::setMaxTemp),"com.fluxtion.articles.lombok.temperature.generated.nolombok", "TempMonitor");}@Data@AllArgsConstructorpublic static class TempEvent extends Event {private double temp;} }即使使用Lombok和Fluxtion的最小示例,實際的業(yè)務邏輯也更易于閱讀。 更好的代碼SNR使應用程序的構建效率更高,更易于理解。
航班資料示例
讓我們將其擴展到一個更復雜的示例,其中高SNR的值變得顯而易見。 在此示例中,我們正在處理整個一年的航班數(shù)據(jù)。 該示例的靈感來自此博客 ,而akka流解決方案的代碼在此處 。 要求摘要:
處理以CSV格式存儲的一年的所有美國航班著陸記錄的價值
在這里 。
- 按名稱分組運營商
- 篩選延遲> 0的記錄
- 運營商名稱:第8列,延遲時間:第14列
- 對于運營商分組,計算:
- 總延遲累計
- 計算航班總數(shù),不考慮延誤
我們需要定義數(shù)據(jù)類型和處理邏輯來解決該問題。 解決方案中的噪聲很容易使您不知所措。 但是Fluxtion允許我們專注于業(yè)務邏輯,而Lombok使數(shù)據(jù)類型易于使用,兩種工具都使用推理來減少編寫代碼:
public class FlightAnalyser {@SepBuilder(name = "FlightDelayAnalyser",packageName = "com.fluxtion.articles.lombok.flight.generated")public void buildFlightProcessor(SEPConfig cfg) {var flightDetails = csvMarshaller(FlightDetails.class, 1).map(14, FlightDetails::setDelay).converter(14, defaultInt(-1)).map(8, FlightDetails::setCarrier).converter(8, Converters::intern).build();//filter and group byvar delayedFlight = flightDetails.filter(FlightDetails::getDelay, positive());var carrierDelay = groupBy(delayedFlight, FlightDetails::getCarrier, CarrierDelay.class);//derived values for a groupcarrierDelay.init(FlightDetails::getCarrier, CarrierDelay::setCarrierId);carrierDelay.avg(FlightDetails::getDelay, CarrierDelay::setAvgDelay);carrierDelay.count(CarrierDelay::setTotalFlights);carrierDelay.sum(FlightDetails::getDelay, CarrierDelay::setTotalDelayMins);//make public for testingvar delayByGroup = cfg.addPublicNode(carrierDelay.build(), "delayMap");//dump to console, triggers on EofEventprintValues("\nFlight delay analysis\n========================",delayByGroup, eofTrigger());}@Data //input data from CSVpublic static class FlightDetails {private String carrier;private int delay;}@Data //derived datapublic static class CarrierDelay {private String carrierId;private int avgDelay;private int totalFlights;private int totalDelayMins;}}實施分析
Lombok允許我們處理數(shù)據(jù)類和字段類型,而無需使用getter / setter方法。 我們定義輸入記錄FlightDetails和分組摘要記錄CarrierDelay。
使用var關鍵字進行中間實例分配可以簡化代碼的讀寫。
- 第8行 Fluxtion將csv映射到FlightDetails類型,該數(shù)字1表示要忽略的初始標題行。
- 第9行將第 14列映射到延遲值。 可選的轉換器函數(shù)將丟失的或非數(shù)字的延遲映射到值-1。 通過Fluxtion進行類型推斷可確保從零gc到char到int的轉換
- 第10行將第 8列映射到運營商名稱。 可以使用載體名稱,以減少不必要的String對象分配,因為我們希望相同的載體名稱會多次出現(xiàn)。 請記住,有700萬條記錄將大大降低gc壓力。
- 第12行 ,過濾器函數(shù)positive()應用于字段FlightDetails :: getDelay。 只有延遲的航班由子節(jié)點處理。
- 第13行的已過濾記錄delayFlight由關鍵字FlightDetails :: getCarrier分組,該組的目標是CarrierDelay。
- 第15行定義了新載波進入組的初始化函數(shù),僅當在組中分配了新密鑰時才調用。
- 第16行將平均值函數(shù)應用于延遲并設置值CarrierDelay:setAvgDelay
- 第17行將 count函數(shù)應用于延遲并設置值CarrierDelay:setTotalFlights
- 第18行將求和函數(shù)應用于延遲并設置值CarrierDelay:setTotalDelayMinutes
計算是有狀態(tài)的,并且對于每個承運人都有唯一的值,每次收到FlightDelay記錄時,相關承運人的計算都會更新。
- 第21行將delayMap分配為公共最終變量以協(xié)助測試
- 第22行在收到文件結束事件時打印映射值
性能
執(zhí)行2008年的航班分析,解壓縮Flight Csv數(shù)據(jù)并將文件位置傳遞到發(fā)行版中的可執(zhí)行jar。
java.exe -jar dist\flightanalyser.jar [FLIGHT_CSV_DATA]Flight delay analysis ======================== FlightAnalyser.CarrierDelay(carrierId=OO, avgDelay=31, totalFlights=219367, totalDelayMins=6884487) FlightAnalyser.CarrierDelay(carrierId=AA, avgDelay=35, totalFlights=293277, totalDelayMins=10414936) FlightAnalyser.CarrierDelay(carrierId=MQ, avgDelay=35, totalFlights=205765, totalDelayMins=7255602) FlightAnalyser.CarrierDelay(carrierId=FL, avgDelay=31, totalFlights=117632, totalDelayMins=3661868) FlightAnalyser.CarrierDelay(carrierId=DL, avgDelay=27, totalFlights=209018, totalDelayMins=5839658) FlightAnalyser.CarrierDelay(carrierId=NW, avgDelay=28, totalFlights=158797, totalDelayMins=4482112) FlightAnalyser.CarrierDelay(carrierId=UA, avgDelay=38, totalFlights=200470, totalDelayMins=7763908) FlightAnalyser.CarrierDelay(carrierId=9E, avgDelay=32, totalFlights=90601, totalDelayMins=2907848) FlightAnalyser.CarrierDelay(carrierId=CO, avgDelay=34, totalFlights=141680, totalDelayMins=4818397) FlightAnalyser.CarrierDelay(carrierId=XE, avgDelay=36, totalFlights=162602, totalDelayMins=5989016) FlightAnalyser.CarrierDelay(carrierId=AQ, avgDelay=12, totalFlights=1908, totalDelayMins=23174) FlightAnalyser.CarrierDelay(carrierId=EV, avgDelay=35, totalFlights=122751, totalDelayMins=4402397) FlightAnalyser.CarrierDelay(carrierId=AS, avgDelay=27, totalFlights=62241, totalDelayMins=1714954) FlightAnalyser.CarrierDelay(carrierId=F9, avgDelay=21, totalFlights=46836, totalDelayMins=992044) FlightAnalyser.CarrierDelay(carrierId=B6, avgDelay=42, totalFlights=83202, totalDelayMins=3559212) FlightAnalyser.CarrierDelay(carrierId=WN, avgDelay=26, totalFlights=469518, totalDelayMins=12633319) FlightAnalyser.CarrierDelay(carrierId=OH, avgDelay=34, totalFlights=96154, totalDelayMins=3291908) FlightAnalyser.CarrierDelay(carrierId=HA, avgDelay=18, totalFlights=18736, totalDelayMins=342715) FlightAnalyser.CarrierDelay(carrierId=YV, avgDelay=37, totalFlights=111004, totalDelayMins=4159465) FlightAnalyser.CarrierDelay(carrierId=US, avgDelay=28, totalFlights=167945, totalDelayMins=4715728)millis:2682加工性能分析:
file size? ? ? ? ? ?= 673 Mbrecord count? ? ? ? = 7,009,728
比較這兩種解決方案,我們觀察到以下幾點:
- Java版本使用的代碼少于scala版本
- 通量消除了定義圖的需要,而不僅僅是業(yè)務邏輯
- 手動構建圖形是錯誤的根源
- Lombok使數(shù)據(jù)類型像scala case類一樣簡潔
- var減少代碼膨脹
- 信噪比高,使代碼更易于維護和理解
- Fluxtion易于運行,不需要安裝服務器,只需編譯即可。
比較性能數(shù)字很困難,Akka版本只花了一分鐘時間來運行示例,但是我沒有足夠的Akka經(jīng)驗來驗證這一點。 此外,這是一個古老的博客,因此情況可能會繼續(xù)發(fā)展。
結論
我們著手證明,如果我們選擇了一套不錯的工具,那么Java可以成為事件流的簡潔語言。 Lombok和Fluxtion完美地結合在一起,使處理邏輯的聲明式定義既簡單又類型安全。 var的使用使代碼更具可讀性,更易于編寫。 所有這些的關鍵是推論,每個工具都推論出不同類型的行為,所有這些行為都使編碼人員不必顯式指定行為:
- var –類型推斷
- Lombok–推斷鍋爐板的實施
- 通量–推斷加工圖
對于Fluxtion,我們比較Akka版本如何要求開發(fā)人員明確定義處理圖。 這不適用于更大,更復雜的情況,并且將成為錯誤的來源。 更糟糕的是,業(yè)務邏輯被技術基礎架構所掩蓋,從而使維護成本在未來更加昂貴。
最后要指出的是,該解決方案的性能非常出色,每秒處理260萬條記錄且gc為零。 我希望您喜歡這份工作,并很想嘗試Fluxtion和Lombok。
致謝
AllSimon在github上 ,他對Fluxtion的貢獻使我開始嘗試Lombok
翻譯自: https://www.javacodegeeks.com/2019/07/easy-event-processing-with-var-lombok-and-fluxtion.html
總結
以上是生活随笔為你收集整理的使用var,Lombok和Fluxtion轻松处理事件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 查找文件内容 linux(查找linux
- 下一篇: 阿里云ddos高防ip(阿里云ddos可