Spring Boot+Netty+Websocket实现后台向前端推送信息

news2025/7/9 0:45:49

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的API的客户端/服务器框架

可能在此之前你没有接触过,不过不要担心,下面我们通过一个消息推送的例子来看一下netty是怎么使用的。

1.添加Maven依赖

 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
 <dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.36.Final</version>
 </dependency>

2.设置主启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MessageSendApplication {
    public static void main(String[] args) {
        SpringApplication.run(MessageSendApplication.class, args);
        try {
            new NettyServer(12345).start();
            System.out.println("https://blog.csdn.net/moshowgame");
            System.out.println("http://127.0.0.1:6688/netty-websocket/index");
        } catch (Exception e) {
            System.out.println("NettyServerError:" + e.getMessage());
        }

    }
}

3.NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * NettyServer Netty服务器配置
 */
public class NettyServer {
    private final int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(group, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 绑定监听端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("收到新连接");
                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
                            ch.pipeline().addLast(new WebSocketHandler());
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
            System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // 关闭服务器通道
        } finally {
            group.shutdownGracefully().sync(); // 释放线程池资源
            bossGroup.shutdownGracefully().sync();
        }
    }
}

4.MyChannelHandlerPool

通道组池,管理所有websocket连接

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * MyChannelHandlerPool
 * 通道组池,管理所有websocket连接
 */
public class MyChannelHandlerPool {

    public MyChannelHandlerPool(){}

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}

5.WebSocketHandler

处理ws以下几种情况:

  • channelActive与客户端建立连接
  • channelInactive与客户端断开连接
  • channelRead0客户端发送消息处理
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.example.messagesend.netty.NettyConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@ChannelHandler.Sharable
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());
        // 获取用户ID,关联channel
        String text = msg.text();
        String uid = text.split(":")[0];
        String message = text.split(":")[1];
//        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        NettyConfig.getChannelMap().put(uid, ctx.channel());

        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);

        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame(uid+":"+message));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

6.Netty配置

管理全局Channel以及用户对应的channel(推送消息)

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

public class NettyConfig {
    /**
     * 定义全局单利channel组 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;

    /**
     * 存放请求ID与channel的对应关系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    /**
     * 定义两把锁
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();


    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

7.socket.html

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">

<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        // by zhengkai.blog.csdn.net
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://127.0.0.1:12345/ws");
            socket.onmessage = function(event) {
                var ta = document.getElementById('responseText');
                ta.value += event.data + "\r\n";
            };
            socket.onopen = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接  \r\n";
            };
            socket.onclose = function(event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
            };
        } else {
            alert("您的浏览器不支持WebSocket协议!");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket 连接没有建立成功!");
            }

        }
    </script>
</head>

<body>
    <form onSubmit="return false;">
        <label>ID</label><input type="text" name="uid" value="${uid!!}" /> <br />
        <label>TEXT</label><input type="text" name="message" value="这里输入消息" /> <br />
        <br /> <input type="button" value="发送ws消息" onClick="send(this.form.uid.value+':'+this.form.message.value)" />
        <hr color="black" />
        <h3>服务端返回的应答消息</h3>
        <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
    </form>
</body>

</html>

8.Controller

写好了html当然还需要一个controller来引导页面。

import cn.hutool.core.util.RandomUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

@RestController
public class TestController {
    @GetMapping("/index")
    public ModelAndView index() {
        ModelAndView mav = new ModelAndView("socket");
        mav.addObject("uid", RandomUtil.randomNumbers(6));
        return mav;
    }
}

下面我们打开页面测试一下
在这里插入图片描述

9.改造netty支持url参数

我们重新写一个WebSocketHandler:MyWebSocketHandler,然后调整加载handler的顺序,优先MyWebSocketHandler在WebSocketServerProtocolHandler之上。

改造MyWebSocketHandlerchannelRead方法,首次连接会是一个FullHttpRequest类型,可以通过FullHttpRequest.uri()获取完整ws的URL地址,之后接受信息的话,会是一个TextWebSocketFrame类型。

MyWebSocketHandler:

import com.alibaba.fastjson.JSON;
import com.example.messagesend.netty.MyChannelHandlerPool;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.HashMap;
import java.util.Map;

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端建立连接,通道开启!");

        //添加到channelGroup通道组
        MyChannelHandlerPool.channelGroup.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端断开连接,通道关闭!");
        //添加到channelGroup 通道组
        MyChannelHandlerPool.channelGroup.remove(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //首次连接是FullHttpRequest,处理参数 by zhengkai.blog.csdn.net
        if (null != msg && msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();

            Map paramMap=getUrlParams(uri);
            System.out.println("接收到的参数是:"+ JSON.toJSONString(paramMap));
            //如果url包含参数,需要处理
            if(uri.contains("?")){
                String newUri=uri.substring(0,uri.indexOf("?"));
                System.out.println(newUri);
                request.setUri(newUri);
            }

        }else if(msg instanceof TextWebSocketFrame){
            //正常的TEXT消息类型
            TextWebSocketFrame frame=(TextWebSocketFrame)msg;
            System.out.println("客户端收到服务器数据:" +frame.text());
            sendAllMessage(frame.text());
        }
        super.channelRead(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

    }

    private void sendAllMessage(String message){
        //收到信息后,群发给所有channel
        MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
    }

    private static Map getUrlParams(String url){
        Map<String,String> map = new HashMap<>();
        url = url.replace("?",";");
        if (!url.contains(";")){
            return map;
        }
        if (url.split(";").length > 0){
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr){
                String key = s.split("=")[0];
                String value = s.split("=")[1];
                map.put(key,value);
            }
            return  map;

        }else{
            return map;
        }
    }
}

然后在 NettyServer中调整顺序:
在这里插入图片描述

我们html中的socket连接地址也需要改一下:

socket = new WebSocket("ws://127.0.0.1:12345/ws?uid=666&gid=777");

接着重新运行一下,看看我们控制台的效果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/9079.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

动态代理静态代理

一、使用背景 将目标类包裹起来&#xff0c;对目标类增加一个前置操作和一个后置操作&#xff0c;比如添加日志&#xff0c;在调用目标类前、调用目标后添加日志。 感觉静态代理与动态代理的核心思想&#xff0c;都是根据目标类&#xff0c;拿到目标实现的接口&#xff0c;和…

【软考】-- 操作系统(上)

目录&#xff1a;操作系统&#xff08;上&#xff09;第一节 操作系统概述&#x1f384;一、操作系统基本概念1️⃣操作系统的五大部分&#xff1a;&#x1f38b;二、操作系统的分类1️⃣批处理操作系统&#xff1a;2️⃣分时操作系统&#xff1a;3️⃣实时操作系统&#xff1a…

STC51单片机28——跑马灯

//使用P1口流水点亮8位LED #include<reg51.h> //包含单片机寄存器的头文件 /**************************************** 函数功能&#xff1a;延时一段时间 *****************************************/ void delay(void) { unsigned char i,j; for(i…

Jetpack Compose 重写TopAppBar 实现标题多行折叠

没有效果图一律当水贴处理 效果动图 前言 想用composes实现类似CSDN的文章详细页面的标题栏 上滑隐藏标题后标题栏显示标题 compose.material3下的TopAppBar不能嵌套滚动 MediumTopAppBar 便使用了MediumTopAppBar一开始用着没什么问题&#xff0c;但是标题字数多了&…

一天完成react面试准备

什么是 React的refs&#xff1f;为什么它们很重要 refs允许你直接访问DOM元素或组件实例。为了使用它们&#xff0c;可以向组件添加个ref属性。 如果该属性的值是一个回调函数&#xff0c;它将接受底层的DOM元素或组件的已挂载实例作为其第一个参数。可以在组件中存储它。 ex…

字体图标以及svg图片的使用vite和webpack

先说下字体图标的使用 首先去阿里巴巴矢量图标库&#xff0c;选择你需要的图标&#xff08;可将svg图片自己上传&#xff09;添加到项目里&#xff0c;可以生成在线链接&#xff0c;或者下载资源包到本地。 资源包形式&#xff1a;在项目里创建一个fonts文件夹&#xff0c;将下…

linux 安装rar工具

1.到官网下载对应的编译包 点击跳转 也可以直接到我上传的资源去下载 https://download.csdn.net/download/liudongyang123/87032929https://download.csdn.net/download/liudongyang123/870329292.解压 tar -xf rarlinux-x64-620b2.tar.gz 3.进入到解压后的文件夹&#xf…

Spring Cloud Alibaba 版本对照表,集成nacos,sentinel,seata

一、Spring Cloud Alibaba 版本对照网址 https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E 二、集成nacos nacos源码编译打包_qq_41369135的博客-CSDN博客 连接mysql nacos\conf下的application.properties spring.datasource.…

JDBC:PreparedStatement 插入BLOB类型的数据,PreparedStatement 批量处理,Connection 事务处理

JDBC&#xff1a;PreparedStatement 插入BLOB类型的数据&#xff0c;PreparedStatement 批量处理&#xff0c;Connection 事务处理 每博一文案 村上春树说: 你要做一个不动声色的大人了&#xff0c;不准情绪化&#xff0c;不准偷偷想念&#xff0c;不准回头看自己&#xff0c;…

VGG网络详解(实现猫猫和狗狗识别)

VGG VGG在2014年由牛津大学著名研究组vGG (Visual Geometry Group)提出&#xff0c;斩获该年lmageNet竞赛中Localization Task (定位任务)第一名和 Classification Task (分类任务)第二名。 感受野 首先介绍一下感受野的概念。在卷积神经网络中&#xff0c;决定某一层输出结…

Cloud Flare 添加谷歌镜像站(反向代理)

1.首先创建一个属于自己的镜像站 参考链接&#xff1a;利用cloudflare搭建属于自己的免费Github加速站 首先&#xff0c;点击 Cloud Flare 链接 &#xff0c;创建一个属于自己的账户 登录后&#xff0c;点击 Workers 这个子域&#xff0c;可以自定义 输入好后点set up 然后…

[附源码]java毕业设计基于实时定位的超市配送业务管理

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

本地外卖市场趋势怎么样?成为行业黑马的机会有多大呢?

随着互联网经济的发展&#xff0c;很多人倾向于足不出户就能吃到各种美味食物&#xff0c;因此外卖行业应运而生。这个新行业不仅解决懒人的饮食问题&#xff0c;也为社会提供了更多的就业机会——外卖配送员。据CNNIC的《2022年第49次中国互联网络发展状况统计报告》显示&…

学会这几款表白特效让你明年双十一不再是一个人

随着各种节日的到来&#xff0c;也伴随着许许多多的表白时机&#xff0c;为何不制作几款表白特效让你的行动更加充实呢&#xff0c;此文主要基于HTMLCSSJS制作网页特效&#xff0c;代码简洁&#xff0c;上手简单。 网页特效爱心画心3D爱心爱在心中3D旋转相册开发流程工具安装创…

C语言,从联合看字节序

C语言中的联合&#xff08;union&#xff09;类型为我们提供了操纵和解读“数据”的独特方式&#xff0c;它允许对同一块内存以不同的方式进行解读和操纵。 union UINT {unsigned int intValue; //占4个字节unsigned char bytes[4]; //占4个字节 }; //注意末尾分号不能少本…

aj-report页面嵌入其他项目

我们前面已经制作了自己的报表,我们可以通过共享报表将结果呈现给其他人,但是对一些小白来说,报表与其他项目合成是一个新的问题。怎么合成呢? 我们继续未完的探索。 1、首先,我们可以创建一个已做好的报表的链接: 如上图,我们可以在报表管理里面分享建成的报表,选…

UnRaid安装CloudDrive以实现阿里云盘、天翼云盘、115网盘挂载

文章目录1、前言2、准备工作2.1、修改Docker源2.2、开启Docker服务的MountFlags功能3、添加Docker应用CloudDrive4、添加云盘1、前言 最近一直在学习UnRaid这个Nas系统&#xff0c;折腾起来易用性十足&#xff0c;但由于其自带的应用市场不能完全满足所有人的需求&#xff0c;…

高纯度高活性艾美捷人重组MEGACD40L蛋白(可溶性)

艾美捷人重组MEGACD40L蛋白&#xff08;可溶性&#xff09;&#xff1a;高活性、高纯度CD40L蛋白&#xff0c;用于免疫应答的共刺激激活。 艾美捷人重组MEGACD40L蛋白&#xff08;可溶性&#xff09;特点&#xff1a; 1、高活性MEGACD40L低聚物模拟体内膜辅助CD40L聚集和刺激&…

【C++修炼之路】9. string类的模拟实现

每一个不曾起舞的日子都是对生命的辜负 string类的模拟实现前言代码&#xff1a;1. string.h2. test.cpp扩展&#xff1a;内置类型的拷贝构造总结前言 本篇文章是衔接上一篇string&#xff0c;进行string的模拟实现&#xff0c;其中包含了众多重载函数&#xff0c;以及一些实现…

pytest中allure特性

一、allure.step allure报告最重要的一点是&#xff0c;它允许对每个测试用例进行非常详细的步骤说明 通过 allure.step() 装饰器&#xff0c;可以让测试用例在allure报告中显示更详细的测试过程 step() 只有一个参数&#xff0c;就是title&#xff0c;你传什么&#xff0c;在…