Mina网络通信框架
認識 Mina
Apache Mina Server 是一個網絡通信應用框架,與 Netty 出自同一作者,Netty 借鑒了部分 Mina 的設計思路。
Mina 主要是對基于 TCP/IP、UDP/IP 協議棧的通信框架,Mina 可以幫助我們快速開發高性能、高擴展性的網絡通信應用,Mina 提供了事件驅動、異步操作的編程模型,Mina 的異步 IO 默認使用的是 JAVA NIO(New IO)作為底層支持,基于 Channel 的雙向通道。Mina 主要有1.x 和2.x 兩個分支。Mina 同時提供了網絡通信的 Server 端、Client 端的封裝,無論是哪端,Mina 在整個網通通信結構中提供了一系列接口 API,Mina 的 API 將真正的網絡通信與我們的應用程序隔離開來。
Mina 的底層依賴的主要是 Java NIO 庫,上層提供的是基于事件的異步接口。其整體的結構如下:
- IoService:最底層的是 IOService,負責具體的 IO 相關工作。這一層的典型代表有 IOSocketAcceptor 和 IOSocketChannel,分別對應 TCP 協議下的服務端和客戶端的 IOService。IOService 的意義在于隱藏底層 IO 的細節,對上提供統一的基于事件的異步 IO 接口。每當有數據到達時,IOService 會先調用底層 IO 接口讀取數據,封裝成 IoBuffer,之后以事件的形式通知上層代碼,從而將 Java NIO 的同步 IO 接口轉化成了異步 IO。所以從圖上看,進來的 low-level IO 經過 IOService 層后變成 IO Event。具體的代碼可以參考 org.apache.mina.core.polling.AbstractPollingIoProcessor 的私有內部類 Processor。
- IoProcessor:這個接口在另一個線程上,負責檢查是否有數據在通道上讀寫,也就是說它也擁有自己的 Selector,這是與我們使用 JAVA NIO 編碼時的一個不同之處,通常在 JAVA NIO 編碼中,我們都是使用一個 Selector,也就是不區分 IoService 與 IoProcessor 兩個功能接口。另外,IoProcessor 負責調用注冊在 IoService 上的過濾器,并在過濾器鏈之后調用 IoHandler。
- IoFilter:這個接口定義一組攔截器,這些攔截器可以包括日志輸出、黑名單過濾、數據的編碼(write 方向)與解碼(read 方向)等功能,其中數據的 encode 與decode 是最為重要的、也是你在使用 Mina 時最主要關注的地方。
- IoHandler:這個接口負責編寫業務邏輯,也就是接收、發送數據的地方。需要有開發者自己來實現這個接口。IoHandler 可以看成是 Mina 處理流程的終點,每個 IoService 都需要指定一個 IoHandler。
- IoSession:是對底層連接(服務器與客戶端的特定連接,該連接由服務器地址、端口以及客戶端地址、端口來決定)的封裝,一個 IoSession 對應于一個底層的 IO 連接(在 Mina 中 UDP 也被抽象成了連接)。通過 IoSession,可以獲取當前連接相關的上下文信息,以及向遠程 peer 發送數據。發送數據其實也是個異步的過程。發送的操作首先會逆向穿過 IoFilterChain,到達 IoService。但 IoService 上并不會直接調用底層 IO 接口來將數據發送出去,而是會將該次調用封裝成一個 WriteRequest,放入 session 的 writeRequestQueue 中,最后由 IoProcessor 線程統一調度 flush 出去。所以發送操作并不會引起上層調用線程的阻塞。具體代碼可以參考 org.apache.mina.core.filterchain.DefaultIoFilterChain 的內部類 HeadFilter 的 filterWrite 方法。
服務端流程:
- 通過 SocketAcceptor 同客戶端建立連接
- 連接建立之后 I/O 的讀寫交給了 I/O Processor 線程,I/O Processor 是多線程的
- 通過 I/O Processor 讀取的數據經過 IoFilterChain 里所有配置的 IoFilter,IoFilter 進行消息的過濾,格式的轉換,在這個層面可以制定一些自定義的協議
- 最后 IoFilter 將數據交給 Handler 進行業務處理,完成了整個讀取的過程
寫入過程也是類似,只是剛好倒過來,通過 IoSession.write 寫出數據,然后 Handler 進行寫入的業務處理,處理完成后交給 IoFilterChain,進行消息過濾和協議的轉換,最后通過 I/O Processor 將數據寫出到 socket 通道。
簡單的 TCPServer
第一步:編寫 IoService
IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);acceptor.bind(new InetSocketAddress(9124));第二步:編寫過濾器
// 編寫過濾器 acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())) );第三步:編寫 IoHandler
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class TCPServerHandler extends IoHandlerAdapter {@Overridepublic void messageReceived(IoSession session, Object message) throws Exception {}@Overridepublic void sessionCreated(IoSession session) throws Exception {super.sessionCreated(session);}@Overridepublic void sessionOpened(IoSession session) throws Exception {super.sessionOpened(session);}@Overridepublic void sessionClosed(IoSession session) throws Exception {super.sessionClosed(session);} }把這個 IoHandler 注冊到 IoService:
acceptor.setHandler(new TCPServerHandler());當然這段代碼也要在 acceptor.bind() 方法之前執行。完成的代碼:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset;import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class TCPServer {public static void main(String[] args) throws IOException {IoAcceptor acceptor = new NioSocketAcceptor();acceptor.getSessionConfig().setReadBufferSize(2048);acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 編寫過濾器acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));//設置handleracceptor.setHandler(new TCPServerHandler());//綁定端口acceptor.bind(new InetSocketAddress(9124));} }簡單的 TCPClient
第一步:編寫 IoService 并注冊過濾器
import java.net.InetSocketAddress; import java.nio.charset.Charset;import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector;public class TCPClient {public static void main(String[] args) {IoConnector connector = new NioSocketConnector();connector.setConnectTimeoutMillis(30000);connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));connector.setHandler(new TCPClientHandler("你好!\r\n 大家好!")); connector.connect(new InetSocketAddress("localhost", 9124));}}第三步:編寫 IoHandler
import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class TCPClientHandler extends IoHandlerAdapter {private final String values;public TCPClientHandler(String values) {this.values = values;}@Overridepublic void sessionOpened(IoSession session) {session.write(values);} }注冊 IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));Mina網絡通信框架
由于傳統的 Socket 網絡編程基于一個線程對應一個客戶端的實現方式,大量的線程創建和銷毀導致性能下降,無法應對高并發量的訪問,所以基于服務器端的網絡通信開發,我們常用 Mina 網絡通信框架,即常說的 Java NIO ( java non-blocking IO ) 開發。
首先,我們來看看 Mina 的幾個重要接口:
IoServiece :這個接口在一個線程上負責套接字的建立,擁有自己的 Selector,監聽是否有連接被建立。 IoProcessor :這個接口在另一個線程上負責檢查是否有數據在通道上讀寫,也就是說它也擁有自己的 Selector, 這是與我們使用 JAVA NIO 編碼時的一個不同之處,通常在 JAVA NIO 編碼中,我們都是使用一個 Selector, 也就是不區分 IoService與 IoProcessor 兩個功能接口。 另外,IoProcessor 負責調用注冊在 IoService 上的過濾器,并在過濾器鏈之后調用 IoHandler。 IoAccepter :相當于網絡應用程序中的服務器端 IoConnector :相當于客戶端 IoSession :當前客戶端到服務器端的一個連接實例 IoHandler :這個接口負責編寫業務邏輯,也就是接收、發送數據的地方。這也是實際開發過程中需要用戶自己編寫的部分代碼。 IoFilter :過濾器用于懸接通訊層接口與業務層接口,這個接口定義一組攔截器,這些攔截器可以包括日志輸 出、黑名單過濾、數據的編碼(write 方向)與解碼(read 方向)等功能,其中數據的 encode與 decode是 最為重要的、也是你在使用 Mina時最主要關注的地方。接著,我們來看看 Mina 的一個重要的類 IoHandlerAdapter,此類僅僅是實現了 IoHandler 接口,但并不做任何處理。
一個 IoHandler 接口中具有如下一些方法(摘自 MINA 的 API 文檔):
下面我們來看看 Mina 服務器網絡通信框架開發流程:
1、導入相關 jar 包: mina-core-2.0.13.jar、slf4j-api-1.7.14.jar
2、創建acceptor,綁定Handler,設置Filter,綁定端口
NioSocketAcceptor acceptor = new NioSocketAcceptor();acceptor.setHandler(new MyServerHandler());// 獲取攔截器,攔截器作用:把字節轉成對象// TextLineCodecFactory 把數據進行加解碼acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory()));// 5秒鐘服務器和客戶端沒有進行讀寫,則進入空閑狀態Idleacceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5);//綁定9898端口acceptor.bind(new InetSocketAddress(9898));3、創建自定義的Handler
package com.example.server;import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession;public class MyServerHandler extends IoHandlerAdapter {@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {System.out.println("exceptionCaught");}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {String s = (String) message;System.out.println("messageReceived: " + s);session.write("server replay: " + s);}@Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println("messageSent");}@Overridepublic void sessionClosed(IoSession session) throws Exception {System.out.println("sessionClosed");}@Overridepublic void sessionCreated(IoSession session) throws Exception {System.out.println("sessionCreated");}/*** 會話空閑狀態*/@Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception {System.out.println("sessionIdle");}@Overridepublic void sessionOpened(IoSession session) throws Exception {System.out.println("sessionOpened");}}4、由于默認的 Filter: TextLineCodeFactory 只能接收以換行符為結束的消息,有時為滿足特定需求,需要自定義一個 Factory
package com.example.server;import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder;public class MyTextLineFactory implements ProtocolCodecFactory {private MyTextLineDecoder mDecoder;private MyTextLineEncoder mEncoder;private MyTextLineCumulativeDecoder mCumulativeDecoder;public MyTextLineFactory() {mDecoder = new MyTextLineDecoder();mEncoder = new MyTextLineEncoder();mCumulativeDecoder = new MyTextLineCumulativeDecoder();}// 加密@Overridepublic ProtocolDecoder getDecoder(IoSession arg0) throws Exception {// return mDecoder;// 解決沒檢測到\n時的數據丟失問題return mCumulativeDecoder;}// 解密@Overridepublic ProtocolEncoder getEncoder(IoSession arg0) throws Exception {return mEncoder;}} package com.example.server;import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder;import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput;public class MyTextLineEncoder implements ProtocolEncoder {@Overridepublic void dispose(IoSession session) throws Exception {}@Overridepublic void encode(IoSession session, Object message,ProtocolEncoderOutput output) throws Exception {String s = null;if (message instanceof String) {s = (String) message;}if (s != null) {// 系統默認的EncoderCharsetEncoder charsetEncoder = (CharsetEncoder) session.getAttribute("encoder");if (charsetEncoder == null) {charsetEncoder = Charset.defaultCharset().newEncoder();session.setAttribute("encoder", charsetEncoder);}IoBuffer ioBuffer = IoBuffer.allocate(s.length());ioBuffer.setAutoExpand(true);// 自動擴展ioBuffer.putString(s, charsetEncoder);ioBuffer.flip();output.write(ioBuffer);}} } package com.example.server;import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput;public class MyTextLineDecoder implements ProtocolDecoder {@Overridepublic void decode(IoSession session, IoBuffer ioBuffer,ProtocolDecoderOutput output) throws Exception {// 起始位置int startPosition = ioBuffer.position();// 是否還有數據while (ioBuffer.hasRemaining()) {byte b = ioBuffer.get();// 讀取到\nif (b == '\n') {// 當前位置int currentPosition = ioBuffer.position();// 當前總長度,指向末尾int limit = ioBuffer.limit();// 截取行ioBuffer.position(startPosition);ioBuffer.limit(currentPosition);IoBuffer buf = ioBuffer.slice();// 把buf中的數據寫入到destbyte[] dest = new byte[buf.limit()];buf.get(dest);String str = new String(dest);output.write(str);// 還原位置ioBuffer.position(currentPosition);ioBuffer.limit(limit);}}}@Overridepublic void dispose(IoSession arg0) throws Exception {}@Overridepublic void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)throws Exception {} }5、為了解決數據丟失問題,我們常使用 CumulativeProtocolDecoder
package com.example.server;import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput;/*** 處理數據丟失*/ public class MyTextLineCumulativeDecoder extends CumulativeProtocolDecoder {@Overrideprotected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer,ProtocolDecoderOutput output) throws Exception {// 起始位置int startPosition = ioBuffer.position();// 是否還有數據while (ioBuffer.hasRemaining()) {byte b = ioBuffer.get();// 讀取到\nif (b == '\n') {// 當前位置int currentPosition = ioBuffer.position();// 當前總長度,指向末尾int limit = ioBuffer.limit();// 截取行ioBuffer.position(startPosition);ioBuffer.limit(currentPosition);IoBuffer buf = ioBuffer.slice();// 把buf中的數據寫入到destbyte[] dest = new byte[buf.limit()];buf.get(dest);String str = new String(dest);output.write(str);// 還原位置ioBuffer.position(currentPosition);ioBuffer.limit(limit);return true;// 讀取完成}}ioBuffer.position(startPosition);return false;// 讀取未完成} }至此,整個服務器搭建框架就完了,是否比 Socket 開發要簡潔呢?下節筆者將為大家帶來一個完整案例,并給出相應的 jar 包和源碼。
Mina詳解:https://www.cnblogs.com/duanxz/p/5143227.html
總結
以上是生活随笔為你收集整理的Mina网络通信框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基因分子生物学~tRNA,mRNA,蛋白
- 下一篇: 分子生物学之蛋白质与氨基酸