rabbitmq基础10——消息追踪、Shovel插件的web端使用和命令使用
文章目錄
- 一、消息追蹤
- 1.1 Firehose功能
- 1.1.1 開啟與關閉
- 1.1.2 測試
- 1.1.3 總結
- 1.2 rabbitmq_tracing 插件
- 1.2.1 定義trace規則
- 1.2.2 測試
- 1.2.2.1 與Firehose之間的優先級
- 二、Shovel插件
- 2.1 實現原理
- 2.1.1 從隊列到交換器
- 2.1.2 從隊列到隊列
- 2.1.3 交換器到交換器
- 2.2 Shovel 插件使用
- 2.2.1 開啟Shovel功能
- 2.2.2 定義Shovel link
- 2.2.3 效果測試
- 2.2.4 集群測試
- 2.3 命令創建
- 2.3.1 創建Shovel link
- 2.3.2 查看Shovel狀態
一、消息追蹤
- 當有消息異常丟失時,可以通過跟蹤記錄消息的投遞過程,以此快速地定位問題。
- 消息丟失情況:
- 可能是生產者與 Broker 斷開了連接并且也沒有任何重試機制。
- 可能是消費者在處理消息時發生了異常,不過卻提前進行了 ak。
- 可能是交換器沒有與任何隊列進行綁定,生產者感知不到或者沒有采取相應的措施。
- 集群策略也有可能導致消息丟失。
1.1 Firehose功能
- 在rabbitmq中,Firehose 功能可以實現消息追蹤。Firehose 可以記錄每一次發送或者消費消息的記錄,方便rabbitmq的使用者進行調試、排錯等。
- Firehose原理:
- 是將生產者投遞給rabbitmq的消息,或者rabbitmq投遞給消費者的消息,按照指定格式發送到默認交換器上。
- 交換器: amq.rabbitmq.trace,是個topic 類型的交換器。
- 發送到這個交換器上的消息的綁定鍵為:publish.{exchangename}和deliver.{queuename}。
- exchangename:生產者投遞消息的交換器名稱。
- queuename: 消費者獲取消息的隊列名稱。
- 是將生產者投遞給rabbitmq的消息,或者rabbitmq投遞給消費者的消息,按照指定格式發送到默認交換器上。
1.1.1 開啟與關閉
- 開啟命令:rabbitmqctl trace_on [-p vhost]。
- 關閉命令: rabbitmqctl trace_off [-p vhost]。
- 注意事項:
- Firehose 默認情況下處于關閉狀態,并且 Firehose 的狀態也是非持久化的,會在rabbitmq服務重啟的時候還原成默認的狀態。
- Firehose 開啟之后會影響rabbitmq整體服務的性能,因為它會引起額外的消息生成、路由和存儲。
1.開啟Firehose。
2.查看。開啟之后在web頁面可以看到默認交換器amq.rabbitmq.trace。
1.1.2 測試
- 測試工作需確保 Firehose 處于開啟狀態。
2. 創建5個隊列與默認交換器amq.rabbitmq.trace綁定,用來接收生產者投遞給交換器消息這個過程的監控數據消息和消費者消費隊列里的消息這個過程的監控數據消息。
- 隊列分別為:
- queue_1: 當生產者給qingjun_exchange、baimu_exchange交換器發送消息時,發送到其中任何一個交換器,則該隊列就記錄一條消息。
- queue_2:消費者消費qingjun_queue、baimu_queue隊列里的消息時,消費其中任何一個隊列消息,則該隊列就記錄一條消息。
- queue_3:接受所有消息。
- queue_4:當生產者給qingjun_exchange交換器發送消息時,則該隊列就記錄一條消息。
- queue_5:當生產者給baimu_exchange交換器發送消息時,則該隊列就記錄一條消息。
- queue_6:消費者消費qingjun_queue隊列里的消息時,則該隊列就記錄一條消息。
- queue_7:消費者消費baimu_queue隊列里的消息時,則該隊列就記錄一條消息。
- 綁定鍵分別為:pulish.#、deliver.#、#、publish.qingjun_exchange、pulish.baimu_exchange、deliver.qingjun_queue、deliver.baimu_queue。
- qingjun_queue隊列正常收到一條消息。
- queue_3接受所有消息,所以也有一條。
- headers 屬性釋義:
- exchange_name:表示發送此條消息的交換器。
- routing_keys:表示與exchange_name綁定的路由鍵。
- properties:表示消息本身的屬性。比如delivery_mode 設置為 2,表示消息需要持久化處理。
- queue_4接受生產者給qingjun_exchange交換器發送的消息,所以也有一條。
4. 現在給baimu_exchange交換器發送一條消息“beijing”,并消費此消息。
- baimu_queue隊列正常收到一條消息。
- 消費baimu_queue里的一條消息,所以queue_7就有一條消息。
- queue_3接受所有消息。生產者給baimu_exchange發送一條消息時,queue_3隊列接受一條;消費baimu_queue一條消息時,queue_3再接受一條,所以一共增長了2條。
1.1.3 總結
- 在 Firehose 開啟狀態下,當有客戶端發送或者消費消息時,Firehose 會自動封裝相應的消息體,并添加詳細的 headers 屬性。所以我們可以細讀消息內容就可以判斷出這條消息是哪兒來的。
- 生產者發送消息時,封裝內容如下:
- 消費者消費消息時,封裝內容如下:
1.2 rabbitmq_tracing 插件
- rabbitmg_tracing 插件是在 Firehose 基礎上多了一層 GUI 的封裝,兩者作用差不多,會對流入流出的消息進行封裝,然后將封裝后的消息日志存入相應的 trace 文件之中。
- 命令使用:
- 開啟插件:rabbitmq-plugins enable rabbitmq_tracing
- 關閉插件:rabbitmq-plugins disable rabbitmq_tracing
- 定義trac任務參數釋義:
-
Name:自定義trace 任務名稱。
-
Format:表示輸出的消息日志格式,分Text 和JSON 兩種。
- Text 格式的日志方便閱讀。
- JSON 的格式方便程序解析。
- JSON 格式的 payload(消息體)默認會采用 Base64 進行編碼,如上面的“trace test payload.會被編碼“dHJhY2UgdGVzdCBwYXIsb2FkLg-=”
-
Tracer connection username :指定使用哪個用戶創建tracing。默認使用guest用戶來創建trace。
-
Tracer connection password:該賬戶的密碼。
-
Max payload bytes:表示每條消息的最大限制,單位為 B。
- 比如設置了此值為 10,那么當有超過 10B 的消息經過 RabbitMQ 流轉時,在記錄到 trace 文件時會被截斷。
-
Pattern: 設置匹配正則,和 Firehose 類似。
- “#”:匹配所有消息流入流出的情況,即當有客戶端生產消息或者消費消息的時候,會把相應的消息日志都記錄下來。
- “publish.#”:匹配所有消息流入的情況。
- “deliver.#”:匹配所有消息流出的情況。
- “#.amq.deliver”:追蹤所有從以“.amq”結尾隊列離開的消息。
- “#.myQueue”:追蹤所有從“myQueue”隊列離開的消息。
-
- 刪除trace規則和日志:
- 點擊trace旁邊的Stop按鈕就會關閉connection,并刪除創建時建立的Queue。
- 點擊Stop不會將trace的日志文件刪除,需要再點擊日志文件旁邊的Delete按鈕。
- 在Stop trace之前不要delete日志文件. 否則,trace仍會對消息進行追蹤,但不會將消息落盤。
1.2.1 定義trace規則
- rabbitmq重啟之后,trace規則會被刪除,但是日志仍然存在。
1.開啟插件。集群每個節點都需要開啟,節點之間不共享。
[root@node2 ~]# rabbitmq-plugins enable rabbitmq_tracing
2.監控頁面查看。在這里就可以定義規則了,版本的不同顯示的選項也不同,在老版本中這里還有虛擬主機的選擇。
3.定義一個tracing任務。生成之后就會有對應日志生成,默認存放路徑在/var/tmp/rabbitmq-tracing,也可以通過配置文件修改該日志存放路徑。
4.同時還會生成一個隊列,自動與amq.rabbitmq.trace交換器綁定。每定義一個tracing任務就會多一個隊列并與amq.rabbitmq.trace交換器綁定。我這里就定義了兩個tracing任務,所以有兩個隊列。
1.2.2 測試
1.2.2.1 與Firehose之間的優先級
- 注意我這里的queue_1到queue_6這幾個隊列是用來測試前文的Firehose功能的。打那個Firehose功能和rabbitmq_tracing插件同時開啟使用時,消息優先記錄到Firehose定義的隊列里,同時在trace日志也有記錄。
1.比如我給qingjun_exchange發送一條消息“武漢‘。queue_3和queue_4有消息進來,但是rabbitmq_tracing插件生成的隊列并沒有消息進來。
2.查看trace日志,有發送”wuhan“的記錄。
二、Shovel插件
- 俗稱”鏟子“,和 Federation 插件具備類似的數據轉發功能。
- Shovel 能夠可靠、持續地從一個 Broker 中的隊列(作為源端,即 source)拉取數據并轉發至另一個 Broker 中的交換器(作為目的端,即 destination)。
- 作為源端的隊列和作為目的端的交換器可以同時位于同一個 Broker 上,也可以位于不同的Broker 上。
- Shovel 主要優勢:
- 松耦合。Shovel 可以移動位于不同管理域中的 Broker(或者集群)上的消息,這些 Broker(或者集群)可以包含不同的用戶和 vhost,也可以使用不同的 RabbitMQ 和 Erlang版本。
- 支持廣域網。Shovel 插件同樣基于 AMQP 協議在 Broker 之間進行通信,被設計成可以容忍時斷時續的連通情形,并且能夠保證消息的可靠性。
- 高度定制。當 Shovel 成功連接后,可以對其進行配置以執行相關的 AMQP 命令。
2.1 實現原理
2.1.1 從隊列到交換器
- 如圖一共有兩個Broker:
- Broker_1在192.168.130.129上,Broker_2在192.168.130.128上。
- Broker_1中有qingjun_exchange交換器,通過綁定鍵qingjun_key和qingjun_queue隊列綁定。
- Broker_2中有baimu_exchange交換器,通過綁定鍵baimu_key和baimu_queue隊列綁定。
- 開啟Broker_2上的Shovel插件,配置一條Shovel link連接。
- 生產者發送消息到qingjun_exchange交換器上時,消息數據最終流轉存儲在baimu_queue隊列中。
2.1.2 從隊列到隊列
- 上圖是一般配置,在使用 Shovel 插件時,以配置隊列作為源端,交換器作為目的端。同樣也可以將隊列配置為目的端,如下圖。
- 消息雖然是從qingjun_queue隊列通過 Shovel link 連接到baimu_queue隊列,但實際上是經由 Broker_2 的交換器轉發,只不過這個交換器是默認的交換器而已。
2.1.3 交換器到交換器
- 同樣也可以將交換器作為源端。
- 消息雖然是從qingjun_exchange交換器通過 Shove link 連接轉發到 baimu_exchange交換器上。實際上rabbitmq會在 Broker_1 中新建一個隊列并綁定 qingjun_exchange交換器,消息先從qingjun_exchange交換器存儲在這個隊列,然后 Shovel 再從這個隊列中拉取消息進而轉發至baimu_exchange交換器上。
- 注意事項:
- 如上圖所示中的交換器和隊列,可以在Broker開啟插件之前創建,也可以在開啟插件之后在創建。
- 當Broker中有集群時,可以配置所有節點地址,這樣可以使得源端或者目的端的 Broker 失效后能夠嘗試重連到其他 Broker之上(隨機挑選)。
- 還可以設置 reconnect_delay 參數,從而避免由于重連行為導致的網絡泛洪,或者可以在重連失敗后直接停止連接。針對源端和目的端的所有配置聲明會在重連成功之后被重新發送。
2.2 Shovel 插件使用
- 現有兩個rabbitmq服務,分別在192.168.130.128和192.168.130.129服務器上,元數據信息如下:
- 192.168.130.129上有qingjun_exchange交換器和qingjun_queue隊列,綁定鍵為qingjun_key。
- 192.168.130.128上有baimu_exchange交換器和baimu_queue隊列,綁定鍵為baimu_key。
- 現在需要把192.168.130.129qingjun_queue隊列里的消息轉發給192.168.130.129的baimu_queue隊列。
2.2.1 開啟Shovel功能
第一步,在192.168.130.128上開啟Shovel功能和管理功能。
[root@node1 ~]# rabbitmq-plugins enable rabbitmq_shovel[root@node1 ~]# rabbitmq-plugins enable rabbitmq_shovel_management
第二步,查看。開啟管理功能之后右側就多出“Shovel Status”和“Shovel Management”兩個 Tab 頁。
- rabbitmq_shovel_management 插件依附于 rabbitmq_management 插件,所以開啟rabbitmq_shovel_management 插件的同時默認也會開啟rabbitmq_management 插件。
2.2.2 定義Shovel link
- 參數釋義:
- Virtual host:選擇本地虛擬主機,這個每個虛擬主機里的交換器和隊列都不一樣。
- Name:自定義Shovel link規則名稱。
- Source:選擇源端的AMQP版本,這個在之前都沒有,官方優化了。
- URI:源端地址,格式為amqp://user:password@IP:5672。集群多節點填寫時,用一個空格隔開。
- Queue/Exchange:選擇從哪個隊列或交換器里取消息,并填寫其名稱。
- Prefetch count:表示 Shovel 內部緩存的消息條數,可以參考 Federation 的相關參數。
- Auto-delete:有兩個選項,Never或After。前者表示從來沒有,永遠不會自己刪除,它將一直存在,直到顯式刪除;后者表示初始長度轉移后,Slovel啟動時會檢查隊列的長度,它會傳輸這么多消息,然后刪除自己。
- Destination:選擇目的地端的AMQP版本。
- URI:目的地端地址,格式為amqp://user:password@IP:5672。
- Queue/Exchange:選擇接受消息的隊列或交換器,并填寫其名稱。
- Add forwarding headers:默認為No,表示不給轉走的消息添加頭部信息,以指示它們從哪里被鏟走和被鏟到哪里。建議開啟選擇yes。
- Reconnect delay:表示在 Shovel link 失效時,重新建立連接前需要等待的時間。單位為秒。如果設置為 0,則不會進行重連動作,即 Shovel 會在首次連接失效時停止工作。reconnect delay 默認為5秒。
- Acknowledgement mode:表示在完成轉發消息時的確認模式,共有三種取值:
- No ack:表示無須任何消息確認行為。
- Nn publish:表示 Shovel 會把每一條消息發送到目的端之后再向源端發送消息確認。
- On confirm:表示 Shovel 會使用 publisher confirm 機制,在收到目的端的消息確認之后再向源端發送消息確認。也是默認設置,并強烈建議使用該種模式,可靠性高。
第三步,定義Shovel link規則。注意這里一定不能填錯,要取哪個交換器或隊列的數據?給到目的地的交換器還是隊列?地址格式,還是注意虛擬主機。
第四步,定義完了之后,可以查看狀態,顯示綠色代表已經接通,紅色就是有問題。
2.2.3 效果測試
第五步,在源端qingjun_exchange交換器發送一條消息”wuhan“,查看qingjun_queue隊列沒有這條消息,消息已經轉發給baimu_queue隊列,查看此隊列存在該條消息。
2.2.4 集群測試
1.如圖,我配置了第二條Shovel link,相比單個來說就是多了幾個源端地址,中間用一個空格隔開。
2.查看狀態,綠色運行中,沒問題。
3.給qingjun_exchange交換器發送第二條消息”beijing“,baimu_queue隊列成功收到。
4.此時關閉192.168.130.130上的rabbitmq服務,模擬從節點異常,集群中的其他節點正常。注意不能時主節點,不然從節點是不能寫的。
5.在第二個從節點192.168.130.131上發送第三條消息”999“,baimu_queue隊列收到第三條消息,說明我們配置的第二個Shovel link目的已經達到。
2.3 命令創建
- 也可以通過rabbitmqctl命令來生成Shovel link。
- 相關參數:
- name:test_Shovel
- source-uri(源地址):amqp://user:password@ip:2576
- src-queue(源隊列):qingjun_queue
- dest-uri(目的地址):amqp://user:password@ip:2576
- dest-queue(目的隊列):wuhan_queue
- prefetch-count(內部緩存的消息條數):30
- reconnect-delay(shovel失效情況下,重新建立連接需要等待的秒數):5秒
- publish-properties(發往目的端需要設置的屬性):空,代表不設置。
- ack-mode(轉發消息時確認):on-confirm
2.3.1 創建Shovel link
1.添加一個Shovel link ,名稱叫test_Shovel。
[root@node1 ~]# rabbitmqctl set_parameter shovel test_Shovel '{"src-uri":"amqp://qingjun:citms@192.168.130.129:5672","src-queue":"qingjun_queue","dest-uri":"amqp://qingjun:citms@192.168.130.128:5672","dest-queue":"wuhan_queue","prefetch-count":30,"reconnect-delay":5,"publish-properties":[],"ack-mode":"on-confirm"}'2.查看其狀態。
3. 在源端發送一條消息“888”。
4.在目的地端查看結果。wuhan_queue已被創建,并收到消息。
2.3.2 查看Shovel狀態
- 命令格式:rabbitmqctl eval ‘rabbit_shovel_status:status().’
- 參數釋義:
- 第一行:顯示該shovel link所在的虛擬空間和名稱。名稱是漢字則顯示不出來。
- 第二行:運行狀態和參數顯示,狀態有三種顯示。
- 當 Shovel 處于啟動、連接和創建資源時,顯示starting。
- 當 Shovel 正常運行時,顯示running。
- 當Shovel終止時時,顯示terminated。
- 最后一行:時間戳顯示,年月日,時分秒。
總結
以上是生活随笔為你收集整理的rabbitmq基础10——消息追踪、Shovel插件的web端使用和命令使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 改性塑料增强尼龙66是怎么跑到汽车里
- 下一篇: 国外知名的源代码网站(转)