[Java实战]Spring Boot + Netty 实现 TCP 长连接客户端及 RESTful 请求转发(二十六)
在现代微服务架构中,经常需要在不同服务之间进行高效、可靠的通信。本文将介绍如何使用 Spring Boot 结合 Netty 实现一个 TCP 长连接客户端,并通过 RESTful 接口转发请求到后台 TCP 服务。这种架构在物联网、实时通信等领域非常常见。
一.架构设计
以下是系统的架构设计图:
- HTTP Client:外部客户端通过 RESTful 接口发送请求。
- Spring Boot Controller:接收 HTTP 请求并转发到 Netty 客户端服务。
- Netty Client Service:管理 TCP 长连接,发送请求并接收响应。
- TCP Connection Pool:管理多个 TCP 连接,提高性能和可靠性。
- Backend TCP Server:后台 TCP 服务,处理实际的业务逻辑。
二.项目搭建与依赖配置
1. 创建项目并添加依赖
创建一个 Spring Boot 项目,并添加以下依赖:
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.94.Final</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
2. 配置参数
在 application.yml
文件中配置 TCP 服务器的地址、端口以及其他参数:
tcp:
server:
host: 127.0.0.1
port: 9000
client:
worker-threads: 4
connect-timeout: 3000
heartbeat-interval: 30000
max-frame-length: 65535
三.核心组件实现
3.1 Netty 客户端启动器
NettyClientBootstrap
负责初始化 Netty 客户端并建立长连接:
@Component
@Slf4j
public class NettyClientBootstrap {
@Value("${tcp.server.host}")
private String host;
@Value("${tcp.server.port}")
private int port;
private volatile Channel channel;
private Bootstrap bootstrap;
@PostConstruct
public void init() throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 协议处理
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new LengthFieldBasedFrameDecoder(
65535, 0, 4, 0, 4));
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
// 业务处理器
pipeline.addLast(new ClientHandler());
}
});
connect();
}
private void connect() throws InterruptedException {
ChannelFuture future = bootstrap.connect(host, port).sync();
channel = future.channel();
channel.closeFuture().addListener(f -> {
log.warn("Connection lost, reconnecting...");
reconnect();
});
}
private void reconnect() {
bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
channel = future.channel();
log.info("Reconnected successfully");
} else {
log.error("Reconnect failed, retrying in 5s");
channel.eventLoop().schedule(this::reconnect, 5, TimeUnit.SECONDS);
}
});
}
public Channel getChannel() {
if (!channel.isActive()) {
throw new IllegalStateException("Channel is inactive");
}
return channel;
}
}
3.2 业务处理器
ClientHandler
负责处理 TCP 通信中的请求和响应:
@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<String> {
private static final ConcurrentMap<String, CompletableFuture<String>> pendingRequests
= new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 响应格式示例:REQ_ID|RESPONSE_DATA
String[] parts = msg.split("\\|", 2);
if (parts.length == 2) {
CompletableFuture<String> future = pendingRequests.remove(parts[0]);
if (future != null) {
future.complete(parts[1]);
}
}
}
public static CompletableFuture<String> sendRequest(Channel channel, String message) {
String reqId = UUID.randomUUID().toString();
CompletableFuture<String> future = new CompletableFuture<>();
pendingRequests.put(reqId, future);
String protocolMsg = reqId + "|" + message;
channel.writeAndFlush(protocolMsg).addListener(f -> {
if (!f.isSuccess()) {
future.completeExceptionally(f.cause());
pendingRequests.remove(reqId);
}
});
// 设置超时
channel.eventLoop().schedule(() -> {
if (pendingRequests.remove(reqId) != null) {
future.completeExceptionally(new TimeoutException("Request timeout"));
}
}, 5, TimeUnit.SECONDS);
return future;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Channel error", cause);
ctx.close();
}
}
3.3 服务层封装
TcpClientService
提供对外的服务接口,封装了 TCP 通信逻辑:
@Service
@RequiredArgsConstructor
public class TcpClientService {
private final NettyClientBootstrap clientBootstrap;
public String sendMessage(String message) throws Exception {
Channel channel = clientBootstrap.getChannel();
return ClientHandler.sendRequest(channel, message)
.get(5, TimeUnit.SECONDS);
}
}
3.4 RESTful 接口层
TcpController
提供 RESTful 接口,接收外部请求并转发到 TCP 服务:
@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class TcpController {
private final TcpClientService tcpClientService;
@PostMapping("/send")
public ResponseEntity<?> sendCommand(@RequestBody String payload) {
try {
String response = tcpClientService.sendMessage(payload);
return ResponseEntity.ok(response);
} catch (TimeoutException e) {
return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT)
.body("Backend service timeout");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
.body("Communication error: " + e.getMessage());
}
}
}
3.5 心跳保活机制
HeartbeatScheduler
定期发送心跳消息,保持 TCP 连接活跃:
@Component
@RequiredArgsConstructor
public class HeartbeatScheduler {
private final TcpClientService tcpClientService;
@Scheduled(fixedRateString = "${tcp.client.heartbeat-interval}")
public void heartbeat() {
try {
tcpClientService.sendMessage("HEARTBEAT");
} catch (Exception e) {
log.error("Heartbeat failed", e);
}
}
}
四.关键功能说明
1. 长连接管理
- 自动重连机制:断线后每 5 秒自动重试。
- Channel 状态监控:实时监控连接状态,确保连接可用。
- 异常自动恢复:捕获异常并尝试恢复连接。
2. 协议设计
协议格式如下:
+----------------+-------------------+
| 32字节UUID | 实际消息内容(UTF8)|
+----------------+-------------------+
3. 异步处理流程
异步处理流程如下:
五.测试方法
1. 启动模拟 TCP 服务端
使用 Python 快速搭建一个测试 TCP 服务端:
# 使用Python快速搭建测试服务
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', 9000))
sock.listen(1)
while True:
conn, addr = sock.accept()
while True:
data = conn.recv(1024)
if not data: break
req_id, payload = data.decode().split('|', 1)
conn.send(f"{req_id}|ECHO:{payload}".encode())
conn.close()
2. 发送测试请求
通过 curl
发送测试请求:
curl -X POST -H "Content-Type: text/plain" \
-d "Hello Netty" \
http://localhost:8080/api/send
六.生产级优化建议
1. 连接池扩展
实现多 Channel 管理,提高性能和可靠性:
public class ConnectionPool {
private final BlockingQueue<Channel> pool = new LinkedBlockingQueue<>(10);
public Channel getChannel() {
Channel channel = pool.poll();
if (channel == null || !channel.isActive()) {
channel = createNewChannel();
}
return channel;
}
private Channel createNewChannel() {
// 创建新连接逻辑
}
}
2. 监控指标
添加监控指标,便于实时监控系统状态:
@Bean
public MeterRegistryCustomizer<MeterRegistry> metrics() {
return registry -> {
Gauge.builder("tcp.active.connections",
() -> clientBootstrap.getActiveCount())
.register(registry);
};
}
3. SSL 加密支持
为 TCP 连接添加 SSL 加密支持:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
SSLEngine engine = sslContext.newEngine(ch.alloc());
pipeline.addLast(new SslHandler(engine));
// 其他处理器...
}
});
4. 流量控制
添加流量控制机制,防止服务过载:
// 在Handler中添加限流器
private final RateLimiter rateLimiter = RateLimiter.create(1000); // QPS=1000
public static CompletableFuture<String> sendRequest(...) {
if (!rateLimiter.tryAcquire()) {
throw new RateLimitExceededException();
}
// 原有逻辑...
}
总结
本文介绍了一个基于 Spring Boot 和 Netty 的 TCP 长连接客户端实现方案,支持通过 RESTful 接口转发请求到后台 TCP 服务。该方案具备以下核心功能:
- 基于 Netty 的 TCP 长连接管理
- 异步请求/响应匹配机制
- 自动重连和心跳保活
- RESTful 接口集成
- 完善的超时和异常处理
你可以根据实际需求调整协议格式、连接池参数和安全策略。建议配合 APM 工具(如 SkyWalking)进行链路监控,以进一步优化系统性能和稳定性。
希望本文对你有所帮助!如果有任何问题或建议,欢迎在评论区留言。