javascript
SpringAMQP--WorkQueue模型
WorkQueue
?
Work queues,也被稱為(Task queues),任務(wù)模型。簡(jiǎn)單來說就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。
當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來越多,無法及時(shí)處理。
此時(shí)就可以使用work 模型,多個(gè)消費(fèi)者共同處理消息處理,速度就能大大提高了。
消息發(fā)送
這次我們循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。
在publisher服務(wù)中的SpringAmqpTest類中添加一個(gè)測(cè)試方法:
/*** workQueue* 向隊(duì)列中不停發(fā)送消息,模擬消息堆積。*/ @Test public void testWorkQueue() throws InterruptedException {// 隊(duì)列名稱String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發(fā)送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);} }消息接收
要模擬多個(gè)消費(fèi)者綁定同一個(gè)隊(duì)列,我們?cè)赾onsumer服務(wù)的SpringRabbitListener中添加2個(gè)新的方法:
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費(fèi)者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20); }@RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費(fèi)者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200); }注意到這個(gè)消費(fèi)者sleep了1000秒,模擬任務(wù)耗時(shí)。
測(cè)試
啟動(dòng)ConsumerApplication后,在執(zhí)行publisher服務(wù)中剛剛編寫的發(fā)送測(cè)試方法testWorkQueue。
可以看到消費(fèi)者1很快完成了自己的25條消息。消費(fèi)者2卻在緩慢的處理自己的25條消息。
也就是說消息是平均分配給每個(gè)消費(fèi)者,并沒有考慮到消費(fèi)者的處理能力。這樣顯然是有問題的。
能者多勞
在spring中有一個(gè)簡(jiǎn)單的配置,可以解決這個(gè)問題。我們修改consumer服務(wù)的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息總結(jié)
Work模型的使用:
-
多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會(huì)被一個(gè)消費(fèi)者處理
-
通過設(shè)置prefetch來控制消費(fèi)者預(yù)取的消息數(shù)量
總結(jié)
以上是生活随笔為你收集整理的SpringAMQP--WorkQueue模型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringAMQP--入门案例的消息接
- 下一篇: SpringAMQP--发布订阅模型介绍