使用WebSocket、SockJS、STOMP实现消息实时通讯功能

news2025/8/3 23:38:04

客户端

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
    <title>websocket client</title>
	<script src="http://cdn.bootcss.com/sockjs-client/1.1.1/sockjs.min.js"></script>  
    <script src="http://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>  
    <script src="http://cdn.bootcss.com/jquery/3.1.1/jquery.min.js"></script>  
    <script type="text/javascript">
	var stompClient = null;   
    //加载完浏览器后  调用connect(),打开双通道  
    $(function(){     
    //打开双通道  
    connect()  
    })    
    //强制关闭浏览器  调用websocket.close(),进行正常关闭  
    window.onunload = function() {  
        disconnect()  
    }  
    function connect(){  
	    // 当前登录的用户id
        var userId="4a85f897fc48360be1f8e6abeec40c16";  
		//连接SockJS的endpoint名称为"webSocket"  
        var socket = new SockJS('http://xxx.xxx.xxx.xxx:8080/webSocket');
		//使用STMOP子协议的WebSocket客户端  
        stompClient = Stomp.over(socket);
		// socket 链接传递 Authorization:token 信息
		var headers={
			"Authorization": "token"
		}
		//   
		var testMsg = JSON.stringify({'message':'Hello WebSocket!','userId':userId});
		//连接WebSocket服务端     
        stompClient.connect(headers,function(frame){      
            console.log('Connected:' + frame);  
            //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息  
            stompClient.subscribe('/user/'+userId+'/single/ip',function(response){  
				console.log("点对点消息");
				console.log(response);
                var message=JSON.parse(response.body);                                   
                console.log(message);   
				showResponse(message);				
            });  
			 stompClient.subscribe('/topic',function(response){  
				console.log("订阅消息");
				console.log(response);
                var message=JSON.parse(response.body);                                   
                console.log(message);                    
            }); 
            // 客户端给服务端发送消息
			stompClient.send("/app/testSendMsg",{},testMsg);			
        });  
    }  
    //关闭双通道  
    function disconnect(){  
        if(stompClient != null) {  
            stompClient.disconnect();  
        }  
        console.log("Disconnected");  
    }  
    function showResponse(message){  
        var response = $("#response");  
		var msg = JSON.parse(message.responseMessage);  
        response.append("<p>只有userID为"+msg.userId+"的人才能收到</p>");  
    }  
    </script>
</head>
<body>
    <pre id="response"></pre>
</body>
</html>

服务端

pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> 

初始化config

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * webSocket 初始化类
 */
@Configuration
// @EnableWebSocketMessageBroker 注解用于开启使用 STOMP 协议来传输基于代理(MessageBroker)的消息,这时候控制器(controller)
// 开始支持@MessageMapping,就像是使用 @requestMapping 一样。
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Lazy
    @Autowired
    private WebsocketUserInterceptor websocketUserInterceptor;


    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册一个名为 /oauth/ws 的 Stomp 节点(endpoint),并指定使用 SockJS 协议。
        registry.addEndpoint("/webSocket")
            .setAllowedOrigins("*")
            .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理
        registry.enableSimpleBroker("/topic","/user");
        //客户端向服务端发起请求时,需要以/app为前缀。
        registry.setApplicationDestinationPrefixes("/app");
        //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user");
    }

    /**
     * 采用自定义拦截器,获取connect时候传递的参数
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(websocketUserInterceptor);
    }
}

拦截器

@Slf4j
@Component
public class WebsocketUserInterceptor implements ChannelInterceptor {

    @Autowired
    private WebSocketServer webSocketServ;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            log.info("【webSocket】 --- 连接success");
            Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
            if (raw instanceof Map) {
                Object tokenObj = ((Map) raw).get(JwtTokenUtil.AUTH_HEADER_KEY);
                if (tokenObj instanceof LinkedList) {
                    String token = ((LinkedList) tokenObj).get(0).toString();
                    //设置当前访问器的认证用户
                    String userId = this.getUserIdFromToken(token);
                    accessor.setUser(new WebsocketUserVO(userId));
                    webSocketServ.pushOnlineUser(userId);
                    log.info("【webSocket】 --- userId:{} 上线了,在线数量:{}",userId,webSocketServ.getOnlineUserSize());
                }
            }
        }
        return message;
    }

    /**
     * 发送消息调用后立即调用 一般用于监听上下线
     */
    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        try {
            StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);//消息头访问器

            if (accessor.getCommand() == null){ return;}// 避免非stomp消息类型,例如心跳检测
            switch(accessor.getCommand()){
                case DISCONNECT:
                    //点击断开连接,这里会执行两次,第二次执行的时候,message.getHeaders.size()=5,第一次是6。直接关闭浏览器,只会执行一次,size是5。
                    log.info("【webSocket】 --- 断开连接");
                    MessageHeaders headers = message.getHeaders();
                    if(ObjectUtil.isNotNull(headers)) {
                        Object header = headers.get(SimpMessageHeaderAccessor.USER_HEADER);
                        log.info("【webSocket】 --- header:{}", header.toString());
                        if (ObjectUtil.isNotNull(header)) {
                            WebsocketUserVO vo = (WebsocketUserVO) header;
                            webSocketServ.removeOnlineUser(vo.getName());
                            log.info("【webSocket】 断开连接 --- userId:{} 下线了,在线数量:{}", vo.getName(), webSocketServ.getOnlineUserSize());
                        }
                        break;
                    }
                case MESSAGE:
                    break;
                case SEND:
                    break;
            }
        } catch (Exception e) {
            log.info("【webSocket】 断开连接 - 异常:{}", e.getMessage());
        }
    }


    // 解析token获取userId
    private String getUserIdFromToken(String token){
        token = token.replace(JwtTokenUtil.TOKEN_PREFIX,"");
        LoginUser user = JSON.parseObject(JWT.decode(token).getSubject(),LoginUser.class);
        String userId = user.getUserId();
        log.info("【webSocket】解析token获取userId:{}",userId);
        return userId;
    }
}

WebSocketServer

@Component
public class WebSocketServer {

    public static String WEB_SOCKET_KEY="webSocket:register:";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 用户上线
    public void pushOnlineUser(String id) {
        stringRedisTemplate.opsForValue().set(WEB_SOCKET_KEY+id,id);
    }

    // 用户下线
    public void removeOnlineUser(String id) {
        stringRedisTemplate.delete(WEB_SOCKET_KEY+id);
    }

    // 获取在线用户数量
    public Integer getOnlineUserSize() {
        return this.getOnlineUserList().size();
    }

    // 获取在线用户集合
    public List<String> getOnlineUserList() {
        Set<String> keys = stringRedisTemplate.keys(WEB_SOCKET_KEY + "*");
        return stringRedisTemplate.opsForValue().multiGet(keys);
    }
}

实体bean

public class WebsocketUserVO implements Principal {

    private  String name;
    public WebsocketUserVO(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}
/**
 * 服务器向浏览器发送消息用这个类
 */
public class Server2ClientMessage {

    private String responseMessage;

    public Server2ClientMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    public String getResponseMessage() {
        return responseMessage;
    }

    public void setResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }
}
/**
 * 浏览器向服务器发送消息用这个类
 */
public class Client2ServerMessage {

    private String userId;

    private String message;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

测试controller

@Controller
public class SockertController {

  @Autowired
  private SimpMessagingTemplate simpMessagingTemplate;

  @MessageMapping("/testSendMsg")
  public BaseResponse testSendMsg(Client2ServerMessage socketBean) {
    System.out.println("收到客户端消息:"+ JSON.toJSON(socketBean));
	
	System.out.println("开始发送服务器端消息");
    Server2ClientMessage message = new Server2ClientMessage("{\"userId\":\""+socketBean.getUserId()+"\"}");
    // 点对点消息
    simpMessagingTemplate.convertAndSendToUser(socketBean.getUserId(),"/single/ip",message);
    // 广播消息
    simpMessagingTemplate.convertAndSend("/topic",message);
    System.out.println("结束发送服务器端消息");
    return BaseResponse.success("操作成功");
  }
}

测试效果

在这里插入图片描述

nginx配置协议升级

proxy_http_version 1.1
proxy_set_header Upgrade $http_upgrade
proxy_set_header Connection 'upgrade'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/394289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Softing smartLink网关——推进过程工业数字化转型

虽然在过程工业中各工厂所投入的运营时间千差万别&#xff0c;但仍需按照新标准来进行有效控制和管理&#xff0c;而这就需要使用一种能够聚合其异构数据的数字通信架构。对此&#xff0c;Softing提供了两种网关解决方案&#xff0c;可用于将过程工业通信架构集成到现有以太网系…

初次使用ESP32-CAM记录

模块的配置和图片 摄像头&#xff1a;8225N V2.0 171026 模块esp-32s 参考资料&#xff1a;https://docs.ai-thinker.com/esp32 配置环境 参考&#xff1a;https://blog.csdn.net/weixin_43794311/article/details/128622558 简单使用需要注意的地方 基本的环境配置和串口…

学习笔记:Java并发编程(补)ThreadLocal

【尚硅谷】学习视频&#xff1a;https://www.bilibili.com/video/BV1ar4y1x727【黑马程序员】学习视频&#xff1a;https://www.bilibili.com/video/BV15b4y117RJ 参考书籍 《实战 JAVA 高并发程序设计》 葛一鸣 著《深入理解 JAVA 虚拟机 | JVM 高级特性与最佳实践》 周志明 著…

大数据项目实战之数据仓库:用户行为采集平台——第3章 用户行为日志

第3章 用户行为日志 3.1 用户行为日志概述 用户行为日志的内容&#xff0c;主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点。 目前主流的埋点方式&#xff0c;有代码…

流量与日志分析

文章目录1.流量与日志分析1.1系统日志分析1.1.1window系统日志与分析方法1.1.2linux 系统日志与分析方法1.2 web日志分析iis 日志分析方法apache日志分析**access_log****error_log**nginx日志分析tomcat 日志分析主流日志分析工具使用1.流量与日志分析 日志&#xff0c;是作为…

Dns域名解析服务器

前言 域名解析服务器的介绍 域名服务器的类型划分 DNS域名解析的过程 为什么需要DNS解析域名为IP地址&#xff1f; 通俗理解Dns DNS劫持 DNS污染 Dns面试经验 前言 DNS是一个应用层协议&#xff0c;用来获取域名对应的IP地址 域名解析服务器的介绍 DNS&#xff08;Dom…

大数据技术之HBase(二)HBase原理简介

一、HBase定义1.1 HBase定义HBase 是一种分布式、可扩展、支持海量数据存储的 NoSQL 数据库非结构化数据存储的数据库&#xff0c;基于列的模式存储。利用Hadoop HDFS作为其文件存储系统&#xff0c;写入性能很强&#xff0c;读取性能较差。利用Hadoop MapReduce来处理HBase中的…

HTTPS简介

HTTPS是HTTP开启TLS传输协议&#xff0c;客户端要拿到服务端的公钥&#xff0c;用公钥加密数据后再进行传输&#xff0c;防止数据泄露后背篡改。它要解决两个问题&#xff1a;怎么保证公钥可信怎么加密数据公钥可信问题客户端从服务端获取公钥的时候&#xff0c;存在请求被拦截…

Spring(一)Spring的7种事务传播行为

目录1.7种事务传播行为2.事务使用示例3.REQUIRES_NEW 事务传播行为使用示例3.1 事务传播图3.2 TUserAServiceImpl.java3.3 TUserBServiceImpl.java1.7种事务传播行为 Spring 中定义了七种事务传播行为&#xff0c;分别是&#xff1a;&#xff08;propagation&#xff1a;n.传播…

RabbitMQ的初始入门与理解

文章目录 目录 文章目录 前言 一、简单介绍 二、使用步骤 2.1 环境配置 2.2 测试使用--java 2.2.1 简单模式 2.2.2 Work queues 工作队列模式 2.2.3 Pub/Sub 订阅模式 2.2.4 Routing 路由模式 总结 前言 MQ全称 Message Queue&#xff08;消息队列&#xff09;&#xff0c…

OperWrt 启动过程03

文章目录 OperWrt 启动过程03OpenWrt启动脚本分析OperWrt 启动过程03 OpenWrt启动脚本分析 内核代码start_kernel函数执行的最后会调用kernel_init函数来启动用户空间的一号进程,标准linux默认是的/etc/init进程,但在OpenWRT里面会执行/etc/preinit,代码如下图992行: 下面…

在SNAP中用sentinel-1数据做InSAR测量,以门源地震为例

在SNAP中用sentinel-1数据做InSAR0 写在前面1 数据下载2 处理步骤2.1 split2.2 apply orbit 导入精密轨道2.3 查看数据的时空基线base line2.4 back-geocoding 配准2.5 Enhanced Spectral Diversity2.6 Deburst2.7 Interogram Formation 生成干涉图2.8 Multilook 多视2.9 Golds…

【Unity】接入Max广告聚合SDK

下载和导入MAX Unity插件&#xff1a; 官方SDK链接 在这里插入图片描述 2.初始化MAX SDK&#xff1a; MaxSdkCallbacks.OnSdkInitializedEvent (MaxSdkBase.SdkConfiguration sdkConfiguration) > {// AppLovin SDK is initialized, start loading ads };MaxSdk.SetSdkK…

JavaScript 高级4 :正则表达式

JavaScript 高级4 &#xff1a;正则表达式 Date: January 19, 2023 Text: 正则表达式、正则表达式特殊字符、正则表达式中的替换 目标&#xff1a; 能够说出正则表达式的作用 能够写出简单的正则表达式 能够使用正则表达式对表单进行验证 能够使用正则表达式替换内容 正则…

渗透测试自动化生成报告——ExportReport

Git仓库&#xff1a; https://github.com/ljy1058318852/ExportReport0x01 概述&#xff1a; 本项目用于自动化生成报告。可根据项目需求&#xff0c;通过简单的提取变量来自定义报告模板。内附常见扫描器API/原报告(awvs、xray、goby)数据提取模块&#xff0c;可直接生成全新…

电影《断网》观后感

上周看了电影《断网》这部电影&#xff0c;题材是网络攻击与防范的故事&#xff0c;这样的题材距离我们很远&#xff0c;又离我们很近&#xff0c;我们每天都在用网络&#xff0c;生活中也离不开网络&#xff0c;所以它离我们很近&#xff0c;但是真正涉及到网络攻击时&#xf…

【00后卷王秘籍】python自动化测试—Python自动化框架及工具

1 、概述 手续的关于测试的方法论&#xff0c;都是建立在之前的文章里面提到的观点&#xff1a; 功能测试不建议做自动化 接口测试性价比最高 接口测试可以做自动化 后面所谈到的 测试自动化 也将围绕着 接口自动化 来介绍。 本系列选择的测试语言是 python 脚本语言。由于其…

为什么99%的程序员都做不好SQL优化?

连接层 最上层是一些客户端和链接服务&#xff0c;包含本地sock 通信和大多数基于客户端/服务端工具实现的类似于 TCP/IP的通信。主要完成一些类似于连接处理、授权认证、及相关的安全方案。在该层上引入了线程 池的概念&#xff0c;为通过认证安全接入的客户端提供线程。同样…

forEach() 的用法

forEach() 方法用于遍历动态数组中每一个元素并执行特定操作。 forEach&#xff08;回调函数&#xff08;item&#xff09;{}&#xff09; &#xff1a;数组遍历方法 item&#xff1a;指的是数组里的每一项 含义是&#xff1a;遍历数组里的每一项&#xff0c;对每一项执行一次回…

【Git】Git工作流程及使用

Git工作流程及使用Git工作流程与常用命令Git工作流程Git常用命令项目中使用Git的场景需求开发前的分支拉取流程&#xff0c;需求开发后的分支合并流程分支合并出现冲突如何解决线上出现事故代码如何回退Git工作流程与常用命令 Git工作流程 workspace&#xff1a;工作区 stagin…