JavaWebsocket-demo

news2025/5/24 6:02:58

Websocket客户端

pom依赖

		<dependency>
			<groupId>org.java-websocket</groupId>
			<artifactId>Java-WebSocket</artifactId>
			<version>1.4.0</version>
		</dependency>

客户端代码片段


@Component
@Slf4j
public class PositionAlarmListener {
    @Autowired
    private BigScreenService bigScreenService;
    @Autowired
    private ConfigurationSystemService configurationSystemService;

    @Bean
    public WebSocketClient webSocketClient() {
        WebSocketClient wsc = null;
        ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();

        Map<String, String> httpHeaders = new HashMap<>();
        try {
//            String reqUrl = String.format("ws://%s%s?apikey=%s", servicePort, SOCKET_URL, apikey);
            String reqUrl = thirdPartConfDto.getAlarmWebsocketUrl();
            log.info("websocketclient.position.reqUrl:{}",reqUrl);
            wsc = new WebSocketClient(new URI(reqUrl), httpHeaders) {
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    log.info("UnmannedPlane==connect==build");
                }

                @Override
                public void onMessage(String message) {
                    log.info("websocketclient.position.receive.message:{}", message);
                    CompletableFuture.runAsync(() -> {
                        try {
                            if (StringUtils.isEmpty(message)) {
                                return;
                            }
//                            JSONObject parse = JSONObject.parseObject(message);
                            ThirdPositionAlarmDto thirdPositionAlarmDto = JSONObject.parseObject(message,ThirdPositionAlarmDto.class);
                            String type = thirdPositionAlarmDto.getType();
                            log.info("websocketclient.position.receive.message-type:{}", type);
                            if (StringUtils.isEmpty(type)) {
                                log.error("websocket.type.is null");
                                return;
                            }
                            if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){
                                log.error("websocket.type.is not tag");
                                return;
                            }
                           boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);
                        } catch (Exception e) {
                            log.error("websocketclient.position.error:", e);
                        }
                    });
                }

                @Override
                public void onClose(int i, String s, boolean b) {
                    log.info("websocketclient.position.close code:{} reason:{} {}", i, s, b);
                }

                @Override
                public void onError(Exception e) {
                    log.info("websocketclient.position.connect==error:", e);
                }

            };
            wsc.connect();
            return wsc;
        } catch (Exception e) {
            log.error("websocketclient.position==webSocketClient:", e);
        }

        return wsc;
    }

}

客户端代码片段(新增心跳检测、重连)

客户端代码片段–配置新增

# websocketclient config
#websocket.client.config.wsUrl=ws://10.212.188.45:8880/position
websocket.client.config.wsUrl=ws://10.81.12.100:8090/websocket
websocket.client.config.enableHeartbeat=true
websocket.client.config.heartbeatInterval=20000
websocket.client.config.enableReconnection=true
@Configuration
@ConfigurationProperties(prefix="websocket.client.config")
public class WebsocketClientConfiguration {

   /**
    * websocket server ws://ip:port
    */
   private String wsUrl;
   /**
    * 是否启用心跳监测 默认开启
    */
   private Boolean enableHeartbeat;
   /**
    * 心跳监测间隔 默认20000毫秒
    */
   private Integer heartbeatInterval;
   /**
    * 是否启用重连接 默认启用
    */
   private Boolean enableReconnection;

   public String getWsUrl() {
      return wsUrl;
   }

   public void setWsUrl(String wsUrl) {
      this.wsUrl = wsUrl;
   }

   public Boolean getEnableHeartbeat() {
      return enableHeartbeat;
   }

   public void setEnableHeartbeat(Boolean enableHeartbeat) {
      this.enableHeartbeat = enableHeartbeat;
   }

   public Integer getHeartbeatInterval() {
      return heartbeatInterval;
   }

   public void setHeartbeatInterval(Integer heartbeatInterval) {
      this.heartbeatInterval = heartbeatInterval;
   }

   public Boolean getEnableReconnection() {
      return enableReconnection;
   }

   public void setEnableReconnection(Boolean enableReconnection) {
      this.enableReconnection = enableReconnection;
   }
}

客户端代码片段–客户端创建

@Slf4j
@Configuration
public class WebsocketClientBeanConfig {

    /**
     * 系统配置实现类
     */
    @Autowired
    private ConfigurationSystemService configurationSystemService;

    @Bean
    public WebsocketRunClient websocketRunClient(WebsocketClientConfiguration websocketClientConfiguration){
        String wsUrl = websocketClientConfiguration.getWsUrl();
        Boolean enableHeartbeat = websocketClientConfiguration.getEnableHeartbeat();
        Integer heartbeatInterval = websocketClientConfiguration.getHeartbeatInterval();
        Boolean enableReconnection = websocketClientConfiguration.getEnableReconnection();
        try {
            WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(wsUrl));
            websocketRunClient.connect();
            websocketRunClient.setConnectionLostTimeout(0);

            new Thread(()->{
                while (true){
                    try {
                        Thread.sleep(heartbeatInterval);
                        if(enableHeartbeat){
                            websocketRunClient.send("[websocketclient] 心跳检测");
                            log.info("[websocketclient] 心跳检测");
                        }
                    } catch (Exception e) {
                        log.error("[websocketclient] 发生异常{}",e.getMessage());
                        try {
                            if(enableReconnection){
                                log.info("[websocketclient] 重新连接");
                                websocketRunClient.reconnect();
                                websocketRunClient.setConnectionLostTimeout(0);
                            }
                        }catch (Exception ex){
                            log.error("[websocketclient] 重连异常,{}",ex.getMessage());
                        }
                    }
                }
            }).start();

            return websocketRunClient;
        } catch (URISyntaxException ex) {
            log.error("[websocketclient] 连接异常,{}",ex.getMessage());
        }
        return null;
    }


}

客户端代码片段–客户端心跳检测

@Slf4j
@Component
public class WebsocketRunClient extends WebSocketClient {

    /**
     * 大屏推送地址
     */
    @Value("${thirdpart.bigscreen.positionhttpurl}")
    private String httpUrl;
    /**
     * 位置检测距离
     */
    @Value("${thirdpart.positionrange:100}")
    private Double positionrange;
    /**
     * 大屏接口推送实现
     */
    @Autowired
    private BigScreenService bigScreenService;


    public WebsocketRunClient(URI serverUri) {
        super(serverUri);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        log.info("[websocketclient] Websocket客户端连接成功");
    }

    @Override
    public void onMessage(String message) {
        log.info("[websocketclient.receive] 收到消息:{}",message);
//        ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();
        ThirdPartConfDto thirdPartConfDto = ThirdPartConfDto.builder()
                .bigScreenHttpUrl(httpUrl)
                .positionRange(positionrange)
                .build();
        CompletableFuture.runAsync(() -> {
            try {
                if (StringUtils.isEmpty(message.trim())) {
                    return;
                }
                if(message.contains("心跳检测")){
                    return;
                }
                List<ThirdPositionAlarmDto> thirdPositionAlarmDtoList = JSONObject.parseArray(message,ThirdPositionAlarmDto.class);
                for(ThirdPositionAlarmDto thirdPositionAlarmDto: thirdPositionAlarmDtoList){
                    String type = thirdPositionAlarmDto.getType();
                    log.info("websocketclient.position.receive.message-type:{}", type);
                    if (StringUtils.isEmpty(type)) {
                        log.error("websocket.type.is null");
                        return;
                    }
                    if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){
                        log.error("websocket.type.is not tag");
                        return;
                    }
                    boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);
                }
            } catch (Exception e) {
                log.error("websocketclient.position.error:", e);
            }
        });
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        log.info("[websocketclient] Websocket客户端关闭");
        System.out.println("Connection closed by " + (remote ? "remote peer" : "us") + " Code: " + code + " Reason: " + reason);
    }

    @Override
    public void onError(Exception e) {
        log.info("[websocketclient] Websocket客户端出现异常, 异常原因为:{}",e.getMessage());
    }

Websocket 服务端

服务端pom依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

服务端代码片段,websocket-配置

websocket-服务配置
@Configuration
public class WebSocketConfig {

    /**
     * ServerEndpointExporter注入
     * 该Bean会自动注册使用@ServerEndpoint注解申明的WebSocket endpoint
     *
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

服务端代码片段,websocket-服务端广播消息

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author 谪居马道
 * @describe:websocket,消息广播
 * @date 2025/5/21
 */
@Component
@ServerEndpoint("/websocket")
public class WebSocket {

    private final Logger log = LoggerFactory.getLogger(WebSocket.class);

    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();
    /**         * 连接建立成功调用的方法         */
    @OnOpen
    public void onOpen(Session session){
        this.session=session;
        webSocketSet.add(this);
        //加入set中
        log.info("【WebSocket消息】有新的连接,总数:{}",webSocketSet.size());
    }
    /**         * 连接关闭调用的方法         */
    @OnClose
    public void onClose(){
        webSocketSet.remove(this);
        //从set中删除
        log.info("【WebSocket消息】连接断开,总数:{}",webSocketSet.size());        }
    /**         * 收到客户端消息后调用的方法         * @param message 客户端发送过来的消息         */
    @OnMessage
    public void onMessage(String message ){
        log.info("【WebSocket消息】收到客户端发来的消息:{}",message);
        sendMessage(message);
    }
    public void sendMessage(String message){
        for (WebSocket webSocket:webSocketSet) {
            log.info("【webSocket消息】广播消息,message={}",message);
            try {
                webSocket.session.getBasicRemote ().sendText(message);
            } catch (Exception e) {
                e.printStackTrace ();
            }            }
    }

}

服务端代码片段,websocket-服务端一对一消息

@Component
@ServerEndpoint("/websocket/{terminalId}")
public class WebSocketService {

    private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);

    /**
     * 保存连接信息
     */
    private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();
    private static final Map<String, AtomicInteger> TERMINAL_IDS = new HashMap<>();

    /**
     * 需要注入的Service声明为静态,让其属于类
     */
    private static TerminalService terminalService;

    /**
     * 注入的时候,给类的Service注入
     */
    @Autowired
    public void setMchDeviceInfoService(TerminalService terminalService) {
        WebSocketService.terminalService = terminalService;
    }

    @OnOpen
    public void onOpen(@PathParam("terminalId") String terminalId, Session session) throws Exception {
        logger.info(session.getRequestURI().getPath() + ",打开连接开始:" + session.getId());

        //当前连接已存在,关闭
        if (CLIENTS.containsKey(terminalId)) {
            onClose(CLIENTS.get(terminalId));
        }

        TERMINAL_IDS.put(terminalId, new AtomicInteger(0));
        CLIENTS.put(terminalId, session);

        logger.info(session.getRequestURI().getPath() + ",打开连接完成:" + session.getId());

        terminalService.terminal();
    }

    @OnClose
    public void onClose(@PathParam("terminalId") String terminalId, Session session) throws Exception {
        logger.info(session.getRequestURI().getPath() + ",关闭连接开始:" + session.getId());

        CLIENTS.remove(terminalId);
        TERMINAL_IDS.remove(terminalId);

        logger.info(session.getRequestURI().getPath() + ",关闭连接完成:" + session.getId());
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("前台发送消息:" + message);

        if ("心跳".equals(message)) {
            //重置当前终端心跳次数
            TERMINAL_IDS.get(message).set(0);
            return;
        }

        sendMessage("收到消息:" + message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        logger.error(error.toString());
    }

    public void onClose(Session session) {
        //判断当前连接是否在线
//        if (!session.isOpen()) {
//            return;
//        }

        try {
            session.close();
        } catch (IOException e) {
            logger.error("金斗云关闭连接异常:" + e);
        }
    }

    public void heartbeat() {
        //检查所有终端心跳次数
        for (String key : TERMINAL_IDS.keySet()) {
            //心跳3次及以上的主动断开
            if ((TERMINAL_IDS.get(key).intValue() >= 3)) {
                logger.info("心跳超时,关闭连接:" + key);
                onClose(CLIENTS.get(key));
            }
        }

        for (String key : CLIENTS.keySet()) {
            //记录当前终端心跳次数
            TERMINAL_IDS.get(key).incrementAndGet();
            sendMessage("心跳", CLIENTS.get(key));
        }
    }

    public void sendMessage(String message, Session session) {
        try {
            session.getAsyncRemote().sendText(message);

            logger.info("推送成功:" + message);
        } catch (Exception e) {
            logger.error("推送异常:" + e);
        }
    }

    public boolean sendMessage(String terminalId, String message) {
        try {
            Session session = CLIENTS.get(terminalId);
            session.getAsyncRemote().sendText(message);

            logger.info("推送成功:" + message);
            return true;
        } catch (Exception e) {
            logger.error("推送异常:" + e);
            return false;
        }
    }
}

Websocket测试工具

postman-测试

参考:
site1: https://maimai.cn/article/detail?fid=1747304025&efid=p7JdUMG2Gi0PrMX7xSXpXw
site2: https://blog.csdn.net/weixin_46768610/article/details/128711019

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2384394.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

特征学习:赋予机器学习 “慧眼” 的核心技术

一、特征学习&#xff1a;从人工设计到智能发现的范式革新 1.1 核心定义与价值 特征学习的本质是让机器模仿人类大脑的认知过程 —— 例如&#xff0c;人类视觉系统通过视网膜→视神经→大脑皮层的层级处理&#xff0c;从像素中识别物体&#xff1b;特征学习则通过神经网络的卷…

3D个人简历网站 7.联系我

3D个人简历网站 7.联系我 修改Contact.jsx // 从 react 库导入 useRef 和 useState hooks import { useRef, useState } from "react";/*** Contact 组件&#xff0c;用于展示联系表单&#xff0c;处理用户表单输入和提交。* returns {JSX.Element} 包含联系表单的 …

软考中级软件设计师——计算机系统篇

一、数据的表示和运算 1、进制转换 1. 常见进制类型 二进制&#xff08;B&#xff09;&#xff1a;基数为2&#xff08;0,1&#xff09;&#xff0c;计算机底层使用。 八进制&#xff08;O&#xff09;&#xff1a;基数为8&#xff08;0-7&#xff09;&#xff0c;3位二进制…

甘特图(项目计划图)

甘特图是甘特在第一次世界大战时为了提供工人效率所创。 由时间&#xff08;顶部横坐标&#xff09;和工作事项&#xff08;左边纵坐标组成&#xff09; 假设&#xff0c;我要做大数据迁移&#xff08;一般半年&#xff0c;几PB的数据和上万个任务&#xff09; 类似于这种

windows服务器部署jenkins工具(一)

jenkins作为一款常用的构建发布工具&#xff0c;极大的简化了项目部署发布流程。jenkins通常是部署在linux服务上&#xff0c;今天给大家分享的是windows服务器上如何搭建jenkins发布工具。 1.首先第一步还是看windows安装docker 这篇文章哈&#xff0c;当然也可以不采用docke…

基于51单片机和8X8点阵屏、独立按键的飞行躲闪类小游戏

目录 系列文章目录前言一、效果展示二、原理分析三、各模块代码1、8X8点阵屏2、独立按键3、定时器04、定时器1 四、主函数总结 系列文章目录 前言 用的是普中A2开发板。 【单片机】STC89C52RC 【频率】12T11.0592MHz 【外设】8X8点阵屏、独立按键 效果查看/操作演示&#xff…

告别“盘丝洞”车间:4-20mA无线传输如何重构工厂神经网?

4-20ma无线传输是利用无线模块将传统的温度、压力、液位等4-20mA电流信号转换为无线信号进行传输。这一技术突破了有线传输的限制&#xff0c;使得信号可以在更广泛的范围内进行灵活、快速的传递&#xff0c;无线传输距离可达到50KM。达泰4-20ma无线传输模块在实现工业现场应用…

VMware虚拟机突然无法ssh连接

遇到的情况&#xff1a; 功能全部正常的情况下&#xff0c;没有修改任何配置&#xff0c;重启电脑之后无法ssh连接 其实不太可能的可能原因&#xff1a; 1、虚拟机内部sshd服务未运行 systemctl status sshd systemctl start sshd 2、检查SSH端口监听 netstat -an | grep :…

班迪录屏--解决视频剪辑时声音和画面不同步的问题

原文网址&#xff1a;班迪录屏--解决视频剪辑时声音和画面不同步的问题_IT利刃出鞘的博客-CSDN博客 简介 本文介绍如何用班迪录屏解决视频剪辑时声音和画面不同步的问题。 问题描述 我用班迪录屏录了视频&#xff0c;用剪映进行剪辑&#xff0c;结果发现在剪辑时声音和画面…

Git上传项目到GitHub

Git上传项目到GitHub 下载Git客户端配置Git设置GitHub上传本地项目到Github 下载Git客户端 网址&#xff1a;Git Windows客户端。选择Standalone Installer(单独安装程序)&#xff0c;并点击64bit Git for Windows Setup(64位Git for Windows安装程序)进行下载。然后一路默认选…

【工具】Quicker/VBA|PPT 在指定位置添加有颜色的参考线

文章目录 效果展示使用方式技术原理更多原理ActivePresentation.Guides 概述主要属性和方法使用示例添加水平参考线添加垂直参考线删除所有参考线获取参考线数量 注意事项 致谢 效果展示 先展示效果&#xff1a; Quicker 动作&#xff1a;VBA 添加参考线 - Quicker 动作 使用…

第34节:迁移学习中的特征提取方法

迁移学习中的特征提取方法:原理、技术与应用 1. 迁移学习与特征提取概述 迁移学习(Transfer Learning)作为机器学习领域的重要范式 通过将源领域(source domain)学到的知识迁移到目标领域(target domain),有效解决了传统机器学习需要大量标注数据的瓶颈问题。 在迁…

(万字长文)Django数据库操作——ORM:数据交互显示前端网页

&#x1f31f; 如果这篇文章触动了你的心弦&#xff0c;请不要吝啬你的支持&#xff01; 亲爱的读者&#xff0c; 感谢你花时间阅读这篇分享。希望这里的每一个字都能为你带来启发或是让你会心一笑。如果你觉得这篇文章有价值&#xff0c;或者它解决了你一直以来的一个疑问&a…

实验-使用递归计算阶乘-RISC-V(计算机组成原理)

目录 一、实验内容 二、实验步骤 三、实验效果 四、实验环境 五、实验小结和思考 一、实验内容 一个典型的计算阶乘的递归过程如下图所示&#xff1a; 在这个任务中&#xff0c;一份汇编代码的框架“task4-阶乘”你需要使用RISC-V或MIPS汇编程序以递归的形式解决这个问题。…

ISO 26262-5 评估硬件架构度量值

两种硬件架构的度量&#xff0c; 用于评估相关项架构应对随机硬件失效的有效性。 应评估&#xff08;评估仅限于ASIL (B)、 C 和 D 的安全目标&#xff09; 1 应将按照附录 C 单点故障度量和潜伏故障度量的诊断覆盖率来评估 2 应结合残余故障和相关的潜伏故障来预估安全机制…

【Qt开发】显示类控件——QLCDNumber

目录 1&#xff0c;QLCDNumber的说明 2&#xff0c;QLCDNumber的运用 1&#xff0c;QLCDNumber的说明 QLCDNumer 是一个专门用来显示数字的控件。它类似于 "老式计算器" 的效果。它的核心属性如下&#xff1a; 2&#xff0c;QLCDNumber的运用 定时器 运用QLCDNumb…

音频AAC编码与RV1126的AENC模块的讲解

一.音频编码的原理 AAC编码的基本概念 AAC&#xff08;Advanced Audio Coding&#xff09;是一种高级音频编码格式&#xff0c;旨在提供比MP3更高的音质和更低的比特率。AAC是MPEG-2和MPEG-4标准的一部分&#xff0c;广泛应用于音乐、视频流媒体和广播等领域 音频为什么要进…

vue页面目录菜单有些属性是根据缓存读取的。如果缓存更新了。希望这个菜单也跟着更新。

父组件中有两个子组件。如果在B组件数据更新之后。A组件也跟着一起改变呢&#xff1f;如图如果我右边基本信息里面勾选了高血压&#xff0c;左侧菜单里面也要立刻出现一个高血压随访菜单&#xff0c;如果我取消勾选了左侧菜单就去掉。 左侧菜单的显示和隐藏的数据实际上是放在…

在TIA 博途中下载程序时找不到对应的网卡怎么办?

1. 检查物理连接 确认网线已正确连接PLC和PC&#xff0c;接口指示灯正常。 尝试更换网线或交换机端口&#xff0c;排除硬件故障。 2. 确认网卡驱动已安装 设备管理器检查&#xff1a; 右键点击“此电脑” → “管理” → “设备管理器”。 展开“网络适配器”&#xff0c;确…

《量子计算实战》PDF下载

内容简介 在加密、科学建模、制造物流、金融建模和人工智能等领域&#xff0c;量子计算可以极大提升解决问题的效率。量子系统正变得越来越强大&#xff0c;逐渐可用于生产环境。本书介绍了量子计算的思路与应用&#xff0c;在简要说明与量子相关的科学原理之后&#xff0c;指…