【手写dubbo-2】超详细!netty实现群聊、私聊
文章目錄
- 一、功能背景
- 二、功能描述
- 三、功能架構圖
- 四、功能預覽
- 五、代碼示例
- 5.1、pom引入jar
- 5.2、server端
- 5.3、自定義協議
- 5.4、客戶端代碼
一、功能背景
鞏固netty知識,使用netty完成一個聊天系統,通過該聊天系統更加深入地了解netty。涉及知識點:nio、reactor模型、tcp粘包拆包、自定義協議等等。
二、功能描述
三、功能架構圖
四、功能預覽
server
client1 群發消息
client2消息接收
client3消息接收
client1發送消息給client3
client3控制臺打印
五、代碼示例
5.1、pom引入jar
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>
<!-- Alibaba fastjson JSON parser. 1.2.83 is the last 1.x release and fixes the
     autoType deserialization vulnerability (CVE-2022-25845) present in 1.2.80 and earlier. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
5.2、server端
新建QQChatServer.java文件。主要完成功能:server啟動,設置監聽端口,指定編解碼器、處理器等等。同一文件中還有QQChatHandler類,其主要完成用戶連接成功之后保存所有連接用戶的Channel信息并且分配對應的userId,并且通知客戶端他自己的userId。
import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor;import java.util.HashMap; import java.util.Iterator; import java.util.Map;public class QQChatServer {public static void main (String[] args) throws InterruptedException{//單線程接收tcp連接EventLoopGroup bossGroup = new NioEventLoopGroup (1);EventLoopGroup workerGroup = new NioEventLoopGroup ();try{ServerBootstrap serverBootstrap = new ServerBootstrap ();serverBootstrap.group (bossGroup, workerGroup).option (ChannelOption.SO_BACKLOG, 128).childOption (ChannelOption.SO_KEEPALIVE, true).channel (NioServerSocketChannel.class).childHandler (new ChannelInitializer<SocketChannel> (){@Overrideprotected void initChannel (SocketChannel ch) throws Exception{ChannelPipeline pipeline = ch.pipeline ();//設(shè)置編解碼器pipeline.addLast ("qqdecoder", new QQChatDecoder ());pipeline.addLast ("qqencoder", new QQChatEncoder ());//處理器pipeline.addLast ("qqChatHandler", new QQChatHandler ());}});ChannelFuture sync = serverBootstrap.bind (7777).sync ();//監(jiān)聽關(guān)閉sync.channel ().closeFuture ().sync ();}finally{bossGroup.shutdownGracefully ();workerGroup.shutdownGracefully ();}} }class QQChatHandler extends SimpleChannelInboundHandler<QQMessageProtocol>{private static ChannelGroup channelGroup = new DefaultChannelGroup (GlobalEventExecutor.INSTANCE);private static Map<Long,Channel> channelMap = new HashMap<> ();//客戶端連接成功@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//保存channel信息channelGroup.add (ctx.channel ());//分配userId,因為簡單測試,這里用隨機數(shù)簡單測試double random = Math.random ();Long userId = (long)(random * 100);channelMap.put (userId,ctx.channel 
());System.out.println ("userId:"+userId+"上線了。");//返回用戶userId,告知客戶端Content ackCon = new Content ();ackCon.setMsg ("你的userId為:"+userId);QQMessageProtocol ackmsg = new QQMessageProtocol (ackCon);ctx.channel ().writeAndFlush (ackmsg);}//接收客戶端的消息并抓發(fā)@Overrideprotected void channelRead0 (ChannelHandlerContext ctx, QQMessageProtocol buf) throws Exception{//獲取客戶端發(fā)送的信息String s = new String (buf.getContent (), "utf-8");Content content = JSONObject.parseObject (s, Content.class);//如果userId 為0,則認為是群發(fā)if(content.getUserId ()!=null && content.getUserId ().equals (0l)){Iterator<Channel> iterator = channelGroup.iterator ();content.setUserId (null);content.setMsg ("群聊消息:"+content.getMsg ());QQMessageProtocol groupMsg = new QQMessageProtocol (content);while (iterator.hasNext ()){Channel next = iterator.next ();if(next != ctx.channel ()){next.writeAndFlush (groupMsg);}}}else{//私聊Channel channel = channelMap.get (content.getUserId ());channel.writeAndFlush (buf);}} }5.3、自定義協(xié)議
QQMessageProtocol.java。主要功能:在數據傳輸時,解決tcp粘包、拆包等問題。
import com.alibaba.fastjson.JSONObject; import java.nio.charset.Charset; public class QQMessageProtocol {//數(shù)據(jù)包長度private int len; //數(shù)據(jù)private byte[] content;public int getLen() {return len;}public void setLen(int len) {this.len = len;}public byte[] getContent() {return content;}public void setContent(byte[] content) {this.content = content;}public QQMessageProtocol (){}public QQMessageProtocol (Content content){//獲取content的內(nèi)容,字節(jié)數(shù)信息String s = JSONObject.toJSONString (content);byte[] con = s.getBytes(Charset.forName("utf-8"));int length = s.getBytes(Charset.forName("utf-8")).length;this.content = con;this.len = length;} }Content.java。主要功能:存放真正的信息,私聊對應(yīng)的userId和msg。傳輸時content轉(zhuǎn)成字節(jié)數(shù)組存放在QQMessageProtocol的content中。
public class Content {private Long userId;private String msg;//此處省略get、set方法 }QQChatEncoder.java。主要功能:完成消息的編碼。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Encodes a {@link QQMessageProtocol} as a 4-byte big-endian length prefix followed by
 * the payload bytes.
 */
public class QQChatEncoder extends MessageToByteEncoder<QQMessageProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, QQMessageProtocol msg, ByteBuf out) throws Exception {
        byte[] payload = msg.getContent();
        // Take the prefix from the bytes actually written rather than from the separately
        // settable len field: a mismatch would desynchronize the receiver's framing.
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }
}
QQChatDecoder.java。主要功能:完成消息的解碼。
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; public class QQChatDecoder extends ReplayingDecoder<Void> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {//需要將得到二進制字節(jié)碼-> MessageProtocol 數(shù)據(jù)包(對象)int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);//封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業(yè)務(wù)處理QQMessageProtocol QQMessageProtocol = new QQMessageProtocol ();QQMessageProtocol.setLen(length);QQMessageProtocol.setContent(content);out.add(QQMessageProtocol);} }5.4、客戶端代碼
QQChatClient.java。主要功能:完成控制臺輸入進行消息傳輸、消息接收打印。
import com.alibaba.fastjson.JSONObject; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; public class QQChatClient {public static void main (String[] args) throws InterruptedException{EventLoopGroup group = new NioEventLoopGroup ();try{Bootstrap serverBootstrap = new Bootstrap ();serverBootstrap.group (group).channel (NioSocketChannel.class).handler (new ChannelInitializer<SocketChannel> (){@Overrideprotected void initChannel (SocketChannel ch) throws Exception{ChannelPipeline pipeline = ch.pipeline ();pipeline.addLast ("qqdecoder", new QQChatDecoder ());pipeline.addLast ("qqencoder", new QQChatEncoder ());pipeline.addLast ("GroupChatHandler", new QQClientHandler ());}});ChannelFuture sync = serverBootstrap.connect ("127.0.0.1",7777).sync ();Channel channel = sync.channel();System.out.println("-------請輸入發(fā)送信息 {userId},{msg} userId=0時為群發(fā)--------");//客戶端需要輸入信息,創(chuàng)建一個掃描器Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String msg = scanner.nextLine();Content con = new Content();String[] split = msg.split (",");con.setUserId (Long.parseLong (split[0]));con.setMsg (split[1]);QQMessageProtocol QQMessageProtocol = new QQMessageProtocol (con);//通過channel 發(fā)送到服務(wù)器端channel.writeAndFlush(QQMessageProtocol);}//監(jiān)聽關(guān)閉sync.channel ().closeFuture ().sync ();}finally{group.shutdownGracefully ();}} }class QQClientHandler extends SimpleChannelInboundHandler<QQMessageProtocol> {//打印消息@Overrideprotected void channelRead0(ChannelHandlerContext ctx, QQMessageProtocol msg) throws Exception {String s = new String (msg.getContent (), "utf-8");Content content = JSONObject.parseObject (s, Content.class);String userId = content.getUserId () == null ? "server" : content.getUserId ().toString ();System.out.println(userId+":"+content.getMsg ());} }總結(jié)
以上是生活随笔為你收集整理的【手写dubbo-2】超详细!netty实现群聊、私聊的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: c语言速成pdf,c语言速成_笔记.pd
- 下一篇: js获取当前域名的方法