kafka netty_惠而浦:使用Netty和Kafka的微服务
kafka netty
介紹
在上一個博客中 ,我介紹了Netty用作Web服務器。 該示例運行良好……只要需要廣播服務器即可。
大多數情況下不是很有用。 更有可能的是,每個客戶端僅接收針對其的數據,并保留了特殊情況下的廣播,例如“服務器在15分鐘內停機!” 關于該特定服務器示例的另一件事是,一切都是獨立的。 例如,單片應用程序很好,但是在當今環境中,分布式微服務要好得多。 可伸縮性和可靠性至關重要。
Netty和Kafka在一起很棒。 Netty擅長處理大量客戶,Kafka擅長使大量服務協同工作。 結合起來,它們是開發中的最佳選擇。 但是,有些“陷阱”可能會使其變得繁瑣。 該博客以及示例微服務/ Netty體系結構和功能全面的代碼將有望幫助減輕煩惱并實現甜味。
第一要務
示例代碼位于此處 。
有詳細的自述文件,描述了設置環境所需的內容。 我試圖將需求降到最低,僅Java 8和Maven 。 SLF4J和Logback用于記錄日志。 我為Mac OSX和Ubuntu設置了腳本(我在Parallels容器中運行的14.04版本是我測試過的腳本),因此如果您在Windows上進行開發,則表示歉意。 該代碼全是Java,并且我在Windows上看到過Kafka教程,因此所有內容都應在此處運行。 Maven構建也應該產生可以啟動的目標,因此,在安裝Zookeeper / Kafka的時候加了一點肘油(您可以按照腳本查看需要的設置),手動運行它并不重要。視窗。
注:如README.md中所述,該腳本將刪除任何現有的Zookeeper / Kafka安裝和數據。 如果您已有設置,請不要使用腳本!
安裝和配置必備mvn package如果不使用腳本,請運行mvn package如果是,則運行maclocal_run.sh (或linuxlocal_run.sh )。 該腳本將下載Zk / Kafka(如果尚未下載),進行安裝,配置,啟動它們,運行mvn package ,啟動服務并最終啟動服務器。 一旦啟動,就抵制離開外殼的沖動,因為它會自動為架構的每個部分彈出新的選項卡。 啟動Whirlpool服務器之后,就可以開始了。
我強烈建議創建一個腳本,以在本地安裝,配置,構建和啟動微服務環境。 創建每個單獨的服務是一個很大的痛苦。 必要時也可以使用Docker,但我發現只需本地運行所有內容,下載所需的內容就少得多。
作為一個預告片,這里是UI(您也可以從GitHub上的README.md看到它)。
- 要添加股票代碼,請輸入它(即“ GOOG”),然后單擊“股票”下的A按鈕。 要刪除它,請單擊X。
- 要添加一個網站來測試它是打開還是關閉,請鍵入完全限定的URL(即http://facebook.com ),然后單擊“ UpDown”下的A按鈕。 要刪除它,請單擊X。
- 要添加天氣檢查,請在中鍵入城市,州(即“芝加哥,il”),然后單擊“城市,州”下的A按鈕。 要刪除它,請單擊X。
- 由于訂閱與每個服務一起存儲在內存中,因此訂閱在頁面刷新甚至登錄/注銷(具有相同的用戶ID)后都不會丟失。 當然,“真實”系統將使用數據庫。
- 訂閱每10秒鐘更新一次,因此我不會壓倒Yahoo API,因此添加數據后請耐心等待。
建筑
在此示例中,我試圖考慮可能有用的良好通用服務。 我最終選擇了股票報價服務,“此網站是否正常運轉”服務以及氣象服務。 這些中的每一個都獨立于各自具有Kafka主題的其他主題運行。
我選擇配置Kafka的方式是每個服務使用一個命令主題,每個服務使用一個數據主題。 一切都可以只使用一個全局主題,讀者可以決定要處理的內容,但將其分離出來可以使其更加清晰和整潔。
這是數據如何通過Kafka流動的示意圖。 它是通過一個免費的基于Keyhole的基于Web的實用程序Mockola完成的 。 請注意,服務器知道所有主題,但是服務僅知道它們自己的主題。 cmd主題用于將命令發送到服務,而數據主題(在其上沒有-cmd主題)用于從服務發送數據。 同樣,所有這些都可以在一個bus主題上進行處理,但是通過將它們分離出來,可以更輕松地了解發生了什么。
服務
現在讓我們談談服務。 這三者非常相似,因此有一項基本服務可以完成大部分工作。 每個服務都有三個線程,由Java ExecutorService處理。 關于Executor服務的一件好事是,如果出現問題,它將自動重新啟動線程。 這有助于彈性。
每個服務通過告訴基類使用什么主題和命令主題來啟動自己。 然后,基類啟動三個線程:一個用于從cmd主題讀取命令,一個用于定期為客戶端收集數據,一個用于在數據主題上發送數據。 這些線程使用非阻塞Java并發類ConcurrentLinkedQueue和ConcurrentHashMap 。 哈希映射存儲每個用戶的訂閱集,隊列存儲準備發送給數據主題的響應。
每個服務的流程是同時工作的三個線程。 閱讀器使用Kafka使用者從其命令主題讀取命令。 根據命令,添加或刪除訂閱。 該線程非常笨拙,因為它不要求服務對請求進行任何驗證,而只是盲目地將發送給訂閱的內容添加進去。 生產代碼顯然會添加一個調用,要求服務在允許成功訂閱之前驗證命令。 創建一個響應以放置主題,然后等待下一個命令。
注意 :關于數據的一些話題。 我使用JSON作為傳輸格式,但是XML或您想要的其他任何內容也可以使用。 重要的是,每個人都同意數據格式并堅持使用。 通用模塊具有POJO類,這些類定義了數據將遵循的協定。 通常對所有消息有用的是時間戳,消息類型和客戶端的ID。
另一個有用的東西是到期時間戳。 這些示例消息永遠存在。 Message類僅查看Message的類型和ID。 服務器使用它來確定需要處理哪種類型的消息以及誰對該消息感興趣。 沒有這些,就很難甚至不可能處理數據。 現在,消息格式可以涉及很多,其中一些格式使用標題和部分來描述復雜的數據。 本示例嘗試使所有內容盡可能簡單。
凈值服務器
讓我們一次上一堂課。
NettyHttpFileHandler
與以前的博客相比,該類幾乎沒有變化。 可重用的片段已移至WebSocketHelper類。 該文件的主要用途是提供瀏覽器要求的文件。
WebSocket助手
可能令人困惑的第一項是類變量clientAttr 。 在Netty Channel中存儲數據要求將其附加到AttributeKey 。 這類似于Java并發類中的Atomic實例-它提供??了數據容器。 我們將存儲客戶端ID(在本例中為用戶名,但也可以很容易地作為會話ID),以便我們確定哪個Channel需要接收消息。
realWriteAndFlush()方法設置適當的標題,內容長度和cookie。 然后,它寫入并刷新HTTP響應。 線
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);告訴Netty這是需要寫入客戶端的數據的結尾,因此Netty會將其發送出去。
特別說明 :關于cookie的創建,請確保未設置HTTP Only標志。 如果是,則JavaScript無法看到Cookie,也不會與WebSocket升級請求一起發送。 這樣一來,您就必須創建自己的頁面刷新管理和會話管理方法。
關于cookie的另一件事是使用Netty cookie編碼器的STRICT版本,因此它將不允許多個具有相同名稱的cookie。 我不確定何時允許這種情況發生。
WebSocketMessageHandler
這個類只是定義了一個接口WhirlpoolServerHandler使用交談的WhirlpoolMessageHandler 。
WhirlpoolMessageHandler
這是Netty和Kafka之間存在連接的地方。 兩個執行器處理一個讀取器線程和一個寫入器線程。
編寫器線程在請求??隊列中查找消息(有關這些消息在一分鐘內來自何處的更多信息),并將消息放置在適當的Kafka命令主題上。
閱讀器線程在Kafka數據主題上查找傳入消息,為每個主題查找正確的Channel,然后將消息寫入這些主題。
當客戶端通過WebSockets發送消息時, WhirlpoolServerHandler將確保已handleMessage()完整的消息,然后調用handleMessage() 。 該方法確定是否為有效消息,然后將請求添加到請求隊列中,以便讀取器線程可以將其提取并提供給Kafka。
WhirlpoolServerHandler
這堂課有幾件有趣的事。 首先,它可以區分HTTP,REST和WebSocket消息之間的區別。 執行此操作的Netty重寫方法是channelRead0 。 這是Netty用來告訴我們消息何時到達以及消息是哪種類型的方法。 對于HTTP和REST調用, handleHttpRequest調用handleHttpRequest ,對于handleWebSocketFrame將調用handleWebSocketFrame 。
如果存在cookie方法,則handleHttpRequest方法handleHttpRequest讀取該cookie。 在POST上,它會查找登錄和注銷信息。 對于登錄,它將找出用戶名/密碼,創建cookie,并防止多次使用相同的名稱登錄。 所有這些代碼將在應用程序的生產版本中添加額外的安全性進行拆分。 要注銷,它會查找Channel,清理,關閉它并使cookie過期。
對于WebSocketUpgrade ,它要求Netty處理啟動websocket所需的復雜握手。 完成此操作后,會將用戶添加到握手期間創建的Channel。 這是用戶連接到Channel的地方,如果cookie沒有在請求中出現,那將不是一件容易的事。
在此唯一需要注意的另一件事是,此類設置為處理為SPA(單頁應用程序)編碼的客戶端,因為它將將所有無法識別的調用重定向到index.html 。
該類中的其他方法更多地是為了提供信息,將在高級情況下使用。
漩渦服務器
此類啟動Netty服務器并創建通道管道。 這是Netty的一個標準類,緊隨Netty示例。
最后的想法
顯然,此代碼中還有更多內容。 每個服務和服務器的多個實例可以同時運行,并且Zk / Kafka可以集群以幫助提高彈性。 一個測試微服務應用程序彈性的強大實用程序是另一個名為TroubleMaker的免費開源Keyhole實用程序。 我還沒有機會測試這個例子,但是我很期待這個機會。
我們沒有涉及安全性,盡管我以前希望展示Netty與Shiro的集成,但這是一個非常復雜的話題。 我只能說這是有可能的,但是我還沒有將所有內容都包裹在腦海中,以至于無法形成一個連貫的博客。
希望您喜歡該博客,并找到有用的代碼。 通過博客或Twitter與我聯系( @johnwboardman ,在這里我總是很欣賞新的關注)。
翻譯自: https://www.javacodegeeks.com/2016/05/whirlpool-microservices-using-netty-kafka.html
kafka netty
總結
以上是生活随笔為你收集整理的kafka netty_惠而浦:使用Netty和Kafka的微服务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PHP环境变量配置(php环境 linu
- 下一篇: 现在的ddos一般流量多大(现在的DDO