基于SpringBoot+Netty实现即时通讯(IM)功能
生活随笔
收集整理的這篇文章主要介紹了
基于SpringBoot+Netty实现即时通讯(IM)功能
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
簡單記錄一下實現的整體框架,具體細節在實際生產中再細化就可以了。
第一步 引入netty依賴
SpringBoot的其他必要的依賴像Mybatis、Lombok這些都是老生常談了 就不在這里放了
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version>
</dependency>
第二步 接下來就是準備工作。
消息服務類(核心代碼) 聊天服務的功能就是靠這個類的start()函數來啟動的 綁定端口8087 之后可以通socket協議訪問這個端口來執行通訊
import com.bxt.demo.im.handler.WebSocketHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; /**
* @Description: 即時通訊服務類
* @author: bhw
* @date: 2023年09月27日 13:44
*/
@Slf4j
public class IMServer {
// 用來存放連入服務器的用戶集合
public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024);
// 用來存放創建的群聊連接
public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void start() throws InterruptedException {
log.info("IM服務開始啟動");
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup(); // 綁定端口
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加http編碼解碼器
pipeline.addLast(new HttpServerCodec())
//支持大數據流
.addLast(new ChunkedWriteHandler())
// 對http消息做聚合操作 FullHttpRequest FullHttpResponse
.addLast(new HttpObjectAggregator(1024*64))
//支持websocket
.addLast(new WebSocketServerProtocolHandler("/"))
.addLast(new WebSocketHandler());
}
}); ChannelFuture future = bootstrap.bind(8087).sync();
log.info("服務器啟動開始監聽端口: {}", 8087);
future.channel().closeFuture().sync();
//關閉主線程組
bossGroup.shutdownGracefully();
//關閉工作線程組
workGroup.shutdownGracefully();
} }
創建聊天消息實體類
/**
* @Description: 聊天消息對象 可以自行根據實際業務擴展
* @author: seizedays
*/
@Data
public class ChatMessage extends IMCommand {
//消息類型
private Integer type;
//消息目標對象
private String target;
//消息內容
private String content; }
連接類型枚舉類,暫時定義為建立連接、發送消息和加入群組三種狀態碼
@AllArgsConstructor
@Getter
public enum CommandType { //建立連接
CONNECT(10001),
//發送消息
CHAT(10002),
//加入群聊
JOIN_GROUP(10003),
ERROR(-1)
; private Integer code; public static CommandType match(Integer code){
for (CommandType value : CommandType.values()) {
if (value.code.equals(code)){
return value;
}
}
return ERROR;
} }
命令動作為聊天的時候 消息類型又劃分為私聊和群聊兩種 枚舉類如下:
@AllArgsConstructor
@Getter
public enum MessageType { //私聊
PRIVATE(1),
//群聊
GROUP(2),
ERROR(-1)
;
private Integer type; public static MessageType match(Integer code){
for (MessageType value : MessageType.values()) {
if (value.type.equals(code)){
return value;
}
}
return ERROR;
} }
創建連接請求的攔截器
import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.IMCommand;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /**
* @Description: 用戶連接到服務端的攔截器
* @author: bhw
* @date: 2023年09月27日 14:28
*/
public class ConnectionHandler {
public static void execute(ChannelHandlerContext ctx, IMCommand command) {
if (IMServer.USERS.containsKey(command.getNickName())) {
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已經在線,不能重復連接"))));
ctx.channel().disconnect();
return;
} IMServer.USERS.put(command.getNickName(), ctx.channel()); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系統消息:" + command.getNickName() + "與服務端連接成功")))); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet())))));
}
}
加入群組功能的攔截器
/**
* @Description: 加入群聊攔截器
* @author: bhw
* @date: 2023年09月27日 15:07
*/
public class JoinGroupHandler {
public static void execute(ChannelHandlerContext ctx) {
try {
IMServer.GROUP.add(ctx.channel());
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系統默認群組成功!"))));
} catch (Exception e) {
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息內容異常"))));
} }
}
發送聊天到指定對象的功能攔截器
import com.alibaba.excel.util.StringUtils;
import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.ChatMessage;
import com.bxt.demo.im.cmd.MessageType;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.util.Objects; /**
* @Description: 聊天攔截器
* @author: bhw
* @date: 2023年09月27日 15:07
*/
public class ChatHandler {
public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
try {
ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class);
MessageType msgType = MessageType.match(message.getType()); if (msgType.equals(MessageType.PRIVATE)) {
if (StringUtils.isBlank(message.getTarget())){
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統消息:消息發送失敗,請選擇消息發送對象"))));
return;
}
Channel channel = IMServer.USERS.get(message.getTarget());
if (Objects.isNull(channel) || !channel.isActive()){
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統消息:消息發送失敗,對方不在線"))));
IMServer.USERS.remove(message.getTarget());
return;
}
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊消息(" + message.getTarget() + "):" + message.getContent())))); } else if (msgType.equals(MessageType.GROUP)) {
IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群消息:發送者(" + message.getNickName() + "):" + message.getContent()))));
}else {
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系統消息:不支持的消息類型"))));
} } catch (Exception e) {
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息內容異常"))));
} }
}
最后是websocket攔截器 接收到客戶端的指令后選擇對應的攔截器實現相應的功能:
import com.alibaba.fastjson2.JSON;
import com.bxt.common.vo.Result;
import com.bxt.demo.im.cmd.CommandType;
import com.bxt.demo.im.cmd.IMCommand;
import com.bxt.demo.im.server.IMServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j; /**
* @Description: websocket攔截器
* @author: bhw
* @date: 2023年09月27日 13:59
*/
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
System.out.println(frame.text());
try {
IMCommand command = JSON.parseObject(frame.text(), IMCommand.class);
CommandType cmdType = CommandType.match(command.getCode());
if (cmdType.equals(CommandType.CONNECT)){
ConnectionHandler.execute(ctx, command);
} else if (cmdType.equals(CommandType.CHAT)) {
ChatHandler.execute(ctx,frame);
} else if (cmdType.equals(CommandType.JOIN_GROUP)) {
JoinGroupHandler.execute(ctx);
} else {
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支持的code"))));
}
}catch (Exception e){
ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage()))));
} } @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 當連接斷開時被調用
Channel channel = ctx.channel();
// 從 USERS Map 中移除對應的 Channel
removeUser(channel);
super.channelInactive(ctx);
} private void removeUser(Channel channel) {
// 遍歷 USERS Map,找到并移除對應的 Channel
IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel);
}
}
第三步 啟動服務
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
// 啟動IM服務
try {
IMServer.start();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
現在 客戶端通過socket協議訪問8087端口即可實現基本的聊天室功能了!
總結
以上是生活随笔為你收集整理的基于SpringBoot+Netty实现即时通讯(IM)功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows平板的开发和选型
- 下一篇: php http build query