本文将基于前文实现的编解码与心跳机制,构建一个简单的 RPC 框架,包括请求封装、响应解析、动态代理调用。为打造微服务通信基础打下基础。
一、什么是 RPC?
RPC(Remote Procedure Call,远程过程调用)允许你像调用本地方法一样调用远程服务。
RPC 框架的核心包括:
-
通信协议(我们用 Netty 实现)
-
服务注册与发现(此处简化为直连)
-
编码/解码机制(TLV、自定义协议)
-
动态代理与调用(Java 反射)
二、定义协议数据结构
请求对象
public class RpcRequest {
private String className;
private String methodName;
private Class<?>[] paramTypes;
private Object[] args;
}
响应对象
public class RpcResponse {
private Object result;
private Throwable error;
}
三、编码器和解码器(简化版)
此处建议使用 Java 内置序列化或 JSON,避免自行实现复杂字节协议
public class RpcEncoder<T> extends MessageToByteEncoder<T> {
@Override
protected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) throws Exception {
byte[] data = SerializationUtil.serialize(msg); // 自定义序列化工具
out.writeInt(data.length);
out.writeBytes(data);
}
}
public class RpcDecoder<T> extends ByteToMessageDecoder {
private final Class<T> clazz;
public RpcDecoder(Class<T> clazz) { this.clazz = clazz; }
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) return;
in.markReaderIndex();
int length = in.readInt();
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
byte[] bytes = new byte[length];
in.readBytes(bytes);
out.add(SerializationUtil.deserialize(bytes, clazz));
}
}
四、客户端动态代理
public class RpcClientProxy {
private final String host;
private final int port;
public RpcClientProxy(String host, int port) {
this.host = host;
this.port = port;
}
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> serviceClass) {
return (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class<?>[]{serviceClass},
(proxy, method, args) -> {
RpcRequest request = new RpcRequest();
request.setClassName(serviceClass.getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setArgs(args);
// Netty 同步发送请求,获取响应(略)
RpcResponse response = NettyClient.send(request, host, port);
return response.getResult();
});
}
}
五、服务端调用分发
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) {
RpcResponse response = new RpcResponse();
try {
Class<?> clazz = Class.forName(request.getClassName());
Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
Object result = method.invoke(clazz.getDeclaredConstructor().newInstance(), request.getArgs());
response.setResult(result);
} catch (Exception e) {
response.setError(e);
}
ctx.writeAndFlush(response);
}
}
六、服务端注册
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RpcDecoder(RpcRequest.class));
pipeline.addLast(new RpcEncoder<>(RpcResponse.class));
pipeline.addLast(new RpcServerHandler());
}
});
bootstrap.bind(8080).sync();
七、调用示例
// 定义服务接口
public interface HelloService {
String hello(String name);
}
// 服务实现类
public class HelloServiceImpl implements HelloService {
public String hello(String name) {
return "Hello, " + name;
}
}
// 客户端调用
RpcClientProxy proxy = new RpcClientProxy("localhost", 8080);
HelloService service = proxy.getProxy(HelloService.class);
System.out.println(service.hello("Netty"));
八、总结
通过本篇,你已经实现了:
-
基于 Netty 的 RPC 协议通信
-
编解码框架构建
-
动态代理与服务远程调用
-
基础版 Netty RPC 框架原型
虽然还不完善(无注册中心、无连接池、无异步支持),但已具备通信能力,是 Netty 技术栈走向分布式架构的重要一步。