Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)
場景
Netty的Socket編程詳解-搭建服務(wù)端與客戶端并進行數(shù)據(jù)傳輸:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108615023
在此基礎(chǔ)上要實現(xiàn)多個客戶端之間通信,實現(xiàn)類似群聊或者聊天室的功能。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關(guān)注公眾號
霸道的程序猿
獲取編程相關(guān)電子書、教程推送與免費下載。
實現(xiàn)
在上面實現(xiàn)的服務(wù)端與客戶端通信的基礎(chǔ)上,在src下新建com.badao.Char包,包下新建ChatServer類作為聊天室的服務(wù)端。
package com.badao.Chat;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;public class ChatServer {public static void main(String[] args) throws? Exception{EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try{ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChatServerInitializer());//綁定端口ChannelFuture channelFuture = serverBootstrap.bind(70).sync();channelFuture.channel().closeFuture().sync();}finally {//關(guān)閉事件組bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }在上面中綁定70端口并添加了一個服務(wù)端的初始化器ChatServerInitializer
所以新建類ChatServerInitializer
package com.badao.Chat;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil;public class ChatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));pipeline.addLast(new ChatServerHandler());} }使其繼承ChannelInitializer,并重寫InitChannel方法,在方法中使用Netty自帶的處理器進行編碼的處理并最后添加一個自定義的處理器ChatServerHandler
新建處理器類ChatServerHandler
package com.badao.Chat;import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor;public class ChatServerHandler extends SimpleChannelInboundHandler<String> {private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {Channel channel = ctx.channel();channelGroup.forEach(ch->{if(channel!=ch){ch.writeAndFlush(channel.remoteAddress()+"發(fā)送的消息:"+msg+"\n");}else{ch.writeAndFlush("[自己]:"+msg+"\n");}});}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務(wù)器]:"+channel.remoteAddress()+"加入\n");channelGroup.add(channel);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務(wù)器]:"+channel.remoteAddress()+"離開\n");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"上線了\n");System.out.println("當(dāng)前在線人數(shù):"+channelGroup.size());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"下線了\n");System.out.println("當(dāng)前在線人數(shù):"+channelGroup.size());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();} }使處理器繼承SimpleChannelinboundHandler并重寫channelRead0方法。
在最上面聲明了一個通道組的通過 DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
獲取其單例,只要是建立連接的客戶端都會自動添加進此通道組中。
然后只要是客戶端與服務(wù)端發(fā)送消息后就會執(zhí)行該方法。
在此方法中直接遍歷通道組,判斷通道組里面的每一個客戶端是不是當(dāng)前發(fā)消息的客戶端。
如果是就顯示自己發(fā)送消息,如果不是則獲取遠程地址并顯示發(fā)送消息。
然后就是實現(xiàn)客戶端的上線功能以及在線人數(shù)統(tǒng)計的功能。
在上面的處理器中重寫channelActive方法,此方法會在通道激活即建立連接后調(diào)用
??? @Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"上線了\n");System.out.println("當(dāng)前在線人數(shù):"+channelGroup.size());}同理重寫channelInactive方法,此方法會在斷掉連接后調(diào)用
??? @Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();System.out.println(channel.remoteAddress()+"下線了\n");System.out.println("當(dāng)前在線人數(shù):"+channelGroup.size());}然后就是實現(xiàn)向所有的客戶端廣播新建客戶端加入聊天室的功能
重寫handlerAdded方法,此方法會在將通道添加到通道組中調(diào)用,所以在此方法中獲取加入到通道組的遠程地址
并使用channelGroup的writeAndFlush方法就能實現(xiàn)向所有建立連接的客戶端發(fā)送消息,新的客戶端剛上線時不用向自己
發(fā)送上線消息,所以在廣播完上線消息后再講此channel添加到channelGroup中。
??? @Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務(wù)器]:"+channel.remoteAddress()+"加入\n");channelGroup.add(channel);}同理實現(xiàn)下線提醒需要重寫handlerRemoved方法
??? @Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[服務(wù)器]:"+channel.remoteAddress()+"離開\n");}但是此方法中不用手動從channelGroup中手動去掉channel,因為Netty會自動將其移除掉。
服務(wù)端搭建完成之后再搭建客戶端,新建ChatClient類并編寫main方法,在main方法中
package com.badao.Chat;import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;import java.io.BufferedReader; import java.io.InputStreamReader;public class ChatClient {public static void main(String[] args) throws? Exception {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChatClientInitializer());//綁定端口Channel channel = bootstrap.connect("localhost", 70).channel();BufferedReader br = new BufferedReader(new InputStreamReader(System.in));for(;;){channel.writeAndFlush(br.readLine()+"\r\n");}} finally {//關(guān)閉事件組eventLoopGroup.shutdownGracefully();}} }在客戶端中讀取輸入的內(nèi)容并在一個無限循環(huán)中將輸入的內(nèi)容發(fā)送至服務(wù)端。
在Client中建立對服務(wù)端的連接同理也要設(shè)置一個初始化器ChatClientInitializer
新建初始化器的類ChatClientInitializer
package com.badao.Chat;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil;public class ChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));pipeline.addLast(new ChatClientHandler());} }使用Netty自帶的處理器對編碼進行處理并添加一個自定義的處理器ChatClientHandler
新建類ChatClientHandler
package com.badao.Chat;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;public class ChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg);} }在重寫的channelRead0方法中只需要將收到的消息進行輸出即可。
現(xiàn)在運行服務(wù)端的main方法
為了能運行多個客戶端在IDEA中客戶端編輯
?
然后將下面的勾選上
?
然后首先運行一個客戶端
?
那么在服務(wù)端中就會輸出上線的客戶端以及在線人數(shù)
再次運行客戶端的main方法,此時服務(wù)端會輸出兩個客戶端上線
?
同時在第二個客戶端上線時第一個客戶端會收到加入的提示
?
此時停掉第二個客戶端即將第二個客戶端下線
服務(wù)端會提示下線并更新在線人數(shù)
同時在第一個客戶端會收到服務(wù)端的推送
?
再運行第二個客戶端,并在控制臺輸入消息,回車發(fā)送
?
此時第一個客戶端就會收到第二個客戶端發(fā)送的消息。
?
然后第一個客戶端再輸入一個消息并回車
?
那么第二個客戶端也能收到消息
?
示例代碼下載:
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12850228
與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例(附代码下载)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty的Socket编程详解-搭建服
- 下一篇: ProtoBuf在使用protoc进行编