生活随笔
收集整理的這篇文章主要介紹了
spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
spring boot 整合 谷歌guava的EventBus 實現(xiàn)單機版的消息發(fā)布訂閱
大型分布式系統(tǒng),直接用mq解耦,那么單機系統(tǒng)怎么辦,可以考慮用EventBus
用EventBus的好處也是異步解耦,和mq的類似,可以勉強認為是單機版的mq
先解釋,后附示例代碼(jdk1.8)
EventBus 處理的事情類似觀察者模式,基于事件驅(qū)動,觀察者們監(jiān)聽自己感興趣的特定事件,進行相應(yīng)的處理。
流程是這樣的:
1、先自定義一個注解@EventBusListener,所有異步實現(xiàn)的類都加這個注解,代表自己是一個監(jiān)聽器
2、再定義一個實現(xiàn)類,就用這個@EventBusListener注解,代表自己是監(jiān)聽器
類中方法用@Subscribe注解,方法入?yún)㈩惡秃竺鎸懙陌l(fā)送異步或同步消息的類保持一致即可,這個類也叫消息體,是這個消息的注意內(nèi)容,比如訂單號id,消息體最好是自己定義的不同的類
3、核心的類是EventBusCenter,負責(zé)將帶有@EventBusListener注解的bean注冊成一個監(jiān)聽器,并提供發(fā)送異步或同步消息的方法入口
4、業(yè)務(wù)service中注入EventBusCenter,并調(diào)用發(fā)送異步或同步消息的方法,方法入?yún)⑹窍Ⅲw,不同的消息體對應(yīng)不同的發(fā)布訂閱,訂閱的時候也是根據(jù)這個消息體來區(qū)分不同類型的消息的
簡單的源碼解讀
核心類是:com.google.common.eventbus.SubscriberRegistry
里面有個靜態(tài)變量:subscriberMethodsCache
類在初始化的時候,會給這個靜態(tài)變量復(fù)制,拿到所有的@Subscribe注釋的方法和對應(yīng)類的映射關(guān)系
然后我們代碼中會注冊監(jiān)聽器,就把方法和監(jiān)聽器對應(yīng)上了
至此,方法的入?yún)㈩愋秃头椒ê皖惖膶?yīng)關(guān)系就知道了,key就是方法參數(shù)類型,value就是一組對應(yīng)的方法和類
用的時候,調(diào)用asyncEventBus.post(event);
入?yún)vent就是真正的消息體類型,然后會根據(jù)這個類型去上面找對應(yīng)的方法和類,然后獲取bean,調(diào)用
所以說,這個發(fā)布訂閱,就是通過消息體類型去唯一識別的方法的。
對比mq的主題概念,這個消息體類型,就可以看成是主題的意思。
下面附示例代碼
模擬OrderService中訂單創(chuàng)建后,發(fā)送訂單創(chuàng)建的異步事件,再發(fā)送訂單修改的異步事件,再發(fā)送訂單修改的同步事件
訂閱端是OrderChangeListener和OrderChangeListener02兩個訂閱
OrderChangeListener訂閱了訂單創(chuàng)建和訂單修改事件
OrderChangeListener02訂閱了訂單創(chuàng)建事件
執(zhí)行流程是:
啟動springboot,注冊bean的時候遇到EventBusCenter,開始注冊O(shè)rderChangeListener和OrderChangeListener02為監(jiān)聽器
啟動springboot后立即執(zhí)行FistRun類,里面直接調(diào)用訂單創(chuàng)建方法,發(fā)布訂單創(chuàng)建和修改的消息
OrderChangeListener和OrderChangeListener02消費消息,完
<dependency><groupId>org
.projectlombok
</groupId
><artifactId>lombok
</artifactId
><optional>true</optional
>
</dependency
>
<dependency><groupId>com
.google
.guava
</groupId
><artifactId>guava
</artifactId
><version>22.0</version
>
</dependency
>
@Target({ElementType
.TYPE
})
@Retention(RetentionPolicy
.RUNTIME
)
public @
interface EventBusListener {
}
@Component
public class EventBusCenter {private EventBus syncEventBus
= new EventBus();private AsyncEventBus asyncEventBus
= new AsyncEventBus(Executors
.newCachedThreadPool());public void postSync(Object event
) {syncEventBus
.post(event
);}public void postAsync(Object event
) {asyncEventBus
.post(event
);}@PostConstructpublic void init() {List
<Object> listeners
= SpringContextUtils
.getBeansWithAnnotation(EventBusListener
.class);for (Object listener
: listeners
) {asyncEventBus
.register(listener
);syncEventBus
.register(listener
);}}
}
@Component
public class SpringContextUtils implements BeanFactoryPostProcessor {private static ConfigurableListableBeanFactory beanFactory
;@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory
) throws BeansException
{SpringContextUtils
.beanFactory
= configurableListableBeanFactory
;}public static <T> T
getBean(String name
) throws BeansException
{return (T
) beanFactory
.getBean(name
);}public static <T> T
getBean(Class
<T> clz
) throws BeansException
{T result
= beanFactory
.getBean(clz
);return result
;}public static <T> List
<T> getBeansOfType(Class
<T> type
) {return beanFactory
.getBeansOfType(type
).entrySet().stream().map(entry
-> entry
.getValue()).collect(Collectors
.toList());}public static List
<Object> getBeansWithAnnotation(Class
<? extends Annotation> annotationType
) {Map
<String, Object> beansWithAnnotation
= beanFactory
.getBeansWithAnnotation(annotationType
);return beansWithAnnotation
.entrySet().stream().map(entry
-> entry
.getValue()).collect(Collectors
.toList());
}
}
@Data
public class OrderCreatedEvent {private long orderId
;private long userId
;public OrderCreatedEvent(long orderId
, long userId
) {this.setOrderId(orderId
);this.setUserId(userId
);}
}
@Data
public class OrderChangeEvent {private long orderId
;private long userId
;public OrderChangeEvent(long orderId
, long userId
) {this.setOrderId(orderId
);this.setUserId(userId
);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener {@Subscribepublic void created(OrderCreatedEvent event
) throws InterruptedException
{long orderId
= event
.getOrderId();Thread
.sleep(300);log
.info("訂單創(chuàng)建監(jiān)聽,發(fā)送短信,orderId=" + orderId
);}@Subscribepublic void change(OrderChangeEvent event
) throws InterruptedException
{long orderId
= event
.getOrderId();Thread
.sleep(200);log
.info("訂單修改監(jiān)聽,物流變化,orderId=" + orderId
);}
}
@Component
@EventBusListener
@Slf4j
public class OrderChangeListener02 {@Subscribepublic void created(OrderCreatedEvent event
) {long orderId
= event
.getOrderId();long userId
= event
.getUserId();log
.info("訂單創(chuàng)建監(jiān)聽02,修改庫存,orderId=" + orderId
);}}
@Service
@Slf4j
public class OrderService {@Autowiredprivate EventBusCenter eventBusCenter
;public void createOrder() throws InterruptedException
{eventBusCenter
.postAsync(new OrderCreatedEvent(1L
, 1L
));System
.out
.println("發(fā)送異步事件,訂單創(chuàng)建");eventBusCenter
.postAsync(new OrderChangeEvent(1L
, 1L
));System
.out
.println("發(fā)送異步事件,訂單修改");Thread
.sleep(500);try {System
.out
.println("發(fā)送同步事件,訂單修改,開始");eventBusCenter
.postSync(new OrderChangeEvent(1L
, 1L
));System
.out
.println("發(fā)送同步事件,訂單修改,結(jié)束");} catch (Exception e
) {log
.error("發(fā)送同步事件,抓異常");}}
}
@Component
@Slf4j
@Order(1)
public class FistRun implements CommandLineRunner {@Autowiredprivate OrderService orderService
;@Overridepublic void run(String
... args
) throws Exception
{log
.info("FistRun start===============");orderService
.createOrder();}
}
總結(jié)
以上是生活随笔為你收集整理的spring boot 整合 谷歌guava的EventBus 实现单机版的消息发布订阅的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。