Java Review - Java进程内部的消息中间件_Event Bus设计模式
文章目錄
- 概述
- EventBus架構(gòu)類(lèi)圖
- Code
- Bus接口 (定義注冊(cè)topic以及發(fā)送event接口)
- 自定義注解-回調(diào)方法及topic
- 同步EventBus
- 異步EventBus
- Subscriber注冊(cè)表Registry (維護(hù)topic和subscriber之間的關(guān)系)
- Event廣播Dispatcher
- 其他類(lèi)接口設(shè)計(jì)
- 測(cè)試
- 同步&異步 Event Bus
概述
在工作中,我們都會(huì)使用到MQ 比如 Apache Kafka等,某subscriber在消息中間件上注冊(cè)了某個(gè)topic(主題),當(dāng)有消息發(fā)送到了該topic上之后,注冊(cè)在該topic上的所有subscriber都將會(huì)收到消息 。
消息中間件提供了系統(tǒng)之間的異步處理機(jī)制。 主業(yè)務(wù)完成后即可向用戶(hù)返回成功的通知,然后提交各種消息至消息中間件,這樣注冊(cè)在消息中間件的其他系統(tǒng)就可以順利地接收通知了,然后執(zhí)行各自的業(yè)務(wù)邏輯。消息中間件主要用于解決進(jìn)程之間消息異步處理的解決方案,這里,我們使用消息中間件的思想設(shè)計(jì)一個(gè)Java進(jìn)程內(nèi)部的消息中間件——Event Bus。
EventBus架構(gòu)類(lèi)圖
-
Bus接口對(duì)外提供了幾種主要的使用方式,比如post方法用來(lái)發(fā)送Event,register方法用來(lái)注冊(cè)Event接收者(Subscriber)接受響應(yīng)事件
-
EventBus采用同步的方式推送Event,AsyncEventBus采用異步的方式(Thread-Per-Message)推送Event。
-
Registry注冊(cè)表,主要用來(lái)記錄對(duì)應(yīng)的Subscriber以及受理消息的回調(diào)方法,回調(diào)方法我們用注解@Subscribe來(lái)標(biāo)識(shí)。
-
Dispatcher主要用來(lái)將event廣播給注冊(cè)表中監(jiān)聽(tīng)了topic的Subscriber
Code
Bus接口 (定義注冊(cè)topic以及發(fā)送event接口)
/*** Bus接口定義了EventBus的所有使用方法** @author artisan*/ public interface Bus {/*** 將某個(gè)對(duì)象注冊(cè)到Bus上,從此之后該類(lèi)就成為Subscriber了*/void register(Object subscriber);/*** 將某個(gè)對(duì)象從Bus上取消注冊(cè),取消注冊(cè)之后就不會(huì)再接收到來(lái)自Bus的任何消息*/void unregister(Object subscriber);/*** 提交Event到默認(rèn)的topic*/void post(Object event);/*** 提交Event到指定的topic*/void post(Object event, String topic);/*** 關(guān)閉該bus*/void close();/*** 返回Bus的名稱(chēng)標(biāo)識(shí)*/String getBusName();}Bus接口中定義了注冊(cè)topic的方法和Event發(fā)送的方法
-
register(Object subscriber):將某個(gè)對(duì)象實(shí)例注冊(cè)給Event Bus。
-
unregister(Object subscriber):取消對(duì)該對(duì)象實(shí)例的注冊(cè),會(huì)在Event Bus的注冊(cè)表(Registry)中將其移除。
-
post(Object event):提交Event到Event Bus中,如果未指定topic則會(huì)將event廣播給Event Bus默認(rèn)的topic。
-
post(Object event, String topic) :提交Event的同時(shí)指定了topic。
-
close():銷(xiāo)毀該Event Bus。
-
getBusName():返回該Event Bus的名稱(chēng)
自定義注解-回調(diào)方法及topic
注冊(cè)對(duì)象給Event Bus的時(shí)候需要指定接收消息時(shí)的回調(diào)方法,我們采用注解的方式進(jìn)行Event回調(diào)
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/*** @author artisan*/ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Subscribe {String topic() default "default-topic"; }@Subscribe要求注解在類(lèi)中的方法,注解時(shí)可指定topic,不指定的情況下為默認(rèn)的topic 【default-topic】
同步EventBus
同步EventBus是最核心的一個(gè)類(lèi),它實(shí)現(xiàn)了Bus的所有功能,但是該類(lèi)對(duì)Event的廣播推送采用的是同步的方式
/*** @author 小工匠* @version 1.0* @description: Bus實(shí)現(xiàn)類(lèi)* @date 2021/12/1 23:00* @mark: show me the code , change the world*/ public class EventBus implements Bus {/*** 用于維護(hù)Subscriber的注冊(cè)表*/private final Registry registry = new Registry();/*** Event Bus的名字*/private String busName;/*** 默認(rèn)的Event Bus的名字*/private final static String DEFAULT_BUS_NAME = "default";/*** 默認(rèn)的topic的名字*/private final static String DEFAULT_TOPIC = "default-topic";/*** 用于分發(fā)廣播消息到各個(gè)Subscriber的類(lèi)*/private final Dispatcher dispatcher;/*** 構(gòu)造函數(shù)*/public EventBus() {this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}public EventBus(String busName) {this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {this.busName = busName;this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);}public EventBus(EventExceptionHandler exceptionHandler) {this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);}/*** 將注冊(cè)Subscriber的動(dòng)作直接委托給Registry** @param subscriber*/@Overridepublic void register(Object subscriber) {this.registry.bind(subscriber);}/*** 接觸注冊(cè)同樣委托給Registry** @param subscriber*/@Overridepublic void unregister(Object subscriber) {this.registry.unbind(subscriber);}/*** 提交Event到默認(rèn)的topic** @param event*/@Overridepublic void post(Object event) {this.post(event, DEFAULT_TOPIC);}/*** 提交Event到指定的topic,具體的動(dòng)作是由Dispatcher來(lái)完成的** @param event* @param topic*/@Overridepublic void post(Object event, String topic) {this.dispatcher.dispatch(this, registry, event, topic);}/*** 關(guān)閉銷(xiāo)毀Bus*/@Overridepublic void close() {this.dispatcher.close();}/*** 返回Bus的名稱(chēng)** @return*/@Overridepublic String getBusName() {return this.busName;} }有幾個(gè)點(diǎn)需要注意一下
-
EventBus的構(gòu)造除了名稱(chēng)之外,還需要有ExceptionHandler和Executor,后兩個(gè)主要是給Dispatcher使用的。
-
registry和unregister都是通過(guò)Subscriber注冊(cè)表來(lái)完成的。
-
Event的提交則是由Dispatcher來(lái)完成的
-
Executor使用JDK中的Executor接口,如果我們自己開(kāi)發(fā)的ThreadPool天生就是多線(xiàn)程并發(fā)執(zhí)行任務(wù)的線(xiàn)程池,自帶異步處理能力,但是無(wú)法做到同步任務(wù)處理,因此我們使用Executor可以任意擴(kuò)展同步、異步的任務(wù)處理方式。
異步EventBus
異步的EventBus比較簡(jiǎn)單,繼承自同步Bus,然后將Thread-Per-Message用異步處理任務(wù)的Executor替換EventBus中的同步Executor即可
import java.util.concurrent.ThreadPoolExecutor;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 10:59* @mark: show me the code , change the world*/ public class AsyncEventBus extends EventBus {AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {super(busName, exceptionHandler, executor);}public AsyncEventBus(String busName, ThreadPoolExecutor executor) {this(busName, null, executor);}public AsyncEventBus(ThreadPoolExecutor executor) {this("default-async", null, executor);}public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {this("default-async", exceptionHandler, executor);} }可以看到AsyncEventBus 重寫(xiě)了父類(lèi)EventBus的構(gòu)造函數(shù),使用ThreadPoolExecutor替代Executor。
Subscriber注冊(cè)表Registry (維護(hù)topic和subscriber之間的關(guān)系)
注冊(cè)表Registry維護(hù)了topic和subscriber之間的關(guān)系。
當(dāng)有Event被post之后,Dispatcher需要知道該消息應(yīng)該發(fā)送給哪個(gè)Subscriber的實(shí)例和對(duì)應(yīng)的方法,Subscriber對(duì)象沒(méi)有任何特殊要求,就是普通的類(lèi)不需要繼承任何父類(lèi)或者實(shí)現(xiàn)任何接口
import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/1 23:42* @mark: show me the code , change the world*/class Registry {/*** 存儲(chǔ)Subscriber集合和topic之間關(guān)系的map*/private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();/*** 獲取Subscriber Object的方法集合然后進(jìn)行綁定** @param subscriber*/public void bind(Object subscriber) {List<Method> subscribeMethods = getSubscribeMethods(subscriber);subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));}public void unbind(Object subscriber) {//unbind為了提高速度,只對(duì)Subscriber進(jìn)行失效操作subscriberContainer.forEach((key, queue) ->queue.forEach(s ->{if (s.getSubscribeObject() == subscriber) {s.setDisable(true);}}));}public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {return subscriberContainer.get(topic);}private void tierSubscriber(Object subscriber, Method method) {final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);String topic = subscribe.topic();//當(dāng)某topic沒(méi)有Subscriber Queue的時(shí)候創(chuàng)建一個(gè)subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());//創(chuàng)建一個(gè)Subscriber并且加入Subscriber列表中subscriberContainer.get(topic).add(new Subscriber(subscriber, method));}private List<Method> getSubscribeMethods(Object subscriber) {final List<Method> methods = new ArrayList<>();Class<?> temp = subscriber.getClass();//不斷獲取當(dāng)前類(lèi)和父類(lèi)的所有@Subscribe方法while (temp != null) {//獲取所有的方法Method[] declaredMethods = temp.getDeclaredMethods();//只有public方法 &&有一個(gè)入?yún)?&&最重要的是被@Subscribe標(biāo)記的方法才符合回調(diào)方法Arrays.stream(declaredMethods).filter(m -> m.isAnnotationPresent(Subscribe.class)&& m.getParameterCount() == 1&& m.getModifiers() == Modifier.PUBLIC).forEach(methods::add);temp = temp.getSuperclass();}return methods;}}由于Registry是在Bus中使用的,不能暴露給外部,因此Registry被設(shè)計(jì)成了包可見(jiàn)的類(lèi)。
我們所設(shè)計(jì)的EventBus對(duì)Subscriber沒(méi)有做任何限制,但是要接受event的回調(diào)則需要將方法使用注解@Subscribe進(jìn)行標(biāo)記(可指定topic)
同一個(gè)Subscriber的不同方法通過(guò)@Subscribe注解之后可接受來(lái)自?xún)蓚€(gè)不同的topic消息
public class SimpleObject {/*** subscribe方法,比如使用@Subscribe標(biāo)記,并且是void類(lèi)型且有一個(gè)參數(shù)*/@Subscribe(topic = "artisan-topic")public void test2(Integer x) {}@Subscribe(topic = "test-topic")public void test3(Integer x) {} }SimpleObject的實(shí)例被注冊(cè)到了Event Bus之后,test2和test3這兩個(gè)方法將會(huì)被加入到注冊(cè)表中,分別用來(lái)接受來(lái)自artisan-topic和test-topic的event 。
Event廣播Dispatcher
Dispatcher的主要作用是將EventBus post的event推送給每一個(gè)注冊(cè)到topic上的subscriber上,具體的推送其實(shí)就是執(zhí)行被@Subscribe注解的方法。
import java.lang.reflect.Method; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 00:36* @mark: show me the code , change the world*/ public class Dispatcher {private final Executor executorService;private final EventExceptionHandler exceptionHandler;public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {this.executorService = executorService;this.exceptionHandler = exceptionHandler;}public void dispatch(Bus bus, Registry registry, Object event, String topic) {//根據(jù)topic獲取所有的Subscriber列表ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);if (null == subscribers) {if (exceptionHandler != null) {exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),new BaseEventContext(bus.getBusName(), null, event));}return;}//遍歷所有的方法,并且通過(guò)反射的方式進(jìn)行方法調(diào)用subscribers.stream().filter(subscriber -> !subscriber.isDisable()).filter(subscriber ->{Method subscribeMethod = subscriber.getSubscribeMethod();Class<?> aClass = subscribeMethod.getParameterTypes()[0];return (aClass.isAssignableFrom(event.getClass()));}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));}private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {Method subscribeMethod = subscriber.getSubscribeMethod();Object subscribeObject = subscriber.getSubscribeObject();executorService.execute(() -> {try {subscribeMethod.invoke(subscribeObject, event);} catch (Exception e) {if (null != exceptionHandler) {exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));}}});}public void close() {if (executorService instanceof ExecutorService) {((ExecutorService) executorService).shutdown();}}static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {return new Dispatcher(executor, exceptionHandler);}static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);}static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);}/*** 順序執(zhí)行的ExecutorService*/private static class SeqExecutorService implements Executor {private final static SeqExecutorService INSTANCE = new SeqExecutorService();@Overridepublic void execute(Runnable command) {command.run();}}/*** 每個(gè)線(xiàn)程負(fù)責(zé)一次消息推送*/private static class PreThreadExecutorService implements Executor {private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();@Overridepublic void execute(Runnable command) {new Thread(command).start();}}/*** 默認(rèn)的EventContext實(shí)現(xiàn)*/private static class BaseEventContext implements EventContext {private final String eventBusName;private final Subscriber subscriber;private final Object event;private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {this.eventBusName = eventBusName;this.subscriber = subscriber;this.event = event;}@Overridepublic String getSource() {return this.eventBusName;}@Overridepublic Object getSubscriber() {return subscriber != null ? subscriber.getSubscribeObject() : null;}@Overridepublic Method getSubscribe() {return subscriber != null ? subscriber.getSubscribeMethod() : null;}@Overridepublic Object getEvent() {return this.event;}} }在Dispatcher中,除了從Registry中獲取對(duì)應(yīng)的Subscriber執(zhí)行之外,我們還定義了幾個(gè)靜態(tài)內(nèi)部類(lèi),其主要是實(shí)現(xiàn)了Executor接口和EventContent。
其他類(lèi)接口設(shè)計(jì)
除了上面一些比較核心的類(lèi)之外,還需要Subscriber封裝類(lèi)以及EventContext、Event-ExceptionHandler接口
【Subscriber類(lèi)】
import java.lang.reflect.Method;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/1 23:33* @mark: show me the code , change the world*/ public class Subscriber {private final Object subscribeObject;private final Method subscribeMethod;private boolean disable = false;public Subscriber(Object subscribeObject, Method subscribeMethod) {this.subscribeObject = subscribeObject;this.subscribeMethod = subscribeMethod;}public Object getSubscribeObject() {return subscribeObject;}public Method getSubscribeMethod() {return subscribeMethod;}public boolean isDisable() {return disable;}public void setDisable(boolean disable) {this.disable = disable;} }Subscriber類(lèi)封裝了對(duì)象實(shí)例和被@Subscribe標(biāo)記的方法,也就是說(shuō)一個(gè)對(duì)象實(shí)例有可能會(huì)被封裝成若干個(gè)Subscriber
【EventExceptionHandler接口】
EventBus會(huì)將方法的調(diào)用交給Runnable接口去執(zhí)行,我們都知道Runnable接口不能拋出checked異常信息,并且在每一個(gè)subscribe方法中,也不允許將異常拋出從而影響EventBus對(duì)后續(xù)Subscriber進(jìn)行消息推送,但是異常信息又不能被忽略掉,因此注冊(cè)一個(gè)異常回調(diào)接口就可以知道在進(jìn)行消息廣播推送時(shí)都發(fā)生了什么
public interface EventExceptionHandler {void handle(Throwable cause, EventContext context); }【EventContext接口】
EventContext接口提供了獲取消息源、消息體,以及該消息是由哪一個(gè)Subscriber的哪個(gè)subscribe方法所接受,主要用于消息推送出錯(cuò)時(shí)被回調(diào)接口EventExceptionHandler使用
import java.lang.reflect.Method;public interface EventContext {String getSource();Object getSubscriber();Method getSubscribe();Object getEvent();}測(cè)試
我們簡(jiǎn)單地定義兩個(gè)普通對(duì)象SimpleSubscriber1和SimpleSubscriber2
/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 11:06* @mark: show me the code , change the world*/ public class SimpleSubscriber1 {@Subscribepublic void method1(String message) {System.out.println(String.format("線(xiàn)程 %s , SimpleSubscriber2#method1 called --- %s ",Thread.currentThread().getName() ,message));}@Subscribe(topic = "test")public void method2(String message) {System.out.println(String.format("線(xiàn)程:%s: Test Topic | SimpleSubscriber2#method2 called --- %s", Thread.currentThread().getName(), message));}} /*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 11:27* @mark: show me the code , change the world*/ public class SimpleSubscriber2 {@Subscribepublic void method1(String message) {System.out.println(String.format("線(xiàn)程 %s , SimpleSubscriber2#method1 called --- %s ",Thread.currentThread().getName() ,message));}@Subscribe(topic = "test")public void method2(String message) {System.out.println(String.format("線(xiàn)程:%s: Test Topic | SimpleSubscriber2#method2 called --- %s", Thread.currentThread().getName(), message));} }模擬復(fù)雜消息
/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 11:28* @mark: show me the code , change the world*/ public class SimpleSubscriber3 {@Subscribepublic void method1(Object message) {if (message instanceof WildMessage){System.out.println(String.format("線(xiàn)程 %s , SimpleSubscriber3#method1 called --- %s ",Thread.currentThread().getName() ,((WildMessage)message).getData()));}}} /*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 13:16* @mark: show me the code , change the world*/ public class WildMessage {private String data;public WildMessage(String data) {this.data = data;}public String getData() {return data;}public void setData(String data) {this.data = data;} }同步&異步 Event Bus
import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/2 13:32* @mark: show me the code , change the world*/ public class Test {public static void main(String[] args) {Bus bus = new EventBus("TestBus");// 注冊(cè)bus.register(new SimpleSubscriber1());bus.register(new SimpleSubscriber2());// 發(fā)布消息bus.post("Hello");bus.post("Hello", "test");bus.register(new SimpleSubscriber3());bus.post(new WildMessage("SourceMessage"));System.out.println("\n\n\n\n");System.out.println("-------異步-----");Bus asyncEventBus = new AsyncEventBus("TestBus", (ThreadPoolExecutor) Executors.newFixedThreadPool(10));asyncEventBus.register(new SimpleSubscriber1());asyncEventBus.register(new SimpleSubscriber2());asyncEventBus.post("Hello");asyncEventBus.post("Hello", "test");}}解析下結(jié)果哈: 同步的EventBus,將三個(gè)普通的對(duì)象注冊(cè)給了bus,當(dāng)bus發(fā)送Event的時(shí)候topic相同,Event類(lèi)型相同的subscribe方法將被執(zhí)行。
同步的Event Bus有個(gè)缺點(diǎn),若其中的一個(gè)subscribe方法運(yùn)行時(shí)間比較長(zhǎng),則會(huì)影響下一個(gè)subscribe方法的執(zhí)行,因此采用AsyncEventBus是另外一個(gè)比較好的選擇。
總結(jié)
以上是生活随笔為你收集整理的Java Review - Java进程内部的消息中间件_Event Bus设计模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java Review - 并发编程_
- 下一篇: Java Review - 使用Even