java socket发送定长报文_一个基于TCP协议的Socket通信实例
原標題:一個基于TCP協(xié)議的Socket通信實例
1. 前言
一般接口對接多以http/https或webservice的方式,socket方式的對接比較少并且會有一些難度。正好前段時間完成了一個socket的接口的對接需求,現(xiàn)將實現(xiàn)的思路做一個整理。
2. 需求概述
2.1 需要提供一個socket服務(wù)端,實時接收三方傳遞過來的數(shù)據(jù)
2.2 實時報文規(guī)范說明
2.2.1 通訊及接口格式說明
通訊方式:
通訊采用 TCP 協(xié)議, SOCKET 同步短連接方式。
報文結(jié)構(gòu):
報文為不定長報文,以定長報文頭+不定長報文體的方式
報文基本結(jié)構(gòu)如下圖所示:
報文長度
報文體
6位交易報文長度+交易報文。其中 6 位交易報文長度以 ASCII 碼字符串方式表示(6 個字節(jié)),右對齊,左補 0,不包括自身的長度,表示的是報文體的長度。如“000036fbced3fe-7025-4b5c-9cef-2421cd981f39”, 000036 為長度,“fbced3fe-7025-4b5c-9cef-2421cd981f39”為報文內(nèi)容。
報文結(jié)構(gòu)符合 XML 標準的報文格式,報文以無 BOM 格式的 GBK 編碼。報文根節(jié)點為 Transaction節(jié)點。除非報文里有特殊說明,報文定義的字段都是 Transaction 節(jié)點的子節(jié)點。報文格式參考下節(jié)示例。
2.2.2 報文示例
請求:
000410<?xml version="1.0" encoding="GBK"?>29greerg+4741414141test02018-06-1516:15:00
響應(yīng):
000683<?xml version="1.0" encoding="GBK"?>1OK0c2c002f-ccc6-4c7b-86e1-c7871b1c98b31Message enqueued for sendingSMS-AFFS-000000100+47419155906y06b02hdo001
3 代碼實現(xiàn)
3.1 BIO 阻塞模式
簡單的描述一下BIO的服務(wù)端通信模型:采用BIO通信模型的服務(wù)端,通常由一個獨立的Acceptor線程負責(zé)監(jiān)聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端分配一個線程進行業(yè)務(wù)邏輯處理,通過輸出流返回應(yīng)答給客戶端,線程銷毀。即典型的請求應(yīng)答模型。
傳統(tǒng)BIO通信模型圖(此圖來源于網(wǎng)絡(luò))
該模型最大的問題就是缺乏彈性伸縮能力,當客戶端并發(fā)訪問量增加后,服務(wù)端的線程個數(shù)和客戶端并發(fā)訪問數(shù)呈1:1的正比關(guān)系, Java中的線程也是比較寶貴的系統(tǒng)資源,線程數(shù)量快速膨脹后,系統(tǒng)的性能將急劇下降,隨著訪問量的繼續(xù)增大,系統(tǒng)最終崩潰。
但是這種模式在一些特定的應(yīng)用場景下效果是最好的,比如只有少量的TCP連接通信,且雙方都非常快速的傳輸數(shù)據(jù),此時這種模式的性能最好,實現(xiàn)比較簡單。
實現(xiàn)代碼如下:
3.1.1 服務(wù)端同步阻塞模式的:
import java.io.*;
import java.net.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import javax.annotation.PostConstruct;
public class TCPBlockServer {
// 服務(wù)IP
private final String SERVER_IP = "127.0.0.1";
// 服務(wù)端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
@PostConstruct
public void start() throws Exception {
System.out.println("server Socket 啟動 。。。。。。。");
// 這里使用了Java的自動關(guān)閉的語法
try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {
while (true) {
Socket socket = serverSocket.accept() ;
new Thread(()->handler(socket)).start();
}
}
}
private void handler(Socket socket2) {
String msg = null;
try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {
msg = receiveMsg(input, socket);
System.out.println("msg:" + msg);
doBusinessLogic(msg,out);
} catch (Exception e) {
e.printStackTrace();
}
}
// 處理業(yè)務(wù)邏輯
private void doBusinessLogic(String msg,OutputStream out) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
out.write(msg.getBytes(CHARSET_NAME));
out.flush();
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
private String receiveMsg(InputStream input, Socket socket) throws Exception {
byte[] lengthBytes = new byte[6];
int count = input.read(lengthBytes);
int length = Integer.valueOf(new String(lengthBytes));
byte[] buffer = new byte[length + 2];
int readBytes = 0;
while (readBytes < length) {
count = input.read(buffer, readBytes, length - readBytes);
if (count == -1) {
break;
}
readBytes += count;
}
return new String(buffer, Charset.forName("GBK"));
}
public static void main(String[] args) throws Exception {
TCPBlockServer server = new TCPBlockServer();
server.start();
}
}
3.1.2 服務(wù)端偽異步I/O模型:
上面實現(xiàn)方面存在的一些不足之處:
1:服務(wù)器創(chuàng)建和銷毀工作線程的開銷很大。如果服務(wù)器需要和許多客戶通信,并且與每個客戶的通信時間都很短,那么有可能服務(wù)器為客戶創(chuàng)建新線程的開銷比實際與客戶通信的開銷還大。
2:除了創(chuàng)建和銷毀線程的開銷之外,活動的線程也消耗系統(tǒng)資源。并且每個線程本身也會占用一定的內(nèi)存(每個線程大約需要1MB內(nèi)存),如果同時有大量客戶連接到服務(wù)器,就必須創(chuàng)建大量的工作線程,他們會消耗大量內(nèi)存,可能會導(dǎo)致系統(tǒng)內(nèi)存不足,應(yīng)用產(chǎn)生OOM的錯誤。
3:如果線程數(shù)目固定,并且每個線程都有很長的生命周期,那么線程切換也是相對固定的。不同的操作系統(tǒng)有不同的切換周期,一般在20毫秒左右。這里所說的線程切換是指Java虛擬機,以及底層操作系統(tǒng)的調(diào)度下,線程之間轉(zhuǎn)讓CPU的使用權(quán)。如果頻繁創(chuàng)建和銷毀線程,那么將導(dǎo)致頻繁的切換線程,因為一個線程被銷毀后,必然要把CPU轉(zhuǎn)移給另外一個已經(jīng)就緒的線程,是該線程獲得運行機會。這種情況下,線程間的切換不再遵循系統(tǒng)的固定切換周期,切換線程的開銷甚至比創(chuàng)建及銷毀的開銷還大。
為了改進客戶端訪問就會創(chuàng)建線程的場景,改為由一個線程池去管理固定數(shù)量的線程來執(zhí)行客戶所需業(yè)務(wù)邏輯。實現(xiàn)線程池線程和客戶端 N(N>= 1): M的關(guān)系。如下圖所示:
相關(guān)實現(xiàn)代碼如下,根據(jù)實際場景需要設(shè)置線程池中合適的線程數(shù)量:
import java.io.*;
import java.net.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.*;
import javax.annotation.PostConstruct;
public class TCPBlockThreadPoolServer {
// 服務(wù)IP
private final String SERVER_IP = "127.0.0.1";
// 服務(wù)端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final int THREADS = 150 ;
private final String CHARSET_NAME = "GBK";
private ExecutorService executorService ;
@PostConstruct
public void start() throws Exception {
System.out.println("server Socket 啟動 。。。。。。。");
executorService = Executors.newFixedThreadPool(THREADS) ;
// 這里使用了Java的自動關(guān)閉的語法
try (ServerSocket serverSocket = new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {
while (true) {
Socket socket = serverSocket.accept() ;
executorService.execute(()->handler(socket));
}
}
}
private void handler(Socket socket2) {
String msg = null;
try (Socket socket = socket2 ; InputStream input = socket.getInputStream(); OutputStream out = socket.getOutputStream()) {
msg = receiveMsg(input, socket);
System.out.println("msg:" + msg);
doBusinessLogic(msg,out);
} catch (Exception e) {
e.printStackTrace();
}
}
// 處理業(yè)務(wù)邏輯
private void doBusinessLogic(String msg,OutputStream out) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
out.write(msg.getBytes(CHARSET_NAME));
out.flush();
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
private String receiveMsg(InputStream input, Socket socket) throws Exception {
byte[] lengthBytes = new byte[6];
int count = input.read(lengthBytes);
int length = Integer.valueOf(new String(lengthBytes));
byte[] buffer = new byte[length + 2];
int readBytes = 0;
while (readBytes < length) {
count = input.read(buffer, readBytes, length - readBytes);
if (count == -1) {
break;
}
readBytes += count;
}
return new String(buffer, Charset.forName("GBK"));
}
public static void main(String[] args) throws Exception {
TCPBlockServer server = new TCPBlockServer();
server.start();
}
}
3.1.3 客戶端
簡單的客戶端實現(xiàn)如下:
import java.io.*;
import java.net.Socket;
import java.nio.charset.Charset;
import org.apache.commons.lang3.StringUtils;
public class Client {
public String sendAndRecv(String content, String charsetName,String ip,int port) throws Exception {
try(Socket socket = new Socket(ip,port)){
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
socket.setSoTimeout(60000);
try(OutputStream output = socket.getOutputStream();InputStream input = socket.getInputStream()){
output.write(content.getBytes(charsetName));
output.flush();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(input, Charset.forName("GBK")));
StringBuffer buffer = new StringBuffer();
String message = null ;
while((message = bufferedReader.readLine()) != null){
buffer.append(message);
}
return StringUtils.substring(buffer.toString(), 6);
}
}
}
}
3.2 NIO 模式
相對于BIO(阻塞通信)模型來說,NIO模型非常復(fù)雜,以至于花費很大的精力去學(xué)習(xí)也不太容易能夠精通,難以編寫出一個沒有缺陷,高效且適應(yīng)各種意外情況的穩(wěn)定的NIO通信模塊。之所以有這樣的問題,是因為NIO編程不是單純的一個技術(shù)點,而是涵蓋了一系列的相關(guān)技術(shù)、專業(yè)知識、編程經(jīng)驗和編程技巧的復(fù)雜工程,所以精通這些技術(shù)相當有難度。
和BIO相比NIO有如下幾個新的概念:
1. 通道(Channel)
Channel對應(yīng)BIO中Stream的模型,到任何目的地(或來自任何地方)的所有數(shù)據(jù)都必須通過一個Channel對象。但是Channel和Stream不同的地方在于,Channel是雙向的而Stream是單向的(分為InputStream和OutputStream),所以Channel可以用于讀/寫,或同時用于讀寫。
2. 緩沖區(qū)(Buffer)
雖然Channel用于讀寫數(shù)據(jù),但是我們不能直接操作Channel進行讀寫,必須通過緩沖區(qū)來完成(Buffer)。NIO設(shè)計了一個全新的數(shù)據(jù)結(jié)構(gòu)Buffer,具體的緩存區(qū)有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer等。
Buffer中有3個重要的參數(shù):位置(position)、容量(capactiy)和上限(limit)
參數(shù)
寫模式
讀模式
位置(position)
當前緩沖區(qū)的位置,將從position的下一個位置寫數(shù)據(jù)
當前緩存區(qū)讀取的位置,將從此位置后讀取數(shù)據(jù)。
容量(capacity)
緩存區(qū)總?cè)萘康纳舷?/p>
緩存區(qū)總?cè)萘康纳舷?/p>
上限(limit)
緩存區(qū)實際上限,它總是小于等于容量。通常情況下和容量相等
代表可讀取的總?cè)萘?#xff0c;和上次寫入的容量相等。
3. 選擇器(Selector)
Selector 可以同時檢測多個Channel的事件以實現(xiàn)異步I/O,我們可以將感興趣的事件注冊到Selector上面,當事件發(fā)生時可以通過Selector獲取事件發(fā)生的Channel,并進行相關(guān)的事件處理操作。一個Selector可以同時輪詢多個Channel。
3.2.1 服務(wù)端
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Iterator;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TCPNioServer {
// 服務(wù)IP
private final String SERVER_IP = "127.0.0.1";
// 服務(wù)端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
private Selector selector;
public TCPNioServer() throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 設(shè)置通道為非阻塞
serverChannel.configureBlocking(false);
// 將該通道所對應(yīng)的serverSocket綁定到指定的ip和port端口
InetAddress inetAddress = InetAddress.getByName(SERVER_IP);
serverChannel.socket().bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);
// 獲得一個通道管理器(選擇器)
selector = Selector.open();
/*
* 將通道管理器和該通道綁定,并為該通道注冊selectionKey.OP_ACCEPT事件
* 注冊該事件后,當事件到達的時候,selector.select()會返回, 如果事件沒有到達selector.select()會一直阻塞
*/
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 采用輪詢的方式監(jiān)聽selector上是否有需要處理的事件,如果有,進行處理
*/
@PostConstruct
public void start() throws Exception {
log.info("==start server ip {} , port {}. ==", SERVER_IP, SERVER_PORT);
while (true) {
selector.select();//此方法會阻塞,直到至少有一個以注冊的事件被觸發(fā)
//獲取發(fā)生事件的SelectionKey集合
Iterator iterator = this.selector.selectedKeys().iterator();
while (iterator.hasNext()) {
try {
SelectionKey selectedKey = iterator.next();
if (selectedKey.isValid()) { // 如果key的狀態(tài)是有效的
if (selectedKey.isAcceptable()) { //如key是阻塞狀態(tài),調(diào)用accept()方法
accept(selectedKey);
}
if (selectedKey.isReadable()) { //如key是可讀狀態(tài),調(diào)用handle()方法
handle(selectedKey);
}
}
} catch (Exception e) {
iterator.remove();
} finally {
iterator.remove();//從集合中移除,避免重復(fù)處理
}
}
}
}
private void accept(SelectionKey key) throws IOException {
// 1 獲取服務(wù)器通道
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 2 執(zhí)行阻塞方法
SocketChannel chennel = server.accept();
// 3 設(shè)置阻塞模式為非阻塞
chennel.configureBlocking(false);
// 4 注冊到多路復(fù)用選擇器上,并設(shè)置讀取標識
chennel.register(selector, SelectionKey.OP_READ);
}
private void handle(SelectionKey key) throws Exception {
// 獲取之前注冊的SocketChannel通道
try (SocketChannel channel = (SocketChannel) key.channel()) {
int length = getMsgLength(key, channel);
String msg = recvMsg(key, channel, length);
System.out.println("Server:" + msg);
doBusinessLogic(msg, channel);
}
}
private byte[] read(SelectionKey key, SocketChannel channel,int capacity) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(capacity);
channel.read(buffer);
// 將channel中的數(shù)據(jù)放入buffer中
int count = channel.read(buffer);
if (count == -1) { // == -1表示通道中沒有數(shù)據(jù)
key.channel().close();
key.cancel();
return null;
}
// 讀取到了數(shù)據(jù),將buffer的position復(fù)位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 將buffer中的數(shù)據(jù)寫入byte[]中
buffer.get(bytes);
return bytes ;
}
private int getMsgLength(SelectionKey key, SocketChannel channel) throws Exception {
byte[] bytes = this.read(key, channel, 6) ;
String length = new String(bytes, CHARSET_NAME);
return new Integer(length);
}
private String recvMsg(SelectionKey key, SocketChannel channel,int msgLength) throws Exception{
byte[] bytes = this.read(key, channel, msgLength) ;
return new String(bytes, CHARSET_NAME);
}
// 處理業(yè)務(wù)邏輯
private void doBusinessLogic(String msg, SocketChannel channel) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes(CHARSET_NAME));
channel.write(outBuffer);
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
public static void main(String[] args) throws Exception {
TCPNioServer server = new TCPNioServer();
server.start();
}
}
3.3 AIO模式
與NIO不同,當進行讀寫操作時,只須直接調(diào)用API的read或write方法即可。這兩種方法均為異步的,對于讀操作而言,當有流可讀取時,操作系統(tǒng)會將可讀的流傳入read方法的緩沖區(qū),并通知應(yīng)用程序;對于寫操作而言,當操作系統(tǒng)將write方法傳遞的流寫入完畢時,操作系統(tǒng)主動通知應(yīng)用程序。 即可以理解為,read/write方法都是異步的,完成后會主動調(diào)用回調(diào)函數(shù)。 在JDK1.7中,這部分內(nèi)容被稱作NIO2,主要在java.nio.channels包下增加了下面四個異步通道:
AsynchronousSocketChannel
對應(yīng)BIO中的ServerSocket和NIO中的ServerSocketChannel,用于server端網(wǎng)絡(luò)程序
AsynchronousServerSocketChannel
對應(yīng)BIO中的Socket和NIO中的SocketChannel,用于client端網(wǎng)絡(luò)應(yīng)用
AsynchronousFileChannel
AsynchronousDatagramChannel
異步channel API提供了兩種方式監(jiān)控/控制異步操作(connect,accept, read,write等)。
第一種方式是返回java.util.concurrent.Future對象, 檢查Future的狀態(tài)可以得到操作是完成還是失敗,還是進行中, future.get會阻塞當前進程。
第二種方式為操作提供一個回調(diào)參數(shù)java.nio.channels.CompletionHandler,這個回調(diào)類包含completed,failed兩個方法。channel的每個I/O操作都為這兩種方式提供了相應(yīng)的方法, 你可以根據(jù)自己的需要選擇合適的方式編程。
下面的例子中在accept和read方法中使用了回調(diào)CompletionHandler的方式,而發(fā)送數(shù)據(jù)(write)使用了future的方式,當然write也可以采用回調(diào)CompletionHandler的方式。因為CompletionHandler是完全異步的,所以需要在mian方法中使用一個 while循環(huán)確保程序不退出,或者也可以在start方法的最后使用channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
3.3.1 服務(wù)端
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TCPAioServer {
// 服務(wù)IP
private final String SERVER_IP = "127.0.0.1";
// 服務(wù)端口
private final int SERVER_PORT = 8888;
private final int BACKLOG = 150;
private final String CHARSET_NAME = "GBK";
private ExecutorService executorService;
private AsynchronousChannelGroup channelGroup;
private AsynchronousServerSocketChannel serverSocketChannel;
public void start() throws IOException, Exception {
// 創(chuàng)建線程池
executorService = Executors.newCachedThreadPool();
// 創(chuàng)建線程組
channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// 創(chuàng)建服務(wù)器通道
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
// 綁定地址
InetAddress inetAddress = InetAddress.getByName(SERVER_IP);
serverSocketChannel.bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);
log.info("server start, ip: {} , port:{}", SERVER_IP, SERVER_PORT);
serverSocketChannel.accept(this, new ServerCompletionHandler());
//channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
class ServerCompletionHandler implements CompletionHandler {
@Override
public void completed(AsynchronousSocketChannel channel, TCPAioServer attachment) {
try {
handle(channel);
} finally {
// 當有下一個客戶端接入的時候,直接調(diào)用Server的accept方法,這樣反復(fù)執(zhí)行下去,保證多個客戶端都可以阻塞
serverSocketChannel.accept(attachment, this);
}
}
private void handle(AsynchronousSocketChannel channel) {
ByteBuffer buffer = allocateByteBuffer(channel);
channel.read(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
String msg = null;
try {
msg = new String(attachment.array(), CHARSET_NAME);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
log.info("Server 收到客戶端發(fā)送的數(shù)據(jù)為:{}", msg);
doBusinessLogic(msg, channel);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private ByteBuffer allocateByteBuffer(AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(6);
try {
channel.read(buffer).get(1000, TimeUnit.SECONDS);
// 讀取到了數(shù)據(jù),將buffer的position復(fù)位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 將buffer中的數(shù)據(jù)寫入byte[]中
buffer.get(bytes);
String length = new String(bytes, CHARSET_NAME);
buffer = ByteBuffer.allocate(new Integer(length));
} catch (InterruptedException | ExecutionException | TimeoutException | UnsupportedEncodingException e1) {
e1.printStackTrace();
}
return buffer;
}
// 處理業(yè)務(wù)邏輯
private void doBusinessLogic(String msg, AsynchronousSocketChannel result) {
try (AsynchronousSocketChannel channel = result) {
msg = formatMsg(msg);
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
ByteBuffer buffer = ByteBuffer.allocate(bodyBytes.length);
buffer.put(bodyBytes);
buffer.flip();
channel.write(buffer).get();
} catch (Exception e) {
e.printStackTrace();
}
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
@Override
public void failed(Throwable exc, TCPAioServer attachment) {
exc.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
TCPAioServer server = new TCPAioServer();
server.start();
while (true) {
Thread.sleep(1000);
}
}
}
目前Linux上的AIO實現(xiàn)主要有兩種:Posix AIO 與Kernel Native AIO,前者是用戶態(tài)實現(xiàn)的,而后者是內(nèi)核態(tài)實現(xiàn)的。所以Kernel Native AIO的性能和前景要好于他的前輩Posix AIO,比較有名的的軟件如Nginx,MySQL等在高版本中都有支持Kernel Native AIO,但是只應(yīng)用在少部分功能中。因為當下Linux的AIO實現(xiàn)還不是很完美,充斥著各種Bug,并且AIO Socket 還并非真正的異步I/O機制,使用AIO所帶來的性能提升也不太明顯,穩(wěn)定性并非十分可靠,如是Kernel Native AIO引起的問題,解決的難度會非常大。但是AIO是未來的發(fā)展方向,需要我們持續(xù)的關(guān)注。
3.4 開源框架Netty實現(xiàn)的Socket服務(wù)
Netty是一個高性能、異步事件驅(qū)動的NIO框架,它提供了對TCP、UDP和文件傳輸?shù)闹С?#xff0c;作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結(jié)果。作為當前最流行的NIO框架,Netty在互聯(lián)網(wǎng)領(lǐng)域、大數(shù)據(jù)分布式計算領(lǐng)域、游戲行業(yè)、通信行業(yè)等獲得了廣泛的應(yīng)用,一些業(yè)界著名的開源軟件也基于Netty的NIO框架構(gòu)建,如Spark、RocketMQ、Dubbo、Elasticsearch等等。
Netty的優(yōu)點
1、API使用簡單,有豐富的例子,開發(fā)門檻低。
2、功能強大,預(yù)置了多種編解碼功能,支持多種主流協(xié)議。
3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展。
4、性能高,通過與其他業(yè)界主流的NIO框架對比,Netty綜合性能最優(yōu)。
5、成熟、穩(wěn)定,Netty修復(fù)了已經(jīng)發(fā)現(xiàn)的NIO所有BUG。
6、社區(qū)活躍。
7、經(jīng)歷了很多商用項目的考驗。
3.4.1 服務(wù)端(Netty4.X)
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.*;
import io.netty.handler.logging.*;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettySocketServer {
private final String CHARSET_NAME = "GBK";
private final int bosscount = 2;
private final int workerCount = 8;
private final int tcpPort = 8888;
private final int backlog = 100;
private final int receiveBufferSize = 1048576;
private ServerBootstrap serverBootstrap;
private ChannelFuture serverChannelFuture;
public NamedThreadFactory bossThreadFactory() {
return new NamedThreadFactory("Server-Worker");
}
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bosscount, bossThreadFactory());
}
public NamedThreadFactory workerThreadFactory() {
return new NamedThreadFactory("Server-Worker");
}
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerCount, workerThreadFactory());
}
public ServerBootstrap bootstrap() {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, backlog)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("logging", new LoggingHandler(LogLevel.ERROR))
.addLast("stringEncoder", new StringEncoder(Charset.forName("GBK")))
.addLast("frameDecoder", new MsgLengthFieldBasedFrameDecoder(receiveBufferSize, 0, 6, 0, 6))
.addLast("stringDecoder", new StringDecoder(Charset.forName("GBK")))
.addLast("messageHandler", new ServerMessageHandler());
}
});
return bootstrap;
}
@PostConstruct
public void start() throws Exception {
serverBootstrap = bootstrap();
serverChannelFuture = serverBootstrap.bind(tcpPort).sync();
log.info("Starting server at tcpPort {}" , tcpPort);
}
@PreDestroy
public void stop() throws Exception {
serverChannelFuture.channel().closeFuture().sync();
}
static class NamedThreadFactory implements ThreadFactory {
public static AtomicInteger counter = new AtomicInteger(1);
private String name = this.getClass().getName();
private boolean deamon ;//守護線程
private int priority ; //線程優(yōu)先級
public NamedThreadFactory(String name){
this(name, false);
}
public NamedThreadFactory(String name,boolean deamon){
this(name, deamon, -1);
}
public NamedThreadFactory(String name,boolean deamon,int priority){
this.name = name ;
this.deamon = deamon ;
this.priority = priority ;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,name+"["+counter.getAndIncrement()+"]");
thread.setDaemon(deamon);
if(priority != -1){
thread.setPriority(priority);
}
return thread;
}
}
//拆包
class MsgLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {
/**
* @param maxFrameLength 解碼時,處理每個幀數(shù)據(jù)的最大長度
* @param lengthFieldOffset 該幀數(shù)據(jù)中,存放該幀數(shù)據(jù)的長度的數(shù)據(jù)的起始位置
* @param lengthFieldLength 記錄該幀數(shù)據(jù)長度的字段本身的長度
* @param lengthAdjustment 修改幀數(shù)據(jù)長度字段中定義的值,可以為負數(shù)
* @param initialBytesToStrip解析的時候需要跳過的字節(jié)數(shù)
*/
public MsgLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
@Override
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
if(length == 6){
buf = buf.order(order);
byte[] lengthBytes = new byte[6];
buf.readBytes(lengthBytes);
buf.resetReaderIndex();
return Integer.valueOf(new String(lengthBytes));
} else {
return super.getUnadjustedFrameLength(buf, offset, length, order);
}
}
}
class ServerMessageHandler extends ChannelInboundHandlerAdapter {
/**
* 功能:讀取服務(wù)器發(fā)送過來的信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String) {
try {
doBusinessLogic(ctx,(String)msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
// 處理業(yè)務(wù)邏輯
private void doBusinessLogic(ChannelHandlerContext ctx,String msg) throws Exception {
// todo Business Logic
msg = formatMsg(msg);
ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
}
private String formatMsg(String msg) {
byte[] bodyBytes = msg.getBytes(Charset.forName(CHARSET_NAME));
int bodyLength = bodyBytes.length;
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(6);
numberFormat.setGroupingUsed(false);
return numberFormat.format(bodyLength) + msg;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public static void main(String[] args) throws Exception{
NettySocketServer server = new NettySocketServer();
server.start();
}
}
總結(jié)
同步阻塞IO
偽異步IO
非阻塞IO
異步IO
Netty的非阻塞IO
客戶端:服務(wù)端
1:1
N:M(M>=1)
N:M(M>=1,單線程非阻塞,多線程非阻塞)
N:0(不需要啟動額外的IO線程,被動回調(diào))
N:M(M>=1)
IO類型
BIO
BIO
NIO
AIO
NIO
API使用難度
簡單
簡單
非常復(fù)雜
復(fù)雜
簡單
可靠性
相當差
差
高
高
高+
吞吐量
低
中
高
高
高+
并發(fā)
低
中
高
高
高+
參考文獻
▲http://www.ibm.com/developerworks/cn/linux/l-async/
▲http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf
▲http://blog.csdn.net/anxpp/article/details/51512200
▲Netty權(quán)威指南
▲Asynchronous I/O Tricks and Tips
責(zé)任編輯:
總結(jié)
以上是生活随笔為你收集整理的java socket发送定长报文_一个基于TCP协议的Socket通信实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: qt调用mysql调用了存储过_Qt调用
- 下一篇: python最常用的版本、也称为clas