时间:2022-10-01 13:35:31 | 栏目:JAVA代码 | 点击:次
一.导入Netty依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
二.搭建websocket服务器
@Component public class WebSocketServer { /** * 主线程池 */ private EventLoopGroup bossGroup; /** * 工作线程池 */ private EventLoopGroup workerGroup; /** * 服务器 */ private ServerBootstrap server; /** * 回调 */ private ChannelFuture future; public void start() { future = server.bind(9001); System.out.println("netty server - 启动成功"); } public WebSocketServer() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketInitializer()); } }
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // ------------------ // 用于支持Http协议 // ------------------ // websocket基于http协议,需要有http的编解码器 pipeline.addLast(new HttpServerCodec()); // 对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用 //设置单次请求的文件的大小 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 添加Netty空闲超时检查的支持 // 1. 读空闲超时(超过一定的时间会发送对应的事件消息) // 2. 写空闲超时 // 3. 读写空闲超时 pipeline.addLast(new IdleStateHandler(4, 8, 12)); //添加心跳处理 pipeline.addLast(new HearBeatHandler()); // 添加自定义的handler pipeline.addLast(new ChatHandler()); } }
四.创建Netty监听器
@Component public class NettyListener implements ApplicationListener<ContextRefreshedEvent> { @Resource private WebSocketServer websocketServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null) { try { websocketServer.start(); } catch (Exception e) { e.printStackTrace(); } } } }
五.建立消息通道
public class UserChannelMap { /** * 用户保存用户id与通道的Map对象 */ // private static Map<String, Channel> userChannelMap; /* static { userChannelMap = new HashMap<String, Channel>(); }*/ /** * 定义一个channel组,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用户与Chanel的对应信息,用于给指定用户发送消息 */ private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>(); private UserChannelMap(){} /** * 添加用户id与channel的关联 * @param userNum * @param channel */ public static void put(String userNum, Channel channel) { userChannelMap.put(userNum, channel); } /** * 根据用户id移除用户id与channel的关联 * @param userNum */ public static void remove(String userNum) { userChannelMap.remove(userNum); } /** * 根据通道id移除用户与channel的关联 * @param channelId 通道的id */ public static void removeByChannelId(String channelId) { if(!StringUtils.isNotBlank(channelId)) { return; } for (String s : userChannelMap.keySet()) { Channel channel = userChannelMap.get(s); if(channelId.equals(channel.id().asLongText())) { System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联"); userChannelMap.remove(s); UserService userService = SpringUtil.getBean(UserService.class); userService.logout(s); break; } } } /** * 打印所有的用户与通道的关联数据 */ public static void print() { for (String s : userChannelMap.keySet()) { System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id()); } } /** * 根据好友id获取对应的通道 * @param receiverNum 接收人编号 * @return Netty通道 */ public static Channel get(String receiverNum) { return userChannelMap.get(receiverNum); } /** * 获取channel组 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 获取用户channel map * @return */ public static ConcurrentHashMap<String,Channel> getUserChannelMap(){ return userChannelMap; } }
六.自定义消息类型
public class Message { /** * 消息类型 */ private Integer type; /** * 聊天消息 */ private String message; /** * 扩展消息字段 */ private Object ext; public Integer getType() { return type; } public void setType(Integer type) { this.type = type; } public MarketChatRecord getChatRecord() { return marketChatRecord; } public void setChatRecord(MarketChatRecord chatRecord) { this.marketChatRecord = chatRecord; } public Object getExt() { return ext; } public void setExt(Object ext) { this.ext = ext; } @Override public String toString() { return "Message{" + "type=" + type + ", marketChatRecord=" + marketChatRecord + ", ext=" + ext + '}'; } }
七.创建处理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); /** * 用来保存所有的客户端连接 */ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *当Channel中有新的事件消息会自动调用 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 当接收到数据后会自动调用 // 获取客户端发送过来的文本消息 Gson gson = new Gson(); log.info("服务器收到消息:{}",msg.text()); System.out.println("接收到消息数据为:" + msg.text()); Message message = gson.fromJson(msg.text(), Message.class); //根据业务要求进行消息处理 switch (message.getType()) { // 处理客户端连接的消息 case 0: // 建立用户与通道的关联 // 处理客户端发送好友消息 break; case 1: // 处理客户端的签收消息 break; case 2: // 将消息记录设置为已读 break; case 3: // 接收心跳消息 break; default: break; } } // 当有新的客户端连接服务器之后,会自动调用这个方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded 被调用"+ctx.channel().id().asLongText()); // 添加到channelGroup 通道组 UserChannelMap.getChannelGroup().add(ctx.channel()); // clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("{异常:}"+cause.getMessage()); // 删除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText()); //删除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); } }
八.处理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent)evt; if(idleStateEvent.state() == IdleState.READER_IDLE) { System.out.println("读空闲事件触发..."); } else if(idleStateEvent.state() == IdleState.WRITER_IDLE) { System.out.println("写空闲事件触发..."); } else if(idleStateEvent.state() == IdleState.ALL_IDLE) { System.out.println("---------------"); System.out.println("读写空闲事件触发"); System.out.println("关闭通道资源"); ctx.channel().close(); } } } }
搭建完成后调用测试
1.页面访问http://localhost:9001/ws
2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。