【原创】Kakfa utils源代码分析(一)
生活随笔
收集整理的這篇文章主要介紹了
【原创】Kakfa utils源代码分析(一)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Kafka.utils,顧名思義,就是一個工具套件包,里面的類封裝了很多常見的功能實現——說到這里,筆者有一個感觸:當初為了閱讀Kafka源代碼而學習了Scala語言,本以為Kafka的實現會用到很多函數編程(Functional Programming, FP),結果目前來看,大部分還是很樸素地以面向對象的方式來實現的,只有很少一部分集合的處理使用諸如map,reduce這樣的FP方式。不能不說有點小小的遺憾。——當然也許后面Kafka的核心代碼中會看到更多FP的身影。
下圖就是kafka.utils包的所有代碼: 因為很難像其他包代碼之間有邏輯關系,我們就一個一個說吧: 一、Annotations.scala 這個源代碼文件中定義了3個注釋類:threadsafe、nonthreadsafe和immutable。它們都繼承了StaticAnnotation——Scala提供的StaticAnnotation類似于Java中的@Target(ElementType.TYPE),因此主要的作用域是類和接口。具體到這三個元注解(meta-annotation),很容易知道它們的含義:分別標記線程安全、非線程安全和不可變性。Kafka開發中常用到的SimpleConsumer類就是被標記為@threadsafe的。 二. CommandLineUtils.scala 這個文件使用JOpt Simple庫負責解析命令行參數,具體使用用法參見官網:http://pholser.github.io/jopt-simple/ Kafka在這個文件中提供了一個object:CommandLineUtils。具體包含的方法有: 1. printUsageAndDie: 打印命令使用方法并終止程序 2. checkRequiredArgs:使用Jopts Simple的API(以下皆同)檢查是否缺少必要參數 3. checkInvalidArgs:檢查指定的參數是否存在不兼容情況,即哪些參數不能同時使用 4. parseKeyValueArgs:解析key=value格式的參數對,并返回一個Properties對象 三、Crc32.scala 這個類就是CRC32校驗碼的實現類,來自于Hadoop提供的PureJavaCrc32類——CRC32校驗碼的純Java實現版本。這個類很長,里面有很多位操作,由于CRC32計算不在本次研究范圍,所以就了解到這吧。 四、DelayedItem.scala這個類是個泛型類,實現了java.util.Delayed接口。用于標記那些在給定延遲時間之后執行的對象。該類接收一個泛型T,一個延遲時間以及延遲時間的單位。另外,實現這個接口的話必須要實現一個compareTo和getDelay方法。 1. getDelay: 計算距離觸發時間還剩下多長時間 2. compareTo: 比較2個Delayed對象的延遲觸發時間 五、FileLock.scala 顧名思義,FileLock就是一個文件鎖,它的構造函數接收一個文件對象,并總是先嘗試創建這個文件(如果不存在的話),然后創建一個FileChannel對象對該文件進行隨機讀寫操作。同時創建一個java.nio.channel.FlieLock文件鎖對象用于實現下面的方法: 1. lock: 對文件加鎖,如果該文件上已有鎖拋出異常 2. tryLock: 嘗試對文件加鎖,如果成功返回true,否則返回false 3. unlock: 如果持有鎖使用FileLock.release方法釋放鎖 4. destroy: 先釋放鎖然后調用FileChannel的close方法銷毀該channel 六、IteratorTemplate.scala 這個文件視圖定義一個迭代器模板,主要為遍歷消息集合使用。迭代器模板有一個狀態字段,因此在定義迭代器模板抽象類之前首先定義了一個State狀態object,以及一組具體的狀態object:完成(DONE),READY(準備就緒),NOT_READY(未準備)和FAILED(失敗)。 之后就是定義IteratorTemplate抽象類了,它同時實現了trait Iterator和java Iterator接口——可謂迭代器領域的集大成者:) ? 如前所述,該類有個字段表明了迭代器的狀態:state,還有一個nextItem字段執行遍歷中的下一個對象,當然初始化為null——說起null,想到一個題外話。我很懷疑Kafka的開發人員是深度的Java編程人員亦或是強面向對象開發人員,Scala推薦使用Option來代替null的,可Kafka的代碼中null還是隨處可見,當然可能也是為了更好更自然地與Java集成。 ? 這個抽象類提供很多方法,但似乎只有一個抽象方法:makeNext,其他全是具體方法: 1. next:如果迭代器已遍歷完并無法找到下一項或下一項為空,直接拋出異常;否則將狀態置為NOT_READY并返回下一項 2. peek:只是探查一下迭代器是否遍歷完,如果是拋出異常,否則直接返回下一項,并不做非空判斷,也不做狀態設置 3. hasNext: 如果狀態為FAILED直接拋出異常,如果是DONE返回false,如果是READY返回true,否則調用maybeComputeNext方法 4. makeNext: 返回下一項,這是你需要唯一需要實現的抽象方法。同時你還需要在該方法中對狀態字段進行更新 5. maybeComputeNext:調用makeNext獲取到下一項,如果狀態是DONE返回false,否則返回true并將狀態置為READY 6. allDone: 將狀態置為DONE并返回null 7. resetStatus:顧名思義,就是重置狀態字段為NOT_READY 七、JSON.scala JSON的一個封裝類,用于JSON到String的相互轉換,該類不是線程安全的。Scala提供的JSON是將數字型的字符串轉化為Double,不過該類創建一個簡單函數用于將數字型字符串轉為換Integer,并指定其為JSON.globalNumberParser。該類只有2個方法: 1. parseFull: 調用scala JSON的parseFull方法將一個json字符串轉化為一個對象,如果出錯則拋出異常 2. encode: 講一個對象編碼成json字符串。這個對象只能是null,Boolean,String,Number,Map[String, T],Array[T]或Iterable[T]中的一種,否則會報錯
轉載于:https://www.cnblogs.com/huxi2b/p/4378439.html
總結
以上是生活随笔為你收集整理的【原创】Kakfa utils源代码分析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 软件观念革命-交互设计精髓
- 下一篇: SQL获取当前时间、年、月、日等