springboot情操陶冶-web配置(四)
承接前文springboot情操陶冶-web配置(三),本文將在DispatcherServlet應用的基礎上談下websocket的使用
websocket
websocket的簡單了解可見維基百科WebSocket,在筆者看來其大多數應用在web瀏覽器上用于與服務端的持續性通信,并大多用于接收服務器的推送內容
簡單例子
spring很友好的向我們展示了如何在springboot上整合websocket,并給出了一個hello例子。讀者可參照官方例子走一遍便可對websocket有一定的了解。附上官方部分源碼
Controller響應層
package hello;import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Controller; import org.springframework.web.util.HtmlUtils;@Controller public class GreetingController {@MessageMapping("/hello")@SendTo("/topic/greetings")public Greeting greeting(HelloMessage message) throws Exception {Thread.sleep(1000); // simulated delayreturn new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");}}客戶端HTML界面
<!DOCTYPE html> <html> <head><title>Hello WebSocket</title><link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet"><link href="/main.css" rel="stylesheet"><script src="/webjars/jquery/jquery.min.js"></script><script src="/webjars/sockjs-client/sockjs.min.js"></script><script src="/webjars/stomp-websocket/stomp.min.js"></script><script src="/app.js"></script> </head> <body> <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript beingenabled. Please enableJavascript and reload this page!</h2></noscript> <div id="main-content" class="container"><div class="row"><div class="col-md-6"><form class="form-inline"><div class="form-group"><label for="connect">WebSocket connection:</label><button id="connect" class="btn btn-default" type="submit">Connect</button><button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect</button></div></form></div><div class="col-md-6"><form class="form-inline"><div class="form-group"><label for="name">What is your name?</label><input type="text" id="name" class="form-control" placeholder="Your name here..."></div><button id="send" class="btn btn-default" type="submit">Send</button></form></div></div><div class="row"><div class="col-md-12"><table id="conversation" class="table table-striped"><thead><tr><th>Greetings</th></tr></thead><tbody id="greetings"></tbody></table></div></div> </div> </body> </html>在閱讀完官方的demo例子之后,讀者務必再去閱覽下WebSocket在springboot的基本概念>>>Web on Servlet Stack。雖然文檔很長,但還是需要理解下其的工作原理,大致上和rabbitmq類似,采取的是訂閱推送的模式。
源碼層分析
筆者優先關注下@EnableWebSocketMessageBroker注解,其用于開啟websocket服務
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(DelegatingWebSocketMessageBrokerConfiguration.class) public @interface EnableWebSocketMessageBroker {}由上可知,引入的DelegatingWebSocketMessageBrokerConfiguration類用于加載websocket的相關配置。
本文不進行詳細的源碼分析,筆者則會根據上圖的原理圖尋找在springboot中的配置,這樣應該會起到事半功倍的效果。
RequestChannel和ResponseChannel
請求與響應處理通道
@Beanpublic AbstractSubscribableChannel clientInboundChannel() {ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());// 攔截器,用戶也可通過WebSocketMessageBrokerConfigurer接口增加攔截器ChannelRegistration reg = getClientInboundChannelRegistration();if (reg.hasInterceptors()) {channel.setInterceptors(reg.getInterceptors());}return channel;}protected final ChannelRegistration getClientInboundChannelRegistration() {if (this.clientInboundChannelRegistration == null) {ChannelRegistration registration = new ChannelRegistration();// 加載請求通道,也可新增攔截器configureClientInboundChannel(registration);registration.interceptors(new ImmutableMessageChannelInterceptor());this.clientInboundChannelRegistration = registration;}return this.clientInboundChannelRegistration;}@Beanpublic AbstractSubscribableChannel clientOutboundChannel() {ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());// 攔截器,用戶也可通過WebSocketMessageBrokerConfigurer接口增加攔截器ChannelRegistration reg = getClientOutboundChannelRegistration();if (reg.hasInterceptors()) {channel.setInterceptors(reg.getInterceptors());}return channel;}不管是請求通道還是響應通道代碼一模一樣,使用的均是ExecutorSubscribableChannel,其內部整合了攔截器和線程池。此類基本是所有channel的公用類,筆者稍微看下里面有什么小花頭
No.1 消息處理ExecutorSubscribableChannel
@Overridepublic boolean sendInternal(Message<?> message, long timeout) {// 消息處理者遍歷for (MessageHandler handler : getSubscribers()) {// 統一由SendTask類處理消息SendTask sendTask = new SendTask(message, handler);if (this.executor == null) {sendTask.run();}else {this.executor.execute(sendTask);}}return true;}這里的消息處理者有直接處理注解的,也有直接返回給BorkerRelay的,讀者可自行去查閱
No.2 消息任務SendTask
@Overridepublic void run() {Message<?> message = this.inputMessage;try {// 應用攔截器message = applyBeforeHandle(message);if (message == null) {return;}// 通過messageHandler來處理消息this.messageHandler.handleMessage(message);triggerAfterMessageHandled(message, null);}catch (Exception ex) {triggerAfterMessageHandled(message, ex);if (ex instanceof MessagingException) {throw (MessagingException) ex;}String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;throw new MessageDeliveryException(message, description, ex);}catch (Throwable err) {String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);triggerAfterMessageHandled(message, ex2);throw ex2;}}由此可得出通用的SendTask只是個消息中轉器,最終的消息處理均是由MessageHandler來解決的。看來處理消息的路由核心得繼續往下文分析了
注解方式消息處理器MessageHandler
即解析@MessageMapping/@SendTo等websocket注解的方法,其類似于MVC的@RequestMapping等注解。可見SimpAnnotationMethodMessageHandler類
@Overrideprotected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {// 請求和響應通道、broker消息模板return new WebSocketAnnotationMethodMessageHandler(clientInboundChannel(), clientOutboundChannel(), brokerMessagingTemplate());}上述代碼中的broker消息模板主要通過brokerChannel通道將注解方法返回的值經過訂閱關系處理后再傳入響應通道。筆者此處對WebSocketAnnotationMethodMessageHandler作下分步驟的解析
No.1 入參解析器
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {ApplicationContext context = getApplicationContext();ConfigurableBeanFactory beanFactory = (context instanceof ConfigurableApplicationContext ?((ConfigurableApplicationContext) context).getBeanFactory() : null);List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();// @Header和@Headers參數注解解析resolvers.add(new HeaderMethodArgumentResolver(this.conversionService, beanFactory));resolvers.add(new HeadersMethodArgumentResolver());// @DestinationVariable注解參數解析resolvers.add(new DestinationVariableMethodArgumentResolver(this.conversionService));// 讀取Principal類型的參數,讀取的為頭部的simpUser屬性resolvers.add(new PrincipalMethodArgumentResolver());// 讀取Message類型的參數resolvers.add(new MessageMethodArgumentResolver(this.messageConverter));resolvers.addAll(getCustomArgumentResolvers());//用戶自定義,可擴展// @Payload注解的參數解析resolvers.add(new PayloadArgumentResolver(this.messageConverter, this.validator));return resolvers;}No.2 反參解析器
@Overrideprotected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();// Single-purpose return value typeshandlers.add(new ListenableFutureReturnValueHandler());handlers.add(new CompletableFutureReturnValueHandler());// @SendTo和@SendToUser注解解析器SendToMethodReturnValueHandler sendToHandler =new SendToMethodReturnValueHandler(this.brokerTemplate, true);if (this.headerInitializer != null) {sendToHandler.setHeaderInitializer(this.headerInitializer);}handlers.add(sendToHandler);// @SubscribeMapping注解解析器SubscriptionMethodReturnValueHandler subscriptionHandler =new SubscriptionMethodReturnValueHandler(this.clientMessagingTemplate);subscriptionHandler.setHeaderInitializer(this.headerInitializer);handlers.add(subscriptionHandler);// custom return value typeshandlers.addAll(getCustomReturnValueHandlers());// 默認處理sendToHandler = new SendToMethodReturnValueHandler(this.brokerTemplate, false);sendToHandler.setHeaderInitializer(this.headerInitializer);handlers.add(sendToHandler);return handlers;}No.3 HandlerMethod對象創建
@Overridepublic void afterPropertiesSet() {// 入參和反參解析器配置if (this.argumentResolvers.getResolvers().isEmpty()) {this.argumentResolvers.addResolvers(initArgumentResolvers());}if (this.returnValueHandlers.getReturnValueHandlers().isEmpty()) {this.returnValueHandlers.addHandlers(initReturnValueHandlers());}ApplicationContext context = getApplicationContext();if (context == null) {return;}for (String beanName : context.getBeanNamesForType(Object.class)) {if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {Class<?> beanType = null;try {beanType = context.getType(beanName);}catch (Throwable ex) {// An unresolvable bean type, probably from a lazy bean - let's ignore it.if (logger.isDebugEnabled()) {logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);}}// 查找被@Controller注解下修飾的帶有@MessageMapping和@SubscribeMapping注解的方法集合并存放至handlerMethods映射集合中if (beanType != null && isHandler(beanType)) {detectHandlerMethods(beanName);}}}}主要是搜索帶有@MessageMapping和@SubscribeMapping注解的方法注冊至MessageHandler對象中的handlerMethods屬性,方便后續對請求的路由
No.4 請求通道注冊SimpAnnotationMethodMessageHandler處理類
@Overridepublic final void start() {synchronized (this.lifecycleMonitor) {// 請求通道注入此處理器this.clientInboundChannel.subscribe(this);this.running = true;}}針對注解方式的消息路由處理我們基本知道了,那這個針對websocket的發過來的請求如何被路由至相應的HandlerMethod中呢?
HandlerMapping
筆者此處找尋下針對websocket的請求的路由
@Beanpublic HandlerMapping stompWebSocketHandlerMapping() {WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler());WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(handler, getTransportRegistration(), messageBrokerTaskScheduler());ApplicationContext applicationContext = getApplicationContext();if (applicationContext != null) {registry.setApplicationContext(applicationContext);}registerStompEndpoints(registry);// 返回的是AbstractUrlHandlerMapping的繼承類return registry.getHandlerMapping();}具體的讀者可查詢源碼,內容還是很多的,筆者只知道上述代碼返回的是AbstractUrlHandlerMapping的繼承類,其存儲的urlMap中的key值為websocket的端點路徑,比如/ws-demo/**,而value值則是HttpRequestHandler接口的實現類,其主要處理基于HTTP的websocket請求。
@FunctionalInterface public interface HttpRequestHandler {/*** Process the given request, generating a response.* @param request current HTTP request* @param response current HTTP response* @throws ServletException in case of general errors* @throws IOException in case of I/O errors*/void handleRequest(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException;}感興趣的讀者可進行深入的研究,其中有包括對ajax/sockJs/handshake等的支持。
消息流
主要是由AbstractHttpReceivingTransportHandler接收客戶端的請求,然后通過StompSubProtocolHandler類將消息發送至ExecutorSubscribableChannel,由其調用sendInternal()方法遍歷注冊的MessageHandlers,由后者去真正的處理消息并回包。具體的代碼邏輯還是很復雜的,筆者此處羅列下注解方式的處理以及響應給客戶端的消息處理
No.1 注解消息處理AbstractMethodMessageHandler
@Overridepublic void handleMessage(Message<?> message) throws MessagingException {// 獲取目的地址String destination = getDestination(message);if (destination == null) {return;}// 確保請求的發過來的地址是指定的前綴,否則消息就會被直接丟棄String lookupDestination = getLookupDestination(destination);if (lookupDestination == null) {return;}MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message);headerAccessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, lookupDestination);headerAccessor.setLeaveMutable(true);message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());if (logger.isDebugEnabled()) {logger.debug("Searching methods to handle " +headerAccessor.getShortLogMessage(message.getPayload()) +", lookupDestination='" + lookupDestination + "'");}// 找尋注解進行相應的方法響應handleMessageInternal(message, lookupDestination);headerAccessor.setImmutable();}此處需要注意的是請求的路徑前綴必須是指定的前綴,此前綴可通過WebSocketMessageBrokerConfigurer#configureMessageBroker()方法來設置,如下
@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// response destination prefixregistry.enableSimpleBroker("/topic");// request destination prefixregistry.setApplicationDestinationPrefixes("/app");}No.2 注解消息響應處理SimpleBrokerMessageHandler
@Overrideprotected void handleMessageInternal(Message<?> message) {MessageHeaders headers = message.getHeaders();SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);String destination = SimpMessageHeaderAccessor.getDestination(headers);String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);updateSessionReadTime(sessionId);// 此處確保回包的路徑是以指定的BrokerPath作為前綴,否則則會被丟棄,配置同上if (!checkDestinationPrefix(destination)) {return;}// 針對消息的發送,會根據多個訂閱者進行廣播發送if (SimpMessageType.MESSAGE.equals(messageType)) {logMessage(message);sendMessageToSubscribers(destination, message);}// 連接請求響應else if (SimpMessageType.CONNECT.equals(messageType)) {....}}// 關閉請求響應else if (SimpMessageType.DISCONNECT.equals(messageType)) {....}// 訂閱請求響應else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {...}// 取消訂閱請求響應else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {....}}No.3 消息響應處理StompBrokerRelayMessageHandler,其作為真實的處理響應的出處
@Overrideprotected void handleMessageInternal(Message<?> message) {String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());..........// 回包路徑,默認以用戶設定的BrokerPath為前綴;不滿足就將包丟棄String destination = stompAccessor.getDestination();if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {return;}// 連接請求if (StompCommand.CONNECT.equals(command)) {if (logger.isDebugEnabled()) {logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));}stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));stompAccessor.setLogin(this.clientLogin);stompAccessor.setPasscode(this.clientPasscode);if (getVirtualHost() != null) {stompAccessor.setHost(getVirtualHost());}StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);this.connectionHandlers.put(sessionId, handler);this.stats.incrementConnectCount();Assert.state(this.tcpClient != null, "No TCP client available");this.tcpClient.connect(handler);}// 關閉請求else if (StompCommand.DISCONNECT.equals(command)) {StompConnectionHandler handler = this.connectionHandlers.get(sessionId);if (handler == null) {if (logger.isDebugEnabled()) {logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");}return;}stats.incrementDisconnectCount();handler.forward(message, stompAccessor);}else {StompConnectionHandler handler = this.connectionHandlers.get(sessionId);if (handler == null) {if (logger.isDebugEnabled()) {logger.debug("No TCP connection for session " + sessionId + " in " + message);}return;}// 直接調用連接返回,內含sessionId以及訂閱者id等等handler.forward(message, stompAccessor);}}小結
先了解websocket的原理,然后再結合源碼加深對原理的理解,這便是了解一個新技術的必要步驟。筆者此處針對官方的例子作以下小貼士
1.配置websocket的請求響應前綴以及端點配置,務必實現WebSocketMessageBrokerConfigurer接口
2.針對回包處理時,一般我們需要指定路徑,如果采用注解方式,默認情況下@SendTo不指定的時候,會采用用戶設置的回包路徑前綴,比如@MessageMapping("/app/hello")-->/topic/hello。
當然用戶也可以采用SimpMessageTemplate#convertAndSend()方法直接發送至指定的回包路徑
3.客戶端采用sockJs相關API時,其支持通過HTTP/HTTPS協議連接指定的websocket端點,但是務必在訂閱或者發送消息的時候,指定的目的地址必須以/為開頭,否則發送不成功
4.客戶端采用sockJs時,針對發起的subscribe請求作如下總結
// 當服務端采取@SubscribeMapping注解時,則會對/app/subscribe直接請求有返回值 stompClient.subscribe('/app/subscribe', function (greeting) {showGreeting(JSON.parse(greeting.body));});// 當服務端沒有采取@SubsribeMapping注解時,下述代碼則實現對/topic/subscribe的消息接收 stompClient.subscribe('/topic/subscribe', function (greeting) {showGreeting(JSON.parse(greeting.body).content);});5.本文的例子讀者也可訪問該地址獲取,建議了解原理再去閱讀源碼會事半功倍的
轉載于:https://www.cnblogs.com/question-sky/p/9636756.html
總結
以上是生活随笔為你收集整理的springboot情操陶冶-web配置(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: css3+jQuery制作导航菜单(带动
- 下一篇: Win强制删除文件windows批处理强