全局记录RabbitMQ的消费者消息日志
還是為了方便不同環(huán)境的問題排查,需要記錄 消費(fèi)者收到的所有消息,最好也能記錄一下每個(gè)消息的處理時(shí)長(zhǎng),哈哈。
注:本文的完整代碼,已經(jīng)上傳到:Github代碼
通過分析springframework.amqp代碼,發(fā)現(xiàn)
RabbitListener注解的消費(fèi)者,是通過 SimpleMessageListenerContainer 類在處理監(jiān)聽:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
而 SimpleMessageListenerContainer 類是由 SimpleRabbitListenerContainerFactory 工廠類創(chuàng)建的:
org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
SimpleRabbitListenerContainerFactory 工廠類的父類提供了一個(gè) setAdviceChain 可以添加一些攔截器。
OK,我們就是在這里增加一個(gè)攔截器,以記錄日志。
為不對(duì)項(xiàng)目造成影響,我們不重新定義bean,選擇在已有的Bean上設(shè)置Advice,關(guān)鍵代碼:
@Configuration public class RabbitConfiguration implements BeanPostProcessor {@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {switch (beanName) {// (bean instanceof SimpleRabbitListenerContainerFactory) 如果項(xiàng)目未引用mq,這里會(huì)報(bào)錯(cuò):NoClassDefFoundErrorcase "rabbitListenerContainerFactory":// 修改原有Bean,避免new SimpleRabbitListenerContainerFactory 出現(xiàn)問題SimpleRabbitListenerContainerFactory factory = (SimpleRabbitListenerContainerFactory) bean;Advice myAdvice = new RabbitAdvice();Advice[] adviceList = factory.getAdviceChain();if (adviceList == null || adviceList.length <= 0) {adviceList = new Advice[]{myAdvice};} else {adviceList = Arrays.copyOf(adviceList, adviceList.length + 1);adviceList[adviceList.length - 1] = myAdvice;}factory.setAdviceChain(adviceList);下面是 RabbitAdvice 關(guān)鍵代碼:
@Slf4j public class RabbitAdvice implements MethodInterceptor {@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {long start = System.currentTimeMillis();try {return invocation.proceed();} finally {long cost = System.currentTimeMillis() - start;StringBuilder sb = new StringBuilder("收到消息, 處理耗時(shí):");sb.append(cost).append("ms\r\n");try {Object[] args = invocation.getArguments();for (Object obj : args) {sb.append(obj).append("\r\n");if (obj instanceof Message) {sb.append("body: ").append(new String(((Message) obj).getBody())).append("\r\n");}}log.debug(sb.toString());} catch (Exception exp) {sb.append("Exception: ").append(exp.getMessage()).append("\r\n");log.error(sb.toString());}}} }OK,你的消費(fèi)者代碼該怎么定義,就怎么定義:
@RabbitListener(queues = Producer.QUEUE) void handler(@Payload Dto dto, @Headers Map<String, Object> headers) {System.out.println("我收到消息了:" + dto); }當(dāng)收到消息時(shí),日志里會(huì)出現(xiàn)如下格式的日志:
2020-11-05 11:21:20.604 DEBUG 16412 --- [ntContainer#0-1] b.c.demolograbbitmq.rabbit.RabbitAdvice : 收到消息, 處理耗時(shí):53ms Cached Rabbit Channel: AMQChannel(amqp://admin@10.2.3.250:5672/,1), conn: Proxy@3676ac27 Shared Rabbit Connection: SimpleConnection@5c77053b [delegate=amqp://admin@10.2.3.250:5672/, localPort= 56004] (Body:'{"clas":"beinet.cn.demolograbbitmq.DemoLogRabbitmqApplication$$EnhancerBySpringCGLIB$$57eda434","method":null,"para":null,"result":null,"costTime":2147483647,"remark":"這是測(cè)試消息","exp":null}' MessageProperties [headers={aaa=bbb, __TypeId__=beinet.cn.demolograbbitmq.util.Dto}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=001.ybl, deliveryTag=1, consumerTag=amq.ctag-RX1jaLWzHpqhPkG9Ma6O7w, consumerQueue=001.ybl]) body: {"clas":"beinet.cn.demolograbbitmq.DemoLogRabbitmqApplication$$EnhancerBySpringCGLIB$$57eda434","method":null,"para":null,"result":null,"costTime":2147483647,"remark":"這是測(cè)試消息","exp":null}總結(jié)
以上是生活随笔為你收集整理的全局记录RabbitMQ的消费者消息日志的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用php编写一个同学录,PHP+DBM的
- 下一篇: 引流脚本有没有效果,引流脚本是什么