Netty JBoss Marshallin 编解码
- 介绍
- Marshallin 开发环境
- maven 依赖
 
- 业务场景模拟
- 流程图
 
- 代码展示
- 订购采购消息 POJO 类
- 订购应答消息 POJO 类
- SubscribeReqServer 服务端启动类
- MarshallingCodeCFactory
- 服务端业务处理类 SubscribeServerHandler
- 客户端启动类 SubscribeClient
- 客户端 业务处理类 SubscribeClientHandler
- 重点部分说明
 
- 效果展示
- 服务端打印
- 客户端打印
 
- 踩坑说明
- 解决办法
 
- 总结
介绍
JBoss Marshalling 是一个Java 对象序列化包,对JDK 默认的序列化框架进行了优化,但又保持跟java.io.Serializable 接口进行兼容(即POJO 类只要实现了Serializable接口,就支持编解码)。同时增加了一些可调的参数和附加的特性,可以通过工厂类进行配置。
Marshallin 开发环境
maven 依赖
由于我们只是用到了它的序列化类库,因此,只需要下载 jboss-marshlling-1.4.11.Final 和 jboss-marshalling-serial-1.4.11.Final 类库。采用 1.3.0 版本也行。
 		<dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.4.11.Final </version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.4.11.Final </version>
        </dependency>
业务场景模拟
流程图

 (1)Netty 服务端接收客户端的用户订购请求信息,消息定义如下
 
(2)服务端接收到请求消息,对用户名进行合法性校验,如果合法,则构造订购成功的应答消息给客户端,订购应答消息定义如下
 
代码展示
订购采购消息 POJO 类
public class SubscribeReq implements Serializable {
    private static final long serialVersionUID=1L;
    //订购编号
    private int subReqID;
    //用户名
    private String userName;
    //订购的产品名称
    private String productName;
    //订购者的电话号码
    private String phoneNumber;
    //订购者的家庭地址
    private String address;
    public int getSubReqID() {
        return subReqID;
    }
    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    public String getProductName() {
        return productName;
    }
    public void setProductName(String productName) {
        this.productName = productName;
    }
    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public String getAddress() {
        return address;
    }
    public void setAddress(String address) {
        this.address = address;
    }
    public String toString(){
        return "SubscribeReq [ subReqID = "+subReqID+" , userName = " +
                ""+userName+" , productName = "+productName+" ," +
                " phoneNumber = "+phoneNumber+" , address = "+address+" ]";
    }
}
订购应答消息 POJO 类
public class SubscribeResp implements Serializable {
    private static final long serialVersionUID=1L;
    //订购编号
    private int subReqID;
    //订购结果 0 表示成功
    private int respCode;
    //可选的详细描述信息
    private String desc;
    public int getSubReqID() {
        return subReqID;
    }
    public void setSubReqID(int subReqID) {
        this.subReqID = subReqID;
    }
    public int getRespCode() {
        return respCode;
    }
    public void setRespCode(int respCode) {
        this.respCode = respCode;
    }
    public String getDesc() {
        return desc;
    }
    public void setDesc(String desc) {
        this.desc = desc;
    }
    public String toString(){
        return "SubscribeResp [ subReqID = "+subReqID+" " +
                ", respCode = "+respCode+" , desc = "+desc+" ]";
    }
}
SubscribeReqServer 服务端启动类
public class SubscribeReqServer {
    public void bind(int port){
        //配置服务端的NIO线程组
        EventLoopGroup parentGroup=new NioEventLoopGroup();
        EventLoopGroup childGroup=new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(parentGroup,childGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //socketChannel.pipeline().addLast(
                                    //new LengthFieldBasedFrameDecoder(20*1024,0,2));
                            
                            //添加 MarshallingCodeCFactory 构造的 Marshalling对应的解码器和编码器
                           socketChannel.pipeline().
                                   addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                           socketChannel.pipeline()
                                           .addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline()
                                    .addLast(new SubscribeServerHandler());
                        }
                    });
            ChannelFuture future=bootstrap.bind(port).sync();
            System.out.println("netty server is started");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //优雅退出,释放线程池资源
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new SubscribeReqServer().bind(8080);
    }
}
MarshallingCodeCFactory
public class MarshallingCodeCFactory {
    public static MarshallingDecoder buildMarshallingDecoder(){
        //通过工具类 Marshalling 的 getProvidedMarshallerFactory 静态方法获取 MarshallerFactory 实例
        //参数 serial 表示创建的是 Java 序列化工厂对象
        final MarshallerFactory marshallerFactory= Marshalling.getProvidedMarshallerFactory("serial");
        //创建 MarshallingConfiguration 对象,将它的版本号设置为5,
        final MarshallingConfiguration configuration=new MarshallingConfiguration();
        configuration.setVersion(5);
        //然后根据  MarshallerFactory 和 MarshallingConfiguration 创建 UnmarshallerProvider 对象
        UnmarshallerProvider provider=new DefaultUnmarshallerProvider(marshallerFactory,configuration);
        //创建 Netty 的 MarshallingDecoder 对象,1024 表示 单个消息序列化后的最大长度
        MarshallingDecoder decoder=new MarshallingDecoder(provider,1024);
        return decoder;
    }
    public static MarshallingEncoder buildMarshallingEncoder(){
        //构造 MarshallerFactory 对象
        final MarshallerFactory marshallerFactory=Marshalling.getProvidedMarshallerFactory("serial");
        //构造 MarshallingConfiguration 对象
        final MarshallingConfiguration configuration=new MarshallingConfiguration();
        configuration.setVersion(5);
        //利用  MarshallerFactory MarshallingConfiguration 来 构造 MarshallerProvider对象
        MarshallerProvider provider=new DefaultMarshallerProvider(marshallerFactory,configuration);
        //创建 Netty 的解码器 MarshallingEncoder
        MarshallingEncoder encoder=new MarshallingEncoder(provider);
        return encoder;
    }
}
服务端业务处理类 SubscribeServerHandler
public class SubscribeServerHandler extends ChannelHandlerAdapter {
    public void channelRead(ChannelHandlerContext context,Object obj){
        SubscribeReq subscribeReq=(SubscribeReq)obj;
        //对合法性进行交易,当前只校验userName属性。如果符合,将信息发送给客户端
        if ("echo".equals(subscribeReq.getUserName())){
            System.out.println("Service accept client subscribe req : "+subscribeReq);
            context.writeAndFlush(resp(subscribeReq.getSubReqID()));
        }
    }
    private SubscribeResp resp(int subId){
        SubscribeResp subscribeResp=new SubscribeResp();
        subscribeResp.setSubReqID(subId);
        subscribeResp.setRespCode(0);
        subscribeResp.setDesc("Netty learning is doing,please go on .");
        return subscribeResp;
    }
    public void exceptionCaught(ChannelHandlerContext context,Throwable throwable){
        throwable.printStackTrace();
        context.close();
    }
}
客户端启动类 SubscribeClient
public class SubscribeClient {
    private void connect(String host, int port) {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                           // socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(20*1024,0,2));
                            //添加 MarshallingCodeCFactory 构造的 Marshalling对应的解码器和编码器
                            socketChannel.pipeline().
                                    addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline()
                                    .addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(new SubscribeClientHandler());
                        }
                    });
            ChannelFuture future=bootstrap.connect(host,port).sync();
            System.out.println("the netty client is started,and connected server");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //优雅退出
            workGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new SubscribeClient().connect("127.0.00.1",8080);
    }
}
客户端 业务处理类 SubscribeClientHandler
public class SubscribeClientHandler extends ChannelHandlerAdapter {
    //tcp 链接成功后,就给服务端 循环发送10条信息
    public void channelActive(ChannelHandlerContext context){
        for (int i=0;i<10;i++){
            context.write(subReq(i));
        }
        context.flush();
    }
    private SubscribeReq subReq(int i){
        SubscribeReq subscribeReq=new SubscribeReq();
        subscribeReq.setSubReqID(i);
        subscribeReq.setAddress("深圳市宝安区西乡街道xxx");
        subscribeReq.setPhoneNumber("174868xxxx");
        subscribeReq.setProductName("Netty learning ");
        subscribeReq.setUserName("echo");
        return  subscribeReq;
    }
    /**
     * 由于对象解码器已经对 SubscribeResp 请求消息 进行了自动解码,
     * 因此
     * @param context
     * @param obj
     */
    public void channelRead(ChannelHandlerContext context,Object obj){
        System.out.println("Receive server response : "+obj);
    }
}
重点部分说明
和前面几章相比,这章相对代码比较简单。重点在于服务端启动类和客户端启动类加的 编码器和解码器。
效果展示
服务端打印

客户端打印

踩坑说明
此时用网络注释 netAssist 来测试服务端,给服务端发送消息的时候,会报异常。如截图:
 netAssis 下载地址:https://download.csdn.net/download/echohuangshihuxue/87311946?spm=1001.2014.3001.5503
 
解决办法
在服务端启动类里面,加上一个 LengthFieldBasedFrameDecoder 这个解码器即可。
 如图:
 
总结
本章介绍了如何使用Netty的Marshalling 编码器和解码器对POJO对象进行序列化。通过使用Netty的
 Marshalling 编解码器,我们可以轻松的开发出支持 JBoss Marshalling 序列化的客户端和服务端程序,
 方便对接JBoss 的内部模块,同时也利于对已有使用JBoss Marshalling 框架做通信协议的模块的桥接和重用。












![[ 常用工具篇 ] burpsuite_pro 安装配置详解(附安装包)](https://img-blog.csdnimg.cn/2e129eee05104753b9fa8e5bcf6a74bf.png)





