Source code analysis of the company's kiwi-util package
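The code is a toolkit for listening to message middleware. It was written by the manager who originally hired me; he is now at director level and no longer writes code. When I joined in the summer of 2016 I was a complete beginner, and I am grateful to him for giving me the chance. Only after joining did I realize how strong an engineer he is. Hats off to him.
For background on listening to ActiveMQ from Java, see: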
https://blog.csdn.net/zbw18297786698/article/details/52994746
Without further ado, let's start the analysis.
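### 1. Loading the configuration at startup
The message queues the code listens to are declared in XML configuration files, and a Spring listener loads them at startup. The listener and a sample configuration file are shown below: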
```java
@Component("BootSpringListener")
public class BootSpringListener implements ApplicationListener {
    // Fields such as logger, messageService and bussinessSide are not shown in this excerpt

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            final String key = UUID.randomUUID().toString();
            try {
                // Collect every message*.xml file for the current business side
                Resource[] resource = new PathMatchingResourcePatternResolver()
                        .getResources("classpath*:bussiness/" + bussinessSide + "/message*.xml");
                logger.info("[key={}][init messageService resource size={}]", key, resource.length);
                for (Resource res : resource) {
                    messageService.configure(res.getInputStream());
                }
            } catch (Exception e) {
                logger.error("[key={}][exception={}]", key, e.getMessage(), e);
            }
            logger.info("init messageService");
        }
    }
}
```

A configuration file looks like this:

```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<MessageConfigs>
    <client name="cartoonactivemq">
        <factory>com.*.kiwi.message.impl.activemq.ActiveMQMessageClientFactory</factory>
        <properties>
            <property key="messageSwitch" value="${cartoon.message.switch}"/>
            <property key="maxConnections" value="5"/>
            <property key="maximumActive" value="100"/>
            <property key="username" value="${cartoon.activemq.username}"/>
            <property key="password" value="${cartoon.activemq.password}"/>
            <property key="url" value="${cartoon.activemq.url}"/>
        </properties>
    </client>
    <destination>
        <name>${activemq.reading.message.prefix}picturebook_property_modify</name>
        <type>queue</type>
        <!-- Must match the name of a <client>; the readingactivemq client definition is not shown in this excerpt -->
        <client>readingactivemq</client>
        <processor>com.*.search.api.core.service.search.message.mongo.PartialMongoUpdateProcessor</processor>
        <mode>listen</mode>
        <properties>
            <property key="threadPoolSize" value="100"/>
        </properties>
    </destination>
</MessageConfigs>
```

### 2. Configuration loading steps
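(1) Parse the XML: `MessageConfigs configs = jaxbBinder.fromXML(is);`
The `MessageConfigs` class is mapped to the XML root element with `@XmlRootElement`. For details, see:
http://desert3.iteye.com/blog/1570092
(2) Initialize the client connection pool from each `client` element (see comment A in the code), attach to every destination the client named in its XML entry, and call the destination's `initialize` method, which is where listening actually starts (see comment B in the code).
(3) A destination holds the basic configuration of a message queue: the queue name, the message processor, the thread pool size, and the wrapped consumer. The consumer is built from the destination's information and then initialized: `Consumer consumer = client.createConsumer(this);`, where `this` is the destination.
(4) `destination.initialize` calls `consumer.start`.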
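The wiring in steps (2) to (4) comes down to two lookups: a factory class resolved by name through reflection, and a client resolved by name from a map. The following is a minimal sketch of that pattern only. `SketchClient`, `SketchClientFactory`, `InMemoryClientFactory` and `SketchRegistry` are hypothetical names invented for illustration and are not the kiwi-util types. The fail-fast check on an unknown client name is also an assumption, not something the source confirms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the kiwi-util classes.
interface SketchClient {
    String name();
}

interface SketchClientFactory {
    SketchClient create(String name, Map<String, String> properties);
}

/** A factory of the kind a <factory> element could name. */
class InMemoryClientFactory implements SketchClientFactory {
    @Override
    public SketchClient create(String name, Map<String, String> properties) {
        return () -> name;
    }
}

class SketchRegistry {
    private final Map<String, SketchClient> clientMap = new HashMap<>();
    private final Map<String, String> destinationToClient = new HashMap<>();

    /** Step A: instantiate the factory by class name and register the client it builds. */
    void registerClient(String name, String factoryClassName, Map<String, String> properties) {
        try {
            SketchClientFactory factory = (SketchClientFactory) Class.forName(factoryClassName)
                    .getDeclaredConstructor().newInstance();
            clientMap.put(name, factory.create(name, properties));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create factory " + factoryClassName, e);
        }
    }

    /** Step B: resolve the client a destination refers to; an unknown name fails fast (assumption). */
    void registerDestination(String destinationName, String clientName) {
        SketchClient client = clientMap.get(clientName);
        if (client == null) {
            throw new IllegalStateException(
                    "destination " + destinationName + " references unknown client " + clientName);
        }
        destinationToClient.put(destinationName, client.name());
    }

    /** Returns the name of the client bound to the destination, or null if none is registered. */
    String clientOf(String destinationName) {
        return destinationToClient.get(destinationName);
    }
}
```

Calling `registerClient("readingactivemq", InMemoryClientFactory.class.getName(), props)` and then `registerDestination("picturebook_property_modify", "readingactivemq")` binds the destination to that client. The sketch uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` has been deprecated since Java 9.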
The `configure` method that implements these steps:

```java
public void configure(InputStream is) {
    try {
        JaxbBinder jaxbBinder = new JaxbBinder(MessageConfigs.class);
        MessageConfigs configs = jaxbBinder.fromXML(is);
        // Initialize the clients and their connection pools
        for (ClientConfig clientConfig : configs.getClientConfig()) {
            // Create the factory instance through the class loader
            MessageClientFactory messageClientFactory =
                    (MessageClientFactory) Class.forName(clientConfig.getFactory()).newInstance();
            // A. The basic MQ connection information is stored here
            MessageClient client = messageClientFactory.createMessageClient(clientConfig.getProperties());
            client.setProperties(clientConfig.getProperties());
            client.setName(clientConfig.getName());
            client.initialize();
            clientMap.put(client.getName(), client);
        }
        // Initialize every message destination (queue or topic, with its producer and consumer)
        for (DestinationConfig destinationConfig : configs.getDestinationConfigs()) {
            MessageClient client = this.clientMap.get(destinationConfig.getClient());
            DestinationImpl destination = new DestinationImpl(
                    destinationConfig.getName(), destinationConfig.getType(), client);
            destination.setProcessor(destinationConfig.getProcessor());
            destination.setMode(destinationConfig.getMode());
            destination.setProperties(destinationConfig.getProperties());
            destination.setClient(client);
            destination.setTransacted(destinationConfig.isTransacted());
            destination.setEnableRecover(destinationConfig.isEnableRecover());
            // B. Listening actually starts here
            destination.initialize();
            this.destinationMap.put(destination.getName(), destination);
        }
    } catch (Exception e) {
        throw new RuntimeException("message configure error: ", e);
    } finally {
        if (is != null) {
            try {
                is.close();
            } catch (IOException e) {
                throw new RuntimeException("message configure error: ", e);
            }
        }
    }
}
```

### 3. Creating the Consumer and session
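The client builds the Consumer from the destination. Note how the session is created:
`session = connection.createSession(destination.isTransacted(), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);` This selects individual acknowledgement (`INDIVIDUAL_ACKNOWLEDGE`): each message is acknowledged on its own as it is consumed. Transactions are not enabled here, because `destination.isTransacted()` is false for this destination.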
```java
public Consumer createConsumer(Destination destination) {
    try {
        connection = connectionFactory.getConnectionFactory().createConnection();
        // Note the session here
        session = connection.createSession(destination.isTransacted(), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        javax.jms.Destination dest = null;
        if (Destination.TYPE_QUEUE.equals(destination.getType())) {
            dest = session.createQueue(destination.getName());
        } else if (Destination.TYPE_TOPIC.equals(destination.getType())) {
            dest = session.createTopic(destination.getName());
        }
        MessageConsumer messageConsumer = null;
        if (isDurable && dest instanceof Topic) {
            // Durable topic subscription, named after the client id and the destination
            messageConsumer = session.createDurableSubscriber((Topic) dest, clientId + "-" + destination.getName());
        } else {
            messageConsumer = session.createConsumer(dest);
        }
        ActiveMQConsumer consumer = new ActiveMQConsumer();
        consumer.setConsumer(messageConsumer);
        consumer.setSession(session);
        consumer.setDestination(destination);
        consumer.setConnection(connection);
        // Optional per-destination thread pool size from the XML properties
        String threadPoolSizeString = destination.getProperties().get("threadPoolSize");
        if (threadPoolSizeString != null) {
            consumer.setThreadPoolSize(Integer.parseInt(threadPoolSizeString));
        }
        return consumer;
    } catch (JMSException e) {
        logger.error("init producer for {} error", destination.getName(), e);
    }
    return null;
}
```

### 4. The consumer class in detail
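Consumer basics:
The consumer is bound to its destination: each queue corresponds to one consumer instance.
The consumer class implements three interfaces: the callback (invoked when a processor calls `finished`), `MessageListener` (the `onMessage` method, for listening to the message queue), and the consumer itself (`start` and `stop`).
When a message arrives, the consumer's `onMessage` method is called and triggers the processing logic:
(1) Set the callback to the current consumer, so that it fires on `finished`. If `finished` reports false, the session is rolled back or recovered, depending on whether transactions are configured.
(2) Instantiate the processor and submit it to the thread pool.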
```java
@Override
public void onMessage(Message message) {
    this.processingMessageCount.incrementAndGet();
    try {
        logger.info("[module:message] [action:start] [id:{}]", message.getJMSMessageID());
        // msg is presumably populated from the JMS message here; that code is not shown in this excerpt
        Object msg = null;
        String messageText = null;
        try {
            messageText = JsonBinder.buildNormalBinder().toJson(msg);
        } catch (Exception e) {
            // The JSON text is only used for logging, so failures are ignored
        }
        logger.info("[module:message] [action:parse] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);
        if (destination.getProcessor() != null) {
            Class<?> compileClazz = Class.forName(destination.getProcessor());
            Processor processorObject = (Processor) compileClazz.newInstance();
            Map<String, Object> context = new HashMap<String, Object>();
            context.put("original", message);
            // Set the callback to the current consumer
            processorObject.setCallback(this);
            processorObject.setContext(context);
            processorObject.setMessage(msg);
            // Submit the processor to the thread pool
            threadPoolExecutor.execute(processorObject);
            logger.debug("[module:message] [action:execute] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);
        }
        logger.info("[module:message] [action:end] [id:{}] [content:{}]", message.getJMSMessageID(), messageText);
    } catch (JMSException e) {
        logger.error("consume message {} error!", message.toString(), e);
    } catch (ReflectiveOperationException e) {
        // The configured processor class could not be loaded or instantiated
        logger.error("consume message {} error!", message.toString(), e);
    } finally {
        // Block the listener thread while too many messages are still being processed
        while (this.processingMessageCount.get() >= maxProcessingMessageCount) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("consume message {} error!", message.toString(), e);
            }
        }
    }
}
```
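The `onMessage` excerpt shows neither the `finished` callback nor where `processingMessageCount` is decremented. The sketch below shows one way the two halves could fit together, based only on the description in (1) and (2). Every type in it (`SketchCallback`, `SketchSession`, `CountingSession`, `SketchConsumer`) is a hypothetical stand-in, not a kiwi-util or JMS class. Acknowledging on success and decrementing the counter inside `finished` are assumptions, and the 10 ms sleep replaces the original's one second to keep the example quick.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// All types below are hypothetical stand-ins, not the kiwi-util classes.

/** Callback a processor invokes when it is done; false means processing failed. */
interface SketchCallback {
    void finished(boolean success);
}

/** Minimal session view; a real implementation would wrap a javax.jms.Session. */
interface SketchSession {
    void acknowledge(); // assumption: the source does not show what happens on success
    void rollback();
    void recover();
}

/** Test double that counts how often each session operation is called. */
class CountingSession implements SketchSession {
    final AtomicInteger acknowledged = new AtomicInteger();
    final AtomicInteger rolledBack = new AtomicInteger();
    final AtomicInteger recovered = new AtomicInteger();
    @Override public void acknowledge() { acknowledged.incrementAndGet(); }
    @Override public void rollback() { rolledBack.incrementAndGet(); }
    @Override public void recover() { recovered.incrementAndGet(); }
}

class SketchConsumer implements SketchCallback {
    private final AtomicInteger processing = new AtomicInteger();
    private final SketchSession session;
    private final boolean transacted;
    private final int maxProcessing;
    private final ExecutorService pool;

    SketchConsumer(SketchSession session, boolean transacted, int maxProcessing, int threads) {
        this.session = session;
        this.transacted = transacted;
        this.maxProcessing = maxProcessing;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    /** Listener thread: hand the work to the pool, then block while too much is in flight. */
    void onMessage(BooleanSupplier processor) {
        processing.incrementAndGet();
        pool.execute(() -> {
            boolean success;
            try {
                success = processor.getAsBoolean();
            } catch (RuntimeException e) {
                success = false; // a crashing processor counts as a failure
            }
            finished(success);
        });
        while (processing.get() >= maxProcessing) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop waiting
                return;
            }
        }
    }

    /** Worker thread: settle the message, then release the in-flight slot. */
    @Override
    public void finished(boolean success) {
        try {
            if (success) {
                session.acknowledge();
            } else if (transacted) {
                session.rollback(); // transacted session: roll back
            } else {
                session.recover(); // non-transacted session: ask for redelivery
            }
        } finally {
            processing.decrementAndGet();
        }
    }

    int inFlight() {
        return processing.get();
    }

    /** Stops accepting work and waits up to five seconds for submitted tasks to finish. */
    boolean shutdownAndWait() {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Blocking inside `onMessage` stops the broker from delivering further messages on that session until the in-flight count drops below the limit, so it works as simple backpressure between the listener thread and the pool.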