[Java实战]Spring Boot + Netty 实现 TCP 长连接客户端及 RESTful 请求转发(二十六)

news2025/5/16 8:06:23

[Java实战]Spring Boot + Netty 实现 TCP 长连接客户端及 RESTful 请求转发(二十六)

在现代微服务架构中,经常需要在不同服务之间进行高效、可靠的通信。本文将介绍如何使用 Spring Boot 结合 Netty 实现一个 TCP 长连接客户端,并通过 RESTful 接口转发请求到后台 TCP 服务。这种架构在物联网、实时通信等领域非常常见。

一.架构设计

以下是系统的架构设计图:

RESTful
Response
Async
HTTP Client
Spring Boot Controller
Netty Client Service
TCP Connection Pool
Backend TCP Server
  • HTTP Client:外部客户端通过 RESTful 接口发送请求。
  • Spring Boot Controller:接收 HTTP 请求并转发到 Netty 客户端服务。
  • Netty Client Service:管理 TCP 长连接,发送请求并接收响应。
  • TCP Connection Pool:管理多个 TCP 连接,提高性能和可靠性。
  • Backend TCP Server:后台 TCP 服务,处理实际的业务逻辑。

二.项目搭建与依赖配置

1. 创建项目并添加依赖

创建一个 Spring Boot 项目,并添加以下依赖:

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Netty -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.94.Final</version>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

2. 配置参数

application.yml 文件中配置 TCP 服务器的地址、端口以及其他参数:

tcp:
  server:
    host: 127.0.0.1
    port: 9000
  client:
    worker-threads: 4
    connect-timeout: 3000
    heartbeat-interval: 30000
    max-frame-length: 65535

三.核心组件实现

3.1 Netty 客户端启动器

NettyClientBootstrap 负责初始化 Netty 客户端并建立长连接:

@Component
@Slf4j
public class NettyClientBootstrap {

    @Value("${tcp.server.host}")
    private String host;
    
    @Value("${tcp.server.port}")
    private int port;
    
    private volatile Channel channel;
    private Bootstrap bootstrap;

    @PostConstruct
    public void init() throws InterruptedException {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 协议处理
                        pipeline.addLast(new LengthFieldPrepender(4));
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                                65535, 0, 4, 0, 4));
                        pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
                        pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
                        // 业务处理器
                        pipeline.addLast(new ClientHandler());
                    }
                });
        connect();
    }

    private void connect() throws InterruptedException {
        ChannelFuture future = bootstrap.connect(host, port).sync();
        channel = future.channel();
        channel.closeFuture().addListener(f -> {
            log.warn("Connection lost, reconnecting...");
            reconnect();
        });
    }

    private void reconnect() {
        bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                channel = future.channel();
                log.info("Reconnected successfully");
            } else {
                log.error("Reconnect failed, retrying in 5s");
                channel.eventLoop().schedule(this::reconnect, 5, TimeUnit.SECONDS);
            }
        });
    }

    public Channel getChannel() {
        if (!channel.isActive()) {
            throw new IllegalStateException("Channel is inactive");
        }
        return channel;
    }
}

3.2 业务处理器

ClientHandler 负责处理 TCP 通信中的请求和响应:

@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    private static final ConcurrentMap<String, CompletableFuture<String>> pendingRequests 
        = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // 响应格式示例:REQ_ID|RESPONSE_DATA
        String[] parts = msg.split("\\|", 2);
        if (parts.length == 2) {
            CompletableFuture<String> future = pendingRequests.remove(parts[0]);
            if (future != null) {
                future.complete(parts[1]);
            }
        }
    }

    public static CompletableFuture<String> sendRequest(Channel channel, String message) {
        String reqId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(reqId, future);
        
        String protocolMsg = reqId + "|" + message;
        channel.writeAndFlush(protocolMsg).addListener(f -> {
            if (!f.isSuccess()) {
                future.completeExceptionally(f.cause());
                pendingRequests.remove(reqId);
            }
        });
        
        // 设置超时
        channel.eventLoop().schedule(() -> {
            if (pendingRequests.remove(reqId) != null) {
                future.completeExceptionally(new TimeoutException("Request timeout"));
            }
        }, 5, TimeUnit.SECONDS);
        
        return future;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Channel error", cause);
        ctx.close();
    }
}

3.3 服务层封装

TcpClientService 提供对外的服务接口,封装了 TCP 通信逻辑:

@Service
@RequiredArgsConstructor
public class TcpClientService {

    private final NettyClientBootstrap clientBootstrap;

    public String sendMessage(String message) throws Exception {
        Channel channel = clientBootstrap.getChannel();
        return ClientHandler.sendRequest(channel, message)
                .get(5, TimeUnit.SECONDS);
    }
}

3.4 RESTful 接口层

TcpController 提供 RESTful 接口,接收外部请求并转发到 TCP 服务:

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class TcpController {

    private final TcpClientService tcpClientService;

    @PostMapping("/send")
    public ResponseEntity<?> sendCommand(@RequestBody String payload) {
        try {
            String response = tcpClientService.sendMessage(payload);
            return ResponseEntity.ok(response);
        } catch (TimeoutException e) {
            return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
                    .body("Backend service timeout");
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
                    .body("Communication error: " + e.getMessage());
        }
    }
}

3.5 心跳保活机制

HeartbeatScheduler 定期发送心跳消息,保持 TCP 连接活跃:

@Component
@RequiredArgsConstructor
public class HeartbeatScheduler {

    private final TcpClientService tcpClientService;
    
    @Scheduled(fixedRateString = "${tcp.client.heartbeat-interval}")
    public void heartbeat() {
        try {
            tcpClientService.sendMessage("HEARTBEAT");
        } catch (Exception e) {
            log.error("Heartbeat failed", e);
        }
    }
}

四.关键功能说明

1. 长连接管理

  • 自动重连机制:断线后每 5 秒自动重试。
  • Channel 状态监控:实时监控连接状态,确保连接可用。
  • 异常自动恢复:捕获异常并尝试恢复连接。

2. 协议设计

协议格式如下:

+----------------+-------------------+
| 32字节UUID     | 实际消息内容(UTF8)|
+----------------+-------------------+

3. 异步处理流程

异步处理流程如下:

Controller Service Handler TCP Server 发送请求 生成请求ID 发送协议消息 返回响应 完成Future 返回响应 Controller Service Handler TCP Server

五.测试方法

1. 启动模拟 TCP 服务端

使用 Python 快速搭建一个测试 TCP 服务端:

# 使用Python快速搭建测试服务
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', 9000))
sock.listen(1)

while True:
    conn, addr = sock.accept()
    while True:
        data = conn.recv(1024)
        if not data: break
        req_id, payload = data.decode().split('|', 1)
        conn.send(f"{req_id}|ECHO:{payload}".encode())
    conn.close()

2. 发送测试请求

通过 curl 发送测试请求:

curl -X POST -H "Content-Type: text/plain" \
-d "Hello Netty" \
http://localhost:8080/api/send

六.生产级优化建议

1. 连接池扩展

实现多 Channel 管理,提高性能和可靠性:

public class ConnectionPool {
    private final BlockingQueue<Channel> pool = new LinkedBlockingQueue<>(10);
    
    public Channel getChannel() {
        Channel channel = pool.poll();
        if (channel == null || !channel.isActive()) {
            channel = createNewChannel();
        }
        return channel;
    }
    
    private Channel createNewChannel() {
        // 创建新连接逻辑
    }
}

2. 监控指标

添加监控指标,便于实时监控系统状态:

@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {
    return registry -> {
        Gauge.builder("tcp.active.connections", 
                () -> clientBootstrap.getActiveCount())
            .register(registry);
    };
}

3. SSL 加密支持

为 TCP 连接添加 SSL 加密支持:

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        SSLEngine engine = sslContext.newEngine(ch.alloc());
        pipeline.addLast(new SslHandler(engine));
        // 其他处理器...
    }
});

4. 流量控制

添加流量控制机制,防止服务过载:

// 在Handler中添加限流器
private final RateLimiter rateLimiter = RateLimiter.create(1000); // QPS=1000

public static CompletableFuture<String> sendRequest(...) {
    if (!rateLimiter.tryAcquire()) {
        throw new RateLimitExceededException();
    }
    // 原有逻辑...
}

总结

本文介绍了一个基于 Spring Boot 和 Netty 的 TCP 长连接客户端实现方案,支持通过 RESTful 接口转发请求到后台 TCP 服务。该方案具备以下核心功能:

  • 基于 Netty 的 TCP 长连接管理
  • 异步请求/响应匹配机制
  • 自动重连和心跳保活
  • RESTful 接口集成
  • 完善的超时和异常处理

你可以根据实际需求调整协议格式、连接池参数和安全策略。建议配合 APM 工具(如 SkyWalking)进行链路监控,以进一步优化系统性能和稳定性。

希望本文对你有所帮助!如果有任何问题或建议,欢迎在评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2376690.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】动静态库的使用

&#x1f4dd;前言&#xff1a; 这篇文章我们来讲讲Linux——动静态库的使用 &#x1f3ac;个人简介&#xff1a;努力学习ing &#x1f4cb;个人专栏&#xff1a;Linux &#x1f380;CSDN主页 愚润求学 &#x1f304;其他专栏&#xff1a;C学习笔记&#xff0c;C语言入门基础&…

Java基础(网络编程)

一、概述 目的&#xff1a;网络通信&#xff1a; 1、设备和设备 2、进程和进程 1&#xff09;不同设备之间 2&#xff09;本地设备之间 需要解决的问题&#xff1a; 如何准确地发送到对方的主机 - IP地址 - 唯一的定位网络中的一台主机 如何准确的发送到对方主机的进程 -…

计量——异方差的检验及其修正

目录 1.异方差的检验 1 BP检验 2white检验 2.异方差的修正 1.异方差的检验 1 BP检验 选择检验方法&#xff1a;BP BP检验的实际步骤&#xff08;非机器&#xff09;&#xff1a; 1.y对所有x进行回归&#xff0c;得到残差u。计算残差的平方u^2 2.u^2对所有x进行回归&#…

学习C++的好书:C++编程之禅

历时四个月&#xff0c;把这本书看了一遍&#xff0c;受益匪浅&#xff0c;推荐给大家&#xff0c;系统的学习一遍C。

OpenCV进阶操作:人脸检测、微笑检测

文章目录 前言一、OpenCV如何实现人脸检测1、haar特征2、级联分类器3、级联分类器的使用 二、人脸检测、微笑检测 案例实现1、预处理2、加载分类器3、标注人脸4、运行结果&#xff1a;4、微笑检测 总结 前言 要实现人脸识别首先要判断当前图像中是否出现了人脸&#xff0c;这就…

车载诊断进阶篇 --- 车载诊断概念

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…

制作一款打飞机游戏49:敌人抖动

蛇形敌人 如果你玩过一些射击游戏&#xff08;shmups&#xff09;&#xff0c;尤其是老式的射击游戏&#xff0c;你可能会遇到一种敌人&#xff0c;它们像蛇一样移动。我想在我们的游戏中实现这种效果。这种动态的感觉非常棒&#xff0c;我们完全有能力通过动画来实现它。 方…

Pycharm IDEA加载大文件时报错:The file size exceeds configured limit

解决方案&#xff1a;配置一下idea.properties文件 文件里面写入代码&#xff1a; idea.max.intellisense.filesize50000重启IDEA即可&#xff1b;

PDA手持终端应用有哪些?

随着技术进步不断拓展&#xff0c;PDA手持终端其便携性与多功能特性使其成为多行业数字化转型的核心工具。主要包括物流与仓储管理、零售行业、医疗行业以及制造业等。 1.物流与仓储管理 在物流与仓储管理中&#xff0c;PDA手持终端主要用于物品的实时跟踪、库存管理和拣货作业…

Python+Selenium爬虫:豆瓣登录反反爬策略解析

1. 引言 在当今互联网时代&#xff0c;数据抓取&#xff08;爬虫&#xff09;技术广泛应用于数据分析、市场调研、自动化测试等领域。然而&#xff0c;许多网站采用动态加载技术&#xff08;如Ajax、React、Vue.js等框架&#xff09;来渲染页面&#xff0c;传统的**<font s…

电总协议调试助手更新-PowerBus-v1.0.5

电总协议调试助手&#xff0c;该工具主要是用于打包电总协议&#xff0c;用于电总协议的设备调试&#xff08;精密空调、UPS、基站电源等等&#xff09;。电总协议校验计算、编码转换比较麻烦&#xff0c;手动组包困难&#xff0c;使用该工具可以大大提高调试效率。 Ver1.0.5版…

技术文档:变频器干扰问题与解决方案

1. 引言 在现代工业自动化系统中&#xff0c;变频器&#xff08;Variable Frequency Drive, VFD&#xff09;因其高效节能和精确调速的特点被广泛应用于电机控制。然而&#xff0c;变频器在运行过程中会产生高频电磁干扰&#xff08;EMI&#xff09;&#xff0c;对周边设备如P…

2025认证杯数学建模C题思路+代码+模型:化工厂生产流程的预测和控制

2025认证杯数学建模C题思路代码模型&#xff0c;详细内容见文末名片 在化工厂的生产流程中&#xff0c;往往涉及到多个反应釜、管道和储罐等设备。在 流水线上也有每个位置的温度、压力、流量等诸多参数。只有参数处于正常范 围时&#xff0c;最终的产物才是合格的。这些参数…

亚马逊,temu测评采购低成本养号策略:如何用一台设备安全批量管理买家账号

只要能够巧妙规避平台的检测和风控措施&#xff0c;测评便可安全进行。 自养号测评&#xff0c;它更便于卖家掌控&#xff0c;且能降低风险。现在很多卖家都是自己养号&#xff0c;自己养号都是精养&#xff0c;不是自动的机刷&#xff0c;买家账号掌握在自己手里&#xff0c;更…

SiFli-SDK 编译

1.编译报错 scons: *** No SConstruct file found. 出现这个错误是没有正确进入到工程目录执行编译命令&#xff0c;例如应该进入project目录中。 2.scons: *** [build_em-lb525_hcpu\src\resource\strings\en_us.c] AttributeError : dict object has no attribute iteritem…

C++多态实现的必要条件剖析

在C中&#xff0c;多态的一个必要条件确实是通过基类的指针或引用调用虚函数。这一要求背后的原因与C如何实现动态绑定&#xff08;运行时多态&#xff09;密切相关。下面详细解释了为什么需要使用基类的指针或引用来实现多态。 动态绑定与静态绑定 静态绑定&#xff08;编译期…

C语言_自动义类型:联合和枚举

1. 联合体 1.1 联合体类型的声明 与结构体相似&#xff0c;联合体也是有一个或多个成员&#xff08;可以是不同类型&#xff09;构成&#xff1b;但是编译器只为最大的成员分配足够的内存空间 联合体的特点是所有成员共用同一块内存空间&#xff0c;所以联合体也叫&#xff…

汽车紧固件涂层18问:看敦普无铬锌铝涂料如何为螺丝防锈防腐

导读 在汽车紧固件防锈涂装领域&#xff0c;敦普牌紧固件无铬锌铝涂料&#xff0c;是专为汽车紧固件打造的水性涂料&#xff0c;集防锈、环保、高性价比于一体。它有何独特之处&#xff1f;让我们一探究竟。​ 1、敦普紧固件无铬锌铝涂料是什么产品&#xff1f; 敦普紧固件无铬…

掘金中亚货代蓝海,易境通货代系统解锁数字化制胜密码!

2025年&#xff0c;中亚地区正成为全球物流行业的新蓝海。中亚五国因其独特的地缘位置和“一带一路”倡议的深化推进&#xff0c;正逐渐成为全球物流行业的战略要地。 在政策红利、基建升级与市场需求的叠加效应下&#xff0c;中亚物流市场预计在2025年迎来爆发式增长。但传统…

W1R3S: 1.0.1靶场

W1R3S: 1.0.1 来自 <W1R3S: 1.0.1 ~ VulnHub> 1&#xff0c;将两台虚拟机网络连接都改为NAT模式 2&#xff0c;攻击机上做namp局域网扫描发现靶机 nmap -sn 192.168.23.0/24 那么攻击机IP为192.168.23.182&#xff0c;靶场IP192.168.23.249 3&#xff0c;对靶机进行端口…