javascript
RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码
目錄
第十章-RabbitMQ之Spring客戶端源碼
????????1. 前言
2. 客戶端消費代碼
2.1 消費的實現方式
2.2 消費中注解解釋
2.3?推測Spring實現過程
3.MQ消費源碼分析
3.1?集成SpringBoot 啟動過程
3.2 Broker投遞消息給客戶端過程
3.3 客戶端消費過程
4. 總結
第十章-RabbitMQ之Spring客戶端源碼
1. 前言
經過前面前面的學習,我們已經掌握了rabbitmq的基本用法,高級用法延遲隊列、死信隊列等,已經研究過了amqp-client的java客戶端源碼,由于我們在使用的時候,一般還是以SpringBoot為主,那經過Spring封裝后的客戶端源碼是是如何實現的呢?
同學們最好需要有研讀過 Spring源碼及SpringBoot 源碼的經驗,會更好銜接一下,不過關系也不大。
由于Spring 體系的龐大,封裝的rabbit客戶端實現的功能也很多,例 創建連接、生產者推送消息,事務,消費者消費等等內容,那我們這次只抽取rabbitmq消費的部分,進行研讀。
集成starter
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2. 客戶端消費代碼
2.1 消費的實現方式
如之前我們提到的集成SpringBoot后的使用方式:
@RabbitHandler @RabbitListener(queues = "SolarWaterHeater") @RabbitHandler@RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater")) @RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue("SolarWaterHeater-RedWine"),key = "REDWINE",exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))2.2 消費中注解解釋
這里面出現了兩個注解
第一個:RabbitHandler 看下它的解釋:
* Annotation that marks a method to be the target of a Rabbit message * listener within a class that is annotated with {@link RabbitListener}.如果一天類上面的注解是RabbitListener,那RabbitHandler標注的方法,即是Rabbit的消息監聽者。
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE }) 這個注解只能標注到Method第二個?RabbitListener
1. Annotation that marks a method to be the target of a Rabbit message listener標注的方法是一個消息監聽者
2. When defined at the class level, a single message listener container is used to * service all methods annotated with {@code @RabbitHandler}如果標注到類上,那標注RabbitHandler的方法即是消息監聽
鏈一個:@RabbitListener和@RabbitHandler的使用_sliver1836的博客-CSDN博客
2.3?推測Spring實現過程
所以,我們后續的源碼分析即基于此兩個注解開展。
在開始看代碼之前,我們先想一想,我們之前的使用java amqp客戶端開發消費邏輯的過程,
1、創建連接
2、創建Channel
3、聲明隊列、Exchange、綁定關系
4、監聽方法實現 繼承DefaultConumer
5、basic.Consume 注冊到Broker
6、Broker消息推送,監聽方法實現消費
那現在Spring就靠兩個注解就幫我們實現了消息的消費,有沒有很神奇。頓時感嘆程序猿越來越幸福,寫代碼如此簡單了呢?但有利就有弊,Spring幫我們封裝的太多,而我們知道的底層卻太少了。
閑話少說,到這,大家想一下,如果讓你寫個注解,就去實現上面6個步驟的內容,你該如何去做呢?
開發自定義注解大家都應該做過,大致的邏輯應該是不是可以,在系統啟動的時候,我們就會抓取到標注注解的方法,有此類的方法時,我們認為需要使用mq,我們在后端服務中依次的去執行上面中的6個步驟。這樣把注解的方法實現了監聽,后續監聽消息進行消費。
這里只是一個大概的推測,大家自己自行發揮想像。
3.MQ消費源碼分析
從哪入手呢?首先點開?RabbitListener 的源碼,然后Download源碼。
到這個界面:
我們不再研讀RabbitListener這個注解的功能了,大家自己看。
然后緊接著看到?RabbitListenerAnnotationBeanPostProcessor
這個類有什么特點呢?首先是處理RabbitListener 的處理類,然后呢是一個BeanPostProcessor繼承了BeanPostProcessor 接口-讀過Spring源碼的同學,肯定就能得到最有效的信息了,這個類會在系統初始化的時候,執行postProcessAfterInitialization()這個方法。如果沒讀過Spring源碼的話就先跟著節奏走吧。
從這開始了我們的切入。
3.1?集成SpringBoot 啟動過程
接著上面的步驟呢,我們往上簡單倒一下,
首先 這是一個SpringBoot 項目,通過SpringBoot?的啟動類的Main 方法進行啟動,然后開始掃描各個組件,初始化各種信息,這個不再細聊。【需要讀SpringBoot源碼】
其次呢,SpringBoot 只是對Spring 的封裝,還是需要回到Spring 的類初始化的過程中去。【需要讀Spring源碼】
如下呢,即Spring 的核心初始化方法:無論Spring 再怎么升級,這幾個核心方法基本不會怎么變化了,這里面我們找到 【registerBeanPostProcessors】,從這里面就會觸發到我們上面所說的-
RabbitListenerAnnotationBeanPostProcessor 了。
@Overridepublic void refresh() throws BeansException, IllegalStateException {synchronized (this.startupShutdownMonitor) {// Prepare this context for refreshing.prepareRefresh();// Tell the subclass to refresh the internal bean factory.ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();// Prepare the bean factory for use in this context.prepareBeanFactory(beanFactory);try {// Allows post-processing of the bean factory in context subclasses.postProcessBeanFactory(beanFactory);// Invoke factory processors registered as beans in the context.invokeBeanFactoryPostProcessors(beanFactory);// Register bean processors that intercept bean creation.registerBeanPostProcessors(beanFactory);// Initialize message source for this context.initMessageSource();// Initialize event multicaster for this context.initApplicationEventMulticaster();// Initialize other special beans in specific context subclasses.onRefresh();// Check for listener beans and register them.registerListeners();// Instantiate all remaining (non-lazy-init) singletons.finishBeanFactoryInitialization(beanFactory);// Last step: publish corresponding event.finishRefresh();}catch (BeansException ex) {if (logger.isWarnEnabled()) {logger.warn("Exception encountered during context initialization - " +"cancelling refresh attempt: " + ex);}// Destroy already created singletons to avoid dangling resources.destroyBeans();// Reset 'active' flag.cancelRefresh(ex);// Propagate exception to caller.throw ex;}finally {// Reset common introspection caches in Spring's core, since we// might not ever need metadata for singleton beans anymore...resetCommonCaches();}}}隨著Spring 的啟動,開始觸發到了RabbitListenerAnnotationBeanPostProcessor 中的?
postProcessAfterInitialization 方法。代碼:
這就很好解釋了,bean 就是我們的消費類,
解析到了 標有注解的方法 @RabbitListener,然后進行處理。processAmqpListener
@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {Class<?> targetClass = AopUtils.getTargetClass(bean);final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);for (ListenerMethod lm : metadata.listenerMethods) {for (RabbitListener rabbitListener : lm.annotations) {processAmqpListener(rabbitListener, lm.method, bean, beanName);}}if (metadata.handlerMethods.length > 0) {processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);}return bean;} protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {// 對應的消費方法Method methodToUse = checkProxy(method, bean);//封裝對象MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();endpoint.setMethod(methodToUse);// 繼續處理processListener(endpoint, rabbitListener, bean, methodToUse, beanName);}繼續:
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {endpoint.setBean(bean);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setId(getEndpointId(rabbitListener));endpoint.setQueueNames(resolveQueues(rabbitListener));endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));endpoint.setBeanFactory(this.beanFactory);endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));Object errorHandler = resolveExpression(rabbitListener.errorHandler());if (errorHandler instanceof RabbitListenerErrorHandler) {endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);}else if (errorHandler instanceof String) {String errorHandlerBeanName = (String) errorHandler;if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));}}else {throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "+ errorHandler.getClass().toString());}String group = rabbitListener.group();if (StringUtils.hasText(group)) {Object resolvedGroup = resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String autoStartup = rabbitListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));}endpoint.setExclusive(rabbitListener.exclusive());String priority = resolve(rabbitListener.priority());if (StringUtils.hasText(priority)) {try {endpoint.setPriority(Integer.valueOf(priority));}catch (NumberFormatException ex) {throw new BeanInitializationException("Invalid priority value for " +rabbitListener + " (must be an integer)", ex);}}// 以上 前面都完成了對 MethodRabbitListenerEndpoint 對象的封裝,封裝的也都是注解中的屬性//此方法內部實際沒執行 跳過resolveAdmin(endpoint, rabbitListener, adminTarget);//跳過RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);// 屬性填充 放入List ,不重要this.registrar.registerEndpoint(endpoint, factory);}程序回轉:
這里面來到一個
public void afterSingletonsInstantiated() 方法,這是由于實現了接口SmartInitializingSingleton, 后續得到了處理。這里面會涉及到兩個類:
1.?RabbitListenerEndpointRegistrar
2.?RabbitListenerEndpointRegistry
有沒有長得很像,這里面是把?RabbitListenerEndpointRegistry 手工注冊到了RabbitListenerEndpointRegistrar 里面,然后進行了一系列初始化,
這里面不再詳細展開了,但這個RabbitListenerEndpointRegistry 很重要,后面還會涉及到它
?RabbitListenerEndpointRegistry 實現了一個Lifecycle接口,后續會調用到它的實現start()
將對應的消費Class 做好了封裝 ,返回,繼續Spring的初始化過程。
來到Spring核心流程?
finishRefresh(); /*** Finish the refresh of this context, invoking the LifecycleProcessor's* onRefresh() method and publishing the* {@link org.springframework.context.event.ContextRefreshedEvent}.*/protected void finishRefresh() {// Clear context-level resource caches (such as ASM metadata from scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);}其中第三個方法
getLifecycleProcessor().onRefresh();這個方法是獲取 lifecycle的處理器,進行lifecycle接口實現類的處理,這就呼應到了上面的?RabbitListenerEndpointRegistry ,他實現了lifecycle的接口。
最終一番流轉終于到了 這個Registry處理邏輯中:
@Overridepublic void start() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}} /*** Start the specified {@link MessageListenerContainer} if it should be started* on startup or when start is called explicitly after startup.* @param listenerContainer the container.* @see MessageListenerContainer#isAutoStartup()*/private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {listenerContainer.start();}} MessageListenerContainer 也是在上面afterSingletonsInstantiated 處理好的,現在要啟動這個監聽者容器。來到了?AbstractMessageListenerContainer 中的啟動方法:
/*** Start this container.* @see #doStart*/@Overridepublic void start() {if (isRunning()) {return;}if (!this.initialized) {synchronized (this.lifecycleMonitor) {if (!this.initialized) {afterPropertiesSet();}}}try {logger.debug("Starting Rabbit listener container.");configureAdminIfNeeded();checkMismatchedQueues();doStart();}catch (Exception ex) {throw convertRabbitAccessException(ex);}finally {this.lazyLoad = false;}} configureAdminIfNeeded() 獲取RabbitAdmin checkMismatchedQueues() 這個方法就很關鍵了,運行到此時打開我們的抓包工具,這里面開始創建Connection了。 protected void checkMismatchedQueues() {if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {try {this.amqpAdmin.initialize();}catch (AmqpConnectException e) {logger.info("Broker not available; cannot check queue declarations");}catch (AmqpIOException e) {if (RabbitUtils.isMismatchedQueueArgs(e)) {throw new FatalListenerStartupException("Mismatched queues", e);}else {logger.info("Failed to get connection during start(): " + e);}}}else {try {// 創建連接方法Connection connection = getConnectionFactory().createConnection(); // NOSONARif (connection != null) {connection.close();}}catch (Exception e) {logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());}}}有沒有很熟悉
Connection connection = getConnectionFactory().createConnection(); @Overridepublic final Connection createConnection() throws AmqpException {if (this.stopped) {throw new AmqpApplicationContextClosedException("The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");}synchronized (this.connectionMonitor) {if (this.cacheMode == CacheMode.CHANNEL) {if (this.connection.target == null) {this.connection.target = super.createBareConnection();// invoke the listener *after* this.connection is assignedif (!this.checkoutPermits.containsKey(this.connection)) {this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));}this.connection.closeNotified.set(false);getConnectionListener().onCreate(this.connection);}return this.connection;}else if (this.cacheMode == CacheMode.CONNECTION) {return connectionFromCache();}}return null; // NOSONAR - never reach here - exceptions}運行完此步,如上的代碼中,兩個重要的點:
1. 此步直接就創建了Connection、
this.connection.target = super.createBareConnection();看下抓包:
2. 繼續這一步也很關鍵,創建完連接后,會把接下來的 Exchange、Queue、綁定關系根據注解配置中的內容,該創建的都創建一遍。
getConnectionListener().onCreate(this.connection);直接運行到了
RabbitAdmin.initialize()看方法頭上的注釋也很清晰
/*** Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe* (but unnecessary) to call this method more than once.*/@Override // NOSONAR complexitypublic void initialize() {if (this.applicationContext == null) {this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");return;}this.logger.debug("Initializing declarations");Collection<Exchange> contextExchanges = new LinkedList<Exchange>(this.applicationContext.getBeansOfType(Exchange.class).values());Collection<Queue> contextQueues = new LinkedList<Queue>(this.applicationContext.getBeansOfType(Queue.class).values());Collection<Binding> contextBindings = new LinkedList<Binding>(this.applicationContext.getBeansOfType(Binding.class).values());processLegacyCollections(contextExchanges, contextQueues, contextBindings);processDeclarables(contextExchanges, contextQueues, contextBindings);final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);final Collection<Queue> queues = filterDeclarables(contextQueues);final Collection<Binding> bindings = filterDeclarables(contextBindings);for (Exchange exchange : exchanges) {if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("+ exchange.getName()+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "+ "reopening the connection.");}}for (Queue queue : queues) {if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("+ queue.getName()+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"+ queue.isExclusive() + ". "+ "It will be redeclared if the broker stops and is restarted while the connection factory is "+ "alive, but all messages will be lost.");}}if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {this.logger.debug("Nothing to declare");return;}this.rabbitTemplate.execute(channel -> {declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));declareQueues(channel, queues.toArray(new Queue[queues.size()]));declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));return null;});this.logger.debug("Declarations finished");}由于我們只創建了Queue,使用默認的Exchange,代碼不貼太多了,只貼聲明Queue的內容:
DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());我們看下抓包情況:
?到此呢,Queue也聲明好了。下面呢,下面就該basic.Consume 了吧,把消費者注冊到Broker中去。
好,我們繼續:
繼續代碼又倒回去,倒到:
/*** Start this container.* @see #doStart*/@Overridepublic void start() {if (isRunning()) {return;}if (!this.initialized) {synchronized (this.lifecycleMonitor) {if (!this.initialized) {afterPropertiesSet();}}}try {logger.debug("Starting Rabbit listener container.");configureAdminIfNeeded();checkMismatchedQueues();doStart();}catch (Exception ex) {throw convertRabbitAccessException(ex);}finally {this.lazyLoad = false;}} doStart();一看doxxx,那一定是要干實際的事情的,很重要對吧,
我們進入到?
SimpleMessageListenerContainer中的實現方法中:
/*** Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer* to this container's task executor.*/@Overrideprotected void doStart() {checkListenerContainerAware();super.doStart();synchronized (this.consumersMonitor) {if (this.consumers != null) {throw new IllegalStateException("A stopped container should not have consumers");}int newConsumers = initializeConsumers();if (this.consumers == null) {logger.info("Consumers were initialized and then cleared " +"(presumably the container was stopped concurrently)");return;}if (newConsumers <= 0) {if (logger.isInfoEnabled()) {logger.info("Consumers are already running");}return;}Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();for (BlockingQueueConsumer consumer : this.consumers) {AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);processors.add(processor);getTaskExecutor().execute(processor);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));}}waitForConsumersToStart(processors);}}前面幾步意義不大,走到
int newConsumers = initializeConsumers(); protected int initializeConsumers() {int count = 0;synchronized (this.consumersMonitor) {if (this.consumers == null) {this.cancellationLock.reset();this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);for (int i = 0; i < this.concurrentConsumers; i++) {BlockingQueueConsumer consumer = createBlockingQueueConsumer();this.consumers.add(consumer);count++;}}}return count;}重點來咯,
BlockingQueueConsumer consumer = createBlockingQueueConsumer();這里把BlockingQueueConsumer做了一個初始化,相關的不再展開。
BlockingQueueConsumer -這將是后續我們非常重要的一個類
繼續重點內容,回到我們上面代碼塊中的內容:
for (BlockingQueueConsumer consumer : this.consumers) {AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);processors.add(processor);getTaskExecutor().execute(processor);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));}}這個for循環很重要了,由于我們是一個消費者,循環一次。
初始化一個
AsyncMessageProcessingConsumer對象。這個對象點進去,大家看下這是個實現了Runnable接口的線程對象。哦哦,真正的核心哦。使用 SimpleAsyncTaskExecutor? ?來new的線程,這個執行器可不是線程池哦,來一個線程就會New一個,大家自行研究。
這里面我們可以得到一個結論,就是一個消費者,就會開啟一個線程進行監聽。
從此開啟了新線程,【打斷點記得Thread模式】
看線程的實現:
@Override // NOSONAR - complexity - many catch blockspublic void run() { // NOSONAR - line countif (!isActive()) {return;}boolean aborted = false;this.consumer.setLocallyTransacted(isChannelLocallyTransacted());String routingLookupKey = getRoutingLookupKey();if (routingLookupKey != null) {SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null}if (this.consumer.getQueueCount() < 1) {if (logger.isDebugEnabled()) {logger.debug("Consumer stopping; no queues for " + this.consumer);}SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));}this.start.countDown();return;}try {initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}摘出核心點:
1、initialize();
private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();this.consumer.start();this.start.countDown();}初始化內容,
1.? redeclareElementsIfNecessary - 這個是再進行檢查進行Exchange 、Queue、Binding的聲明與前面聲明的方法實現的共用。
2.this.consumer.start();??
public void start() throws AmqpException {if (logger.isDebugEnabled()) {logger.debug("Starting consumer " + this);}this.thread = Thread.currentThread();try {this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,this.transactional);this.channel = this.resourceHolder.getChannel();ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel); // NOSONAR never null here}catch (AmqpAuthenticationException e) {throw new FatalListenerStartupException("Authentication failure", e);}this.deliveryTags.clear();this.activeObjectCounter.add(this);passiveDeclarations();setQosAndreateConsumers();} 這里面我們看這個方法就行 setQosAndreateConsumers();Qos是設定消費時每次抓取的數量
并CreadConsumers
private void setQosAndreateConsumers() {if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {// Set basicQos before calling basicConsume (otherwise if we are not acking the broker// will send blocks of 100 messages)try {this.channel.basicQos(this.prefetchCount);}catch (IOException e) {this.activeObjectCounter.release(this);throw new AmqpIOException(e);}}try {if (!cancelled()) {for (String queueName : this.queues) {if (!this.missingQueues.contains(queueName)) {consumeFromQueue(queueName);}}}}catch (IOException e) {throw RabbitExceptionTranslator.convertRabbitAccessException(e);}}有沒有很熟悉:
this.channel.basicQos(this.prefetchCount);抓包:
繼續:
consumeFromQueue(queueName); private void consumeFromQueue(String queue) throws IOException {InternalConsumer consumer = new InternalConsumer(this.channel, queue);String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,this.exclusive, this.consumerArgs,consumer);if (consumerTag != null) {this.consumers.put(queue, consumer);if (logger.isDebugEnabled()) {logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);}}else {logger.error("Null consumer tag received for queue " + queue);}}?有沒有很熟悉:
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,this.exclusive, this.consumerArgs,consumer);那這里有有一個核心的類出現了。InternalConsumer
這里轉向 3.2 Broker投遞消息給客戶端? 解釋
到這里呢,我們把消費者注冊到了Broker中去了,看下抓包情況:
?到這呢,所以Broker也就能給我們投遞消息了。
2、mainLoop();
initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}這里也有個mainLoop ,于是想到了,java 的amqp客戶端也存在呢mainLoop ,這里的邏輯難道也和他的邏輯契合的?我們轉向 3.3 客戶端消費過程繼續。
3.2 Broker投遞消息給客戶端過程
上面說到了,已經將消費者注冊到了Broker中去了,但一定注意哦,注冊到Broker 中的,可不是我們使用注解 RabbitListener 標注的實際消費方法哦,而是新創建了一個內部的消費者:InternalConsumer
我們看下他的一個實現
private final class InternalConsumer extends DefaultConsumer {private final String queueName;boolean canceled;InternalConsumer(Channel channel, String queue) {super(channel);this.queueName = queue;}@Overridepublic void handleConsumeOk(String consumerTag) {super.handleConsumeOk(consumerTag);if (logger.isDebugEnabled()) {logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);}if (BlockingQueueConsumer.this.applicationEventPublisher != null) {BlockingQueueConsumer.this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));}}@Overridepublic void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {if (logger.isDebugEnabled()) {if (RabbitUtils.isNormalShutdown(sig)) {logger.debug("Received shutdown signal for consumer tag=" + consumerTag + ": " + sig.getMessage());}else {logger.debug("Received shutdown signal for consumer tag=" + consumerTag, sig);}}BlockingQueueConsumer.this.shutdown = sig;// The delivery tags will be invalid if the channel shuts downBlockingQueueConsumer.this.deliveryTags.clear();BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);}@Overridepublic void handleCancel(String consumerTag) throws IOException {if (logger.isWarnEnabled()) {logger.warn("Cancel received for " + consumerTag + " ("+ this.queueName+ "); " + BlockingQueueConsumer.this);}BlockingQueueConsumer.this.consumers.remove(this.queueName);if (!BlockingQueueConsumer.this.consumers.isEmpty()) {basicCancel(false);}else {BlockingQueueConsumer.this.cancelled.set(true);}}@Overridepublic void handleCancelOk(String consumerTag) {if (logger.isDebugEnabled()) {logger.debug("Received cancelOk for tag " + consumerTag + " ("+ this.queueName+ "); " + BlockingQueueConsumer.this);}this.canceled = true;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) {if (logger.isDebugEnabled()) {logger.debug("Storing delivery for consumerTag: '"+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "+ BlockingQueueConsumer.this);}try {if (BlockingQueueConsumer.this.abortStarted > 0) {if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body, this.queueName),BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {Channel channelToClose = super.getChannel();RabbitUtils.setPhysicalCloseRequired(channelToClose, true);// Defensive - should never happenBlockingQueueConsumer.this.queue.clear();if (!this.canceled) {getChannel().basicCancel(consumerTag);}try {channelToClose.close();}catch (@SuppressWarnings("unused") TimeoutException e) {// no-op}}}else {BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));}}catch (@SuppressWarnings("unused") InterruptedException e) {Thread.currentThread().interrupt();}catch (Exception e) {BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);}}@Overridepublic String toString() {return "InternalConsumer{" + "queue='" + this.queueName + '\'' +", consumerTag='" + getConsumerTag() + '\'' +'}';}}哇,內部類,而且繼承了?DefaultConsumer ,這和我們前面學習Rabbitmq工作模式的過程中,自己手動開發的代碼一樣了吧,那我找到 投遞方法:
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,好親切有木有,所以到這里真相大白咯。Broker將消息投遞到了這里,我們看看他接收到消息搞什么動作?
BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));很明顯,和java amqp client 實現一樣,他這也用到了Queue,去存儲了,
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);也是個阻塞Queue哦,看來spring搞了一通,從客戶端那邊的queue里拿來,又放了一次queue。
那放進去了,就等著取唄,看誰來取咯。
3.3 客戶端消費過程
接續上面的?mainLoop(),既然消息又存到了本地的queue中,那mainLoop 的目的豈不是很明確了,那就是死循環的去取消息消息,然后再轉調到我們實際的 加入@RabbitListener 的方法中去呢。究竟是不是呢,驗證下:
private void mainLoop() throws Exception { // NOSONAR Exceptiontry {boolean receivedOk = receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {checkAdjust(receivedOk);}long idleEventInterval = getIdleEventInterval();if (idleEventInterval > 0) {if (receivedOk) {updateLastReceive();}else {long now = System.currentTimeMillis();long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive = getLastReceive();if (now > lastReceive + idleEventInterval&& now > lastAlertAt + idleEventInterval&& SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException("Invalid listener", ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/** These will normally be wrapped by an LEFE if thrown by the* listener, but we will also honor it if thrown by an* error handler.*/}}看下重點方法:
boolean receivedOk = receiveAndExecute(this.consumer); private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager = getTransactionManager();if (transactionManager != null) {try {if (this.transactionTemplate == null) {this.transactionTemplate =new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status -> { // NOSONAR null never returnedRabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}return doReceiveAndExecute(consumer);}拋開事務,我們不關注。
return doReceiveAndExecute(consumer); private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONARChannel channel = consumer.getChannel();for (int i = 0; i < this.txSize; i++) {logger.trace("Waiting for message from consumer.");Message message = consumer.nextMessage(this.receiveTimeout);if (message == null) {break;}try {executeListener(channel, message);}重點哦:
Message message = consumer.nextMessage(this.receiveTimeout);從內部消費者取消息咯
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {if (logger.isTraceEnabled()) {logger.trace("Retrieving delivery for " + this);}checkShutdown();if (this.missingQueues.size() > 0) {checkMissingQueues();}Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));if (message == null && this.cancelled.get()) {throw new ConsumerCancelledException();}return message;}看到poll 我們就放心了,把消息取出來,包裝成Message對象。
快調頭回來,繼續看:
try {executeListener(channel, message); }這就要真正處理這個消息了
protected void executeListener(Channel channel, Message messageIn) {if (!isRunning()) {if (logger.isWarnEnabled()) {logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);}throw new MessageRejectedWhileStoppingException();}try {doExecuteListener(channel, messageIn);}catch (RuntimeException ex) {if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {if (this.statefulRetryFatalWithNullMessageId) {throw new FatalListenerExecutionException("Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);}else {throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),messageIn);}}handleListenerException(ex);throw ex;}}代碼不往下貼了,繼續追就可以,最終還是找到了,打標@RabbitListener的那個方法上,得到了執行。真正讓業務邏輯執行到了MQ推送過來的消息,
太不容易了,消息從發送-> Exchange->Queue -> java amqp client? ->spring client - >consume 最終得到了消費。
4. 總結
小結一下,我們從注解RabbitHandler RabbitListener 入手,一步步追蹤到 與Broker鏈接的創建,Queue的聲明,接著,啟動新線程 注冊一個內部的消費者到Broker中,Broker有消息的時候會推送到本地的BlockingQueue中去。
使用MainLoop 消費本地Blockinqueue的內容
貼個小圖:
總結
以上是生活随笔為你收集整理的RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 新浪微博图床架构解析
- 下一篇: 【新示例】协作云路上的先驱or先烈,思科