一、Netty简介
Netty 是一个异步事件驱动的网络通信应用框架,用于快速开发可维护的高性能服务器和客户端。简单地说Netty封装了JDK的NIO,不用再写一大堆复杂的代码,从NIO各种繁复的细节中脱离出来,让开发者重点关心业务逻辑。
二、Netty重要API
说明:大部分是模板代码,重点应该关注的是处理业务逻辑的Handler, 其按照角色不同分为ServerHandler和ClientHandler,按照数据处理流向不同,分为InboundHandler(对应接口是ChannelInboundHandlerAdapter)-处理输入数据,比如从客户端的角度,是处理从服务端发送过来的数据, 和OutboundHandler(对应接口是ChannelOutboundHandlerAdapter)-处理输出数据,比如从客户端的角度,是处理其发送给服务端的数据。
1、服务端
//用于接收客户端的连接请求的线程池
val bossGroup = new NioEventLoopGroup()
//用与处理客户端SocketChannel的网络读写的线程池
val workerGroup = new NioEventLoopGroup()
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootstrap = new ServerBootstrap()
//将两个NIO线程组作为参数传入到ServerBootstrap
bootstrap.group(bossGroup, workerGroup)
  //创建NioServerSocketChannel
  .channel(classOf[NioServerSocketChannel])
  //绑定事件处理类
  .childHandler(new ChannelInitializer[SocketChannel] {
    override def initChannel(ch: SocketChannel): Unit = {
      // 日常主要要写的是ServerHandler逻辑 
      // 并且通常会添加多个ServerHandler
      ch.pipeline().addLast(new ServerHandler1)
    }
  })
//绑定端口地址端口
bootstrap.bind(host, port)2、客户端
//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootstrap = new Bootstrap
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)
  //创建NioSocketChannel
  .channel(classOf[NioSocketChannel])
  //绑定事件处理类
  .handler(new ChannelInitializer[SocketChannel] {
    override def initChannel(ch: SocketChannel): Unit = {
      // 日常主要要写的是ClientHandler逻辑 
      // 并且通常会添加多个ClientHandler
      ch.pipeline().addLast(new ClientHandler1)
    }
  })
//发送连接操作
bootstrap.connect(host, port)三、Handler执行顺序
如第二部分所述,日常开发主要是写Handler,用于处理不同的业务逻辑,并且通常需要添加多个Handler,那么多个Handler的执行顺序如何呢?示例如下

比如一个客户端添加了5个Handler, 其中1和2是inBound,3和4是OutBound,5是InboundOutBound,
该客户端向外发送的消息会依次经过如下Handle的处理:5->4->3,而其接收到的外部信息会依次经过如下Handle的处理:1->2->5。
四、应用举例
1、案例1
描述:客户端与服务端建立连接,会各自执行Handler中的channelActive方法
客户端:ClientHandler1d的channelActive方法被调用[已跟服务器建立连接]
服务端: ServerHandler1的channelActive方法被调用[一个客户端连接上]
(1) 服务端代码
① NettyServer
package top.doe.netty.demo1
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
class NettyServer1 {
  def bind(host: String, port: Int): Unit = {
    //用于接收客户端的连接请求的线程池
    val bossGroup = new NioEventLoopGroup()
    //用与处理客户端SocketChannel的网络读写的线程池
    val workerGroup = new NioEventLoopGroup()
    //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
    val bootstrap = new ServerBootstrap()
    //将两个NIO线程组作为参数传入到ServerBootstrap
    bootstrap.group(bossGroup, workerGroup)
      //创建NioServerSocketChannel
      .channel(classOf[NioServerSocketChannel])
      //绑定事件处理类
      .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new ServerHandler1)
        }
      })
    //绑定端口地址端口
    bootstrap.bind(host, port)
  }
}
object NettyServer1 {
  def main(args: Array[String]) {
    val host = "localhost"
    val port = 8888
    val server = new NettyServer1
    server.bind(host, port)
  }
}
②ServerHandler
package top.doe.netty.demo1
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ServerHandler1 extends ChannelInboundHandlerAdapter {
  /**
   * 有客户端与服务端建立连接后调用
   */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("ServerHandler的channelActive方法被调用【一个客户端连接上】")
  }
  /**
   * 有客户端与服务端断开连接后调用
   */
  override def channelInactive(ctx: ChannelHandlerContext): Unit = {
    println("ServerHandler的channelInactive方法被调用【一个客户端与服务端断开连接了】")
  }
  /**
   * 接受客户端发送来的消息
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("ServerHandler的channelRead方法被调用【收到客户端发送的消息了】")
  }
}
(2) 客户端代码
① NettyClient
package top.doe.netty.demo1
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
class NettyClient1 {
  def connect(host: String, port: Int): Unit = {
    //创建客户端NIO线程组
    val eventGroup = new NioEventLoopGroup
    //创建客户端辅助启动类
    val bootstrap = new Bootstrap
    //将NIO线程组传入到Bootstrap
    bootstrap.group(eventGroup)
      //创建NioSocketChannel
      .channel(classOf[NioSocketChannel])
      //绑定事件处理类
      .handler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new ClientHandler1)
        }
      })
    //发送连接操作
    bootstrap.connect(host, port)
  }
}
object NettyClient1 {
  def main(args: Array[String]) {
    val host = "localhost"
    val port = 8888
    val client = new NettyClient1
    client.connect(host, port)
  }
}
②ClientHandler
package top.doe.netty.demo1
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ClientHandler1 extends ChannelInboundHandlerAdapter {
  /**
   * 一旦跟服务端建立上连接,channelActive方法将被调用
   */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("ClientHandler的channelActive方法被调用!【已经跟服务端连接上了】")
  }
  /**
   * 服务端返回消息后,channelRead方法被调用,该方法用于接送服务端返回的消息
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    println("ClientHandler的channelRead方法被调用!")
  }
}
(3) 执行结果
>>客户端

>>服务端

2、案例2
在案例1的基础上,服务端和客户端给彼此发送一条消息。
(1) 基本流程



(2) 服务端代码
① NettyServer
package com.wakedata.demo2
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
class NettyServer2 {
  def bind(host:String,port:Int) = {
    val bossGroup = new NioEventLoopGroup()
    val workerGroup = new NioEventLoopGroup()
    val bootstrap = new ServerBootstrap()
    bootstrap.group(bossGroup,workerGroup)
      .channel(classOf[NioServerSocketChannel])
      .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new ServerHandler2)
        }
      })
    bootstrap.bind(host,port)
  }
}
object NettyServer2{
  def main (args: Array[String] ): Unit = {
    val host = "localhost"
    val port = 8888
    val server2 = new NettyServer2
    server2.bind(host,port)
}
}
②ServerHandler
package com.wakedata.demo2
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ServerHandler2 extends ChannelInboundHandlerAdapter {
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("ServerHandler1的channelActive方法被调用[一个客户端连接上]")
  }
  override def channelInactive(ctx: ChannelHandlerContext): Unit = {
    println("ServerHandler1的channelInactive方法被调用[一个客户端断开连接]")
  }
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    //接收客户端发送过来的消息
    val byteBuf = msg.asInstanceOf[ByteBuf]
    val bytes = new Array[Byte](byteBuf.readableBytes())
    byteBuf.readBytes(bytes)
    val message = new String(bytes, "UTF-8")
    println("ServerHandler2的channelRead方法被调用[收到客户端发送的消息了]" + message)
    //将数据发送到客户端
    val back = "你好"
    val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
    ctx.writeAndFlush(resp)
  }
}
(3) 客户端代码
① NettyClient
package com.wakedata.demo2
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
class NettyClient2 {
  def connect(host:String,port:Int):Unit = {
    //创建客户端的NIO线程组
    val eventGroup = new NioEventLoopGroup()
    //创建客户端辅助启动类
    val bootstrap = new Bootstrap()
    //将NIO线程组传入到Bootstrap
    bootstrap.group(eventGroup)
      .channel(classOf[NioSocketChannel])
    //绑定事件处理类
      .handler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast(new ClientHandler2)
        }
      })
    //发送连接操作
    bootstrap.connect(host,port)
  }
}
object NettyClient2{
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 8888
    val client = new NettyClient2()
    client.connect(host,port)
  }
}
②ClientHandler
package com.wakedata.demo2
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ClientHandler2 extends ChannelInboundHandlerAdapter{
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("ClientHandler2的channelActive方法被调用[已跟服务器建立连接]")
    //向服务端发送消息
    val msg = "hello"
    ctx.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes("UTF-8")))
  }
  override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
    //读取服务端返回的消息
    val byteBuf = msg.asInstanceOf[ByteBuf]
    val bytes = new Array[Byte](byteBuf.readableBytes())
    byteBuf.readBytes(bytes)
    val message = new String(bytes, "UTF-8")
    print("ClientHandler2的channelRead方法被调用,接收到服务器端发送过来的消息:" + message)
  }
}
(4) 执行结果
>> 客户端

 >> 服务端
3、案例3
在案例2的基础上,添加多个Handler处理器(共计3个)。因为发送的case class对象,所以消息发出之前,会经过一个OurBoundHandler进行序列化,然后接受消息时,首先经过一个InBoundHandler进行解码,然后再经过另外一个 InBoundHandler对解码后的数据进行读取。
(1) 服务端代码
① NettyServer
package com.wakedata.demo3
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
class NettyServer3 {
  def bind(host: String, port: Int): Unit = {
    //配置服务端线程池组
    //用于服务器接收客户端的连接
    val bossGroup = new NioEventLoopGroup()
    //用户进行SocketChannel的网络读写
    val workerGroup = new NioEventLoopGroup()
    //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
    val bootstrap = new ServerBootstrap()
    //将两个NIO线程组作为参数传入到ServerBootstrap
    bootstrap.group(bossGroup, workerGroup)
      //创建NioServerSocketChannel
      .channel(classOf[NioServerSocketChannel])
      //绑定I/O事件处理类
      .childHandler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          //处理输入的数据执行顺序 decoder -> handler
          //处理返回的数据执行顺序 encoder
          ch.pipeline().addLast("encoder", new ObjectEncoder) //实现了ChannelOutboundHandler
          ch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader))) //实现了ChannelInboundHandler
          ch.pipeline().addLast("handler", new ServerHandler3) //实现了ChannelInboundHandler
        }
      })
    val channelFuture = bootstrap.bind(host, port)
    channelFuture.syncUninterruptibly
  }
}
object NettyServer3 {
  def main(args: Array[String]) {
    val host = "localhost"
    val port = 8888
    val server = new NettyServer3
    server.bind(host, port)
  }
}
② ServerHandler
package com.wakedata.demo3
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ServerHandler3 extends ChannelInboundHandlerAdapter {
  /**
   * 有客户端建立连接后调用
   */
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("一个客户端连接上了...")
  }
  /**
   * 接受客户端发送来的消息
   */
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    //进行模式匹配
    msg match {
      case RequestMsg(msg) => {
        println("收到客户端发送的消息:" + msg)
        //将数据发送到客户端
        ctx.writeAndFlush(ResponseMsg("haha"))
      }
    }
  }
}
③ ResponseMsg
package com.wakedata.demo3
case class ResponseMsg(msg: String)
(2) 服务端代码
① NettyClient
package com.wakedata.demo3
import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
class NettyClient3 {
  def connect(host: String, port: Int): Unit = {
    //创建客户端NIO线程组
    val eventGroup = new NioEventLoopGroup
    //创建客户端辅助启动类
    val bootstrap = new Bootstrap
    //将NIO线程组传入到Bootstrap
    bootstrap.group(eventGroup)
      //创建NioSocketChannel
      .channel(classOf[NioSocketChannel])
      //绑定I/O事件处理类
      .handler(new ChannelInitializer[SocketChannel] {
        override def initChannel(ch: SocketChannel): Unit = {
          ch.pipeline().addLast("encoder", new ObjectEncoder)
          ch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)))
          ch.pipeline().addLast("handler", new ClientHandler3)
        }
      })
    //发起异步连接操作
    val channelFuture = bootstrap.connect(host, port)
    channelFuture.syncUninterruptibly
  }
}
object NettyClient3 {
  def main(args: Array[String]) {
    val host = "localhost"
    val port = 8888
    val client = new NettyClient3
    client.connect(host, port)
  }
}
② ClientHandler
package com.wakedata.demo3
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
class ClientHandler3 extends ChannelInboundHandlerAdapter {
  //一旦跟服务端建立上连接channelActive将被调用
  override def channelActive(ctx: ChannelHandlerContext): Unit = {
    println("已经跟服务端连接上了")
    //向服务端发送case class实例
    ctx.writeAndFlush(RequestMsg("hello"))
  }
  override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
    msg match {
      case ResponseMsg(msg) => {
        println("收到服务端返回的消息:" + msg)
      }
    }
  }
}
③ RequestMsg
package com.wakedata.demo3
case class RequestMsg(content: String)
(3) 执行结果
>>客户端
 >>服务端
>>服务端




















