Netty-实验

news2025/7/18 8:17:31

Netty应用实例-群聊系统

实例要求:

(1)编写一个Netty群聊系统,实现服务端和客户端之间的数据简单通讯(非阻塞)
(2)实现多人群聊
(3)服务器端:可以监视用户上线,离线,并实现消息转发功能
(4)客户端:通过channel可以物阻塞发送消息给其他用户,同时可以接受其他用户发送的消息(由服务器转发的得到)
(5)目的:进一步理解Netty非阻塞网络编程机制

拆解过程:

首先我们建立GroupChatServer端,然后重写对应的具体业务处理的handler,在handler中具体需要捕捉的是客户端channel的上线、下线、具体对应的就是handlerAdded、handlerRemoved、channelActive、channelInactive,分别是处理发送给其他客户端的消息和服务器捕捉发送消息,最重要的就是读消息并转发,对于发送消息的channel的处理和其他客户端不一样。

然后我们建立GroupChatClient端,不同的就是我们的消息需要响应输入,然后发送,对于handler只需要读取消息打印即可。

GroupchatServer实现:
package com.sgg.Netty.GroupChat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class GroupchatServer {
    private int port;

    public GroupchatServer(int port){
        this.port = port;
    }

    public void run() throws InterruptedException {
        //创建线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        try{
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)    //设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupchatServerhandler());
                        }
                    });
            System.out.println("服务器启动");
            ChannelFuture cf = b.bind(port).sync();
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(cf.isSuccess()){
                        System.out.println("监听端口成功");
                    }else{
                        System.out.println("监听端口失败");
                    }
                }
            });

            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }


    }
    public static void main(String[] args) throws InterruptedException {
        new GroupchatServer(8847).run();
    }
}

GroupchatServerhandler实现:
package com.sgg.Netty.GroupChat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;

public class GroupchatServerhandler extends SimpleChannelInboundHandler<String> {


    //定义一个channel组,管理所有的channel
    private static ChannelGroup channelGroup= new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy--mm-dd HH:mm:ss");

    /**
     *处理所有在线客户端某新客户加入消息
     */
    //handlerAdded 表示连接建立,一旦连接,第一个被执行
    //将当前channelchannelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户端加入聊天的消息推送给其他在线的客户端
        //该方法会将channelGroup中所有的channel遍历,并发送消息
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天\n");
        channelGroup.add(channel);
    }

    /**
     *处理所有在线客户端某新客户离开的消息
     */
    //断开连接,将XX酷互动离开的信息推送给当前所有在线的客户
    //出发该方法时ChannelGroup中会自动删除该channenl,不需手动删除
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户端离开聊天的消息推送给其他在线的客户端
        //该方法会将channelGroup中所有的channel遍历,并发送消息
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开聊天\n");
    }

    /**
     *处理服务端显示新客户上线消息
     */
    //表示channel处于活动状态,提示XX上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"上线了");
    }

    /**
     *处理服务端显示新客户上下线消息
     */
    //表示channel处于不活动状态,提示XX下线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"下线了~");
    }

    //读取数据并转发给当前在线的所有人

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.forEach(ch->{
            if(channel!=ch){
                ch.writeAndFlush("[客户】"+channel.remoteAddress()+"发送了消息"+s+"\n");
            }else{
                ch.writeAndFlush("[自己]发送了消息"+s+"\n");
            }
        });
    }



}

GroupchatClient实现:
package com.sgg.Netty.GroupChat;

import com.sgg.Netty.simple.NettyClienthandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class GroupchatClient {
    private final String host;
    private final int port;
    public GroupchatClient(String host, int port){
        this.host = host;
        this.port = port;
    }
    public void run() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();

        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new GroupchatCinenthandler());//加入自己的handler
                        }
                    });
//            System.out.println("客户端OK");

            ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("---------"+channel.localAddress()+"--------");
            //客户端需要输入信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                //通过channel发送出去
                channel.writeAndFlush(msg+"\r\n");
            }

        }finally {
            //关闭
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        new GroupchatClient("127.0.0.1",8847).run();
    }
}

GroupchatClienthandler实现:
package com.sgg.Netty.GroupChat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupchatCinenthandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s.trim());
    }
}

到此,我们实现了群聊系统如下:
GroupchatServer
在这里插入图片描述
GrouchatClient1
在这里插入图片描述
GrouchatClient2
在这里插入图片描述
GrouchatClient3
在这里插入图片描述

Netty心跳机制实验

我们先来看看心跳机制的产生
我们知道在TCP长连接或者WebSocket长连接中一般我们都会使用心跳机制–即发送特殊的数据包来通告对方自己的业务还没有办完,不要关闭链接。

那么心跳机制可以用来做什么呢?

我们知道网络的传输是不可靠的,当我们发起一个链接请求的过程之中会发生什么事情谁都无法预料,或者断电,服务器重启,断网线之类。

如果有这种情况的发生对方也无法判断你是否还在线。所以这时候我们引入心跳机制,在长链接中双方没有数据交互的时候互相发送数据(可能是空包,也可能是特殊数据),对方收到该数据之后也回复相应的数据用以确保双方都在线,这样就可以确保当前链接是有效的。

1. 如何实现心跳机制

一般实现心跳机制由两种方式:

TCP协议自带的心跳机制来实现;
在应用层来实现。
但是TCP协议自带的心跳机制系统默认是设置的是2小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与TCP协议绑定的,那如果我们要是使用UDP协议岂不是用不了?所以一般我们都不用。

而一般我们自己实现呢大致的策略是这样的:

Client启动一个定时器,不断发送心跳;
Server收到心跳后,做出回应;
Server启动一个定时器,判断Client是否存在,这里做判断有两种方法:时间差和简单标识。
时间差:

收到一个心跳包之后记录当前时间;
判断定时器到达时间,计算多久没收到心跳时间=当前时间-上次收到心跳时间。如果改时间大于设定值则认为超时。
简单标识:

收到心跳后设置连接标识为true;
判断定时器到达时间,如果未收到心跳则设置连接标识为false;

Netty中的心跳机制的实现:

我们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的处理,它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。
该类可以对三种类型的超时做心跳机制检测:

  • readerIdleTimeSeconds:设置读超时时间;
  • writerIdleTimeSeconds:设置写超时时间;
  • allIdleTimeSeconds:同时为读或写设置超时时间;

下面我们进行心跳机制的实验,我们只需要写服务端即可,客户端我们可以用上面的群聊实验中的客户端即可。
MyServer:

package com.sgg.Netty.HeatBeat;

import com.sgg.Netty.simple.NettyServerhandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class MyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();

        try{
            //使用链式变成来进行设置
            bootstrap.group(bossGroup,workerGroup)       //设置两个线程组
                    .channel(NioServerSocketChannel.class)     //使用NioServerSocketChannel作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)       //设置保持活动连接状态
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {         //创建一个通道测试对象
                        //给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //加入一个Netty提供的IdleStateHandler
                            /**
                             * 1、IdleStateHandler是netty提供的处理空闲状态的处理器
                             * 2、long readerIdleTime: 表示多长时间没有读,就会发送一个心跳监测包检测是否连接
                             * 3、long writerIdleTime: 表示多长时间没有写,就会发送一个心跳监测包检测是否连接
                             * 4、long allIdleTime: 表示多长时间没有读和写,就会发送一个心跳监测包检测是否连接
                             * 5、当IdleStateHandler触发后,就会传递给管道的下一个handler去处理,
                             * 通过调用(触发)下一个handler的userEventTiggered,在该方法中去处理
                             */

                            pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));
                            pipeline.addLast(new MyServerhandler());
                        }
                    });

            ChannelFuture cf = bootstrap.bind(8847).sync();
            //给cf注册监听器,监控我们关心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(cf.isSuccess()){
                        System.out.println("监听端口成功");
                    }else{
                        System.out.println("监听端口失败");
                    }
                }
            });

            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

MyServerhandler:

package com.sgg.Netty.HeatBeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class MyServerhandler extends ChannelInboundHandlerAdapter {

    /**
     * 心跳机制触发后的处理函数
     * @param ctx 上下文
     * @param evt 心跳机制发生的事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            //将evt向下转型 IdleStateEvent
            IdleStateEvent event  = (IdleStateEvent)evt;
            String eventType = null;
            switch (event.state()){
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress()+"------"+eventType);
        }
    }
}

实验结果:
在这里插入图片描述
我们在handler中实现的userEventTriggered方法中我们可以捕捉到空闲事件,后续根据需要即可进行响应的处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/8416.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文阅读笔记 | 三维目标检测——PointRCNN

如有错误&#xff0c;恳请指出。 文章目录1. 背景2. 网络结构2.1 Proposal Generation2.2 Proposal Refinement3. 实验部分3.1 kitti上的测评3.2 消融实验paper&#xff1a;《PointRCNN: 3D Object Proposal Generation and Detection from Point Cloud》文章比较复杂&#xff…

一文详解Redis企业版软件!

一、Redis企业版软件概述 Redis企业版软件&#xff08;Redis Enterprise&#xff09;是企业级的数据库软件&#xff0c;也是一款实时数据平台&#xff0c;为全球超过8500家知名企业提供实时数据服务。具有线性可扩展性、高可用性、持久性、备份和恢复、地理分布、分层内存访问…

WhatsApp群发系统-SendWS拓客系统功能后台介绍(五):WhatsApp筛号群发,群发超链

WhatsApp群发系统 基于WhatsApp进行群发功能&#xff0c;将品牌和产品推送给全世界各地的人们或者选择筛选好的用户&#xff0c;进行针对性的群发&#xff0c;提升了品牌和产品的影响力&#xff0c;让更多人了解认识品牌&#xff0c;帮助客户低成本实现WhatsApp营销精准拓客。…

windows和linux可以共用的端口连通性是否丢包测试工具paping

通常我们在系统无论是在windows还是linux&#xff0c;都会使用telnet命令来测试端口的连通性&#xff0c;但此命令只能测试是否通&#xff0c;无法测试是否有丢包或者是否有中断。paping这个工具就应用而生&#xff0c;它可以在多系统环境下进行像ping一样测试。 一、下载&…

【vscode】远程容器内开发python

一、环境 本人的远程开发环境&#xff1a; docker容器miniconda 常用的IDE&#xff1a; pyCharm专业版vsCodeRemote Development插件Python插件 由于pyCharm专业版要么花钱要么破解&#xff0c;我选择了vscode插件的方式&#xff0c;插件都是microsoft出品。 二、使用 服务…

记一道前端高难度面试题

目录 提问&#xff1a;如何让下面的这行代码成立 1.错误原因 2.思路 3.解题 4.小结 提问&#xff1a;如何让下面的这行代码成立 var [a,b] {a:1,b:2} 直接运行会报错&#xff0c;报错信息如下&#xff1a; Uncaught TypeError: {(intermediate value)(intermediate valu…

手柄零件的工艺设计

手柄零件的工艺设计 目录 一、零件的工艺分析及生产类型的确定 1.零件的作用-------------------------------------------------------------------------- 3 2.热处理-------------------------------------------------------------------------------- 3 3.零件的生产类型-…

架构师书籍推荐

讲实话&#xff0c;要看书只能看看架构师思维相关的数据&#xff0c;开拓一下思路就行&#xff0c;看看别人的看法和观念。 架构师需要积累的技术不要从书上来&#xff0c;去官网看他的说明书&#xff0c;一切纯讲技术类的书籍都有滞后性。 正在用的技术要时常关注一下他官网…

数据结构-图的基本概念

目录 完全图无向图有向图路径长度回路或环⭐⭐无向图-->连通图和连通分量⭐⭐有向图-强连通图和强连通分量完全图 无向图 无向图中每两个顶点之间都存在着一条边。 称为完全图 无向完全图包含n(n-1)/2条边。 有向图 有向图每两个顶点之间都存在着方向相反的两条边。 称…

Nature 、cell 双开花-抗氧化剂与氧化应激

癌细胞经常通过癌症转移调控自身的新陈代谢&#xff0c;进而来有效地支持细胞增殖和存活。因此&#xff0c;因恶性肿瘤转移造成的死亡占癌症整体发病的 95%。2019 年 6 月 27 日&#xff0c;国际 TOP 杂志 Nature 在线发表了中科院上海生化与细胞研究所杨巍维课题组与中科院大连…

el-table中显示echarts的趋势折线图(燃尽图)

显示效果&#xff1a;右边的趋势图其实是查询当前行的30天数据量 背景&#xff1a;为了模仿禅道上的燃尽图&#xff0c;但是查看其源码&#xff0c;发现是用php写的&#xff0c;我们想用vue实现 实现步骤&#xff1a;1.先使用el-table画出表格来 注意&#xff1a;此时数据是…

ctfshow 萌新赛 给她

初识&#xff1a; 一开始看到这个题目以为是sql注入&#xff0c;尝试了各种sql注入转义次都注入不了 .git泄露&#xff1a;最后还是看了一下大佬的解题&#xff0c;发现方向就错了&#xff0c;“给她”——“git”&#xff0c;这题的入口是.git泄露。 我是纯小白&#xff0c;…

如何在React项目中使用TypeScript

本文主要记录我如何在React项目中优雅的使用TypeScript&#xff0c;来提高开发效率及项目的健壮性。 项目目录及ts文件划分 由于我在实际项目中大部分是使用umi来进行开发项目&#xff0c;所以使用umi生成的目录来做案例。 . ├── README.md ├── global.d.ts ├── mo…

SpringBoot SpringBoot 开发实用篇 1 热部署 1.2 自动启动热部署

SpringBoot 【黑马程序员SpringBoot2全套视频教程&#xff0c;springboot零基础到项目实战&#xff08;spring boot2完整版&#xff09;】 SpringBoot 开发实用篇 文章目录SpringBootSpringBoot 开发实用篇1 热部署1.2 自动启动热部署1.2.1 问题引入1.2.2 自动启动热部署1.2.…

Nacos2.1.1集群和持久化配置以及Nginx负载均衡分发(重点)

Nacos集群和持久化配置&#xff08;重点&#xff09; 1、官网说明 官网文档地址&#xff1a;https://nacos.io/zh-cn/docs/cluster-mode-quick-start.html 对如上图片进行翻译如下 根据下图&#xff0c;需要配置MySQL作为持久化保存Nacos的信息 默认Nacos使用嵌入式数据库实现…

如何在 Visual Paradigm 中创建流程图丨使用教程

让我们看看如何在 Visual Paradigm 中绘制流程图。我们将在这里使用一个非常简单的流程图示例。完成本教程后&#xff0c;您可以扩展示例。 1.从主菜单中选择图表 > 新建。 2.在New Diagram窗口中&#xff0c;选择Flowchart并单击Next。 3.您可以从空图表开始&#xff0c;…

股价暴跌了74.5%后,乐信第三季度财报可能会低于市场预期

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 乐信2022年第三季度财报前瞻 此前&#xff0c;乐信(LX)曾在2022年11月10日宣布&#xff0c;公司将于2022年11月16日发布2022年第三季度财报。 猛兽财经认为&#xff0c;乐信2022年第三季度的实际财务业绩可能会令市场失望。…

最佳使用案例NO.1–干涉测量

为了营造今年的节日气氛&#xff0c;我们决定发布4份特别的新闻通讯展示我们的“2019最佳使用案例” 在VirtualLab Fusion中所实现的快速物理光学技术为著名的干涉仪的快速仿真提供了强有力的工具&#xff0c;从而使我们能够研究干涉图样中的相干和色散效应。 基于物理光学的Vi…

webpack5 Css 兼容性处理postcss-loader

postcss-loader | webpack 中文文档webpack 是一个模块打包器。它的主要目标是将 JavaScript 文件打包在一起&#xff0c;打包后的文件用于在浏览器中使用&#xff0c;但它也能够胜任转换&#xff08;transform&#xff09;、打包&#xff08;bundle&#xff09;或包裹&#xf…

震撼来袭,最具中国特色的微服务组件:新一代SpringCloud Alibaba

都说程序员工资高、待遇好&#xff0c; 2022 金九银十到了&#xff0c;你的小目标是 30K、40K&#xff0c;还是 16薪的 20K&#xff1f;作为一名 Java 开发工程师&#xff0c;当能力可以满足公司业务需求时&#xff0c;拿到超预期的 Offer 并不算难。然而&#xff0c;提升 Java…