< ! --  springboot的依赖,如果系统不使用web相关的功能,也可以不使用 -- > 
< dependency> < groupId> . springframework. boot< / groupId> 
	< artifactId> - boot- starter- web< / artifactId> 
	< version> { springboot. version} < / version> 
< / dependency> 
< dependency> < groupId> . projectlombok< / groupId> 
    < artifactId> < / artifactId> 
    < version> { lombok. version} < / version> 
    < scope> < / scope> 
< / dependency> 
< dependency> < groupId> . alibaba. fastjson2< / groupId> 
    < artifactId> < / artifactId> 
    < version> { fastjson2. version} < / version> 
< / dependency> 
< dependency> < groupId> . netty< / groupId> 
    < artifactId> - all< / artifactId> 
    < version> { netty. version} < / version> 
< / dependency> 
< ! --  握手前校验使用,也可以使用其他的校验方式 -- > 
< dependency> < groupId> . auth0< / groupId> 
    < artifactId> - jwt< / artifactId> 
    < version> { jwt. version} < / version> 
< / dependency> 
 
package  com. zzc. netty. infrastructure. netty ; 
import  com. zzc. netty. infrastructure. netty. config.  SocketConfig ; 
public  interface  Socket < C  extends  SocketConfig > { 
    boolean  start ( ) ; 
    boolean  start ( C  serverConfig,  WebSocketChannelHandler  webSocketChannelHandler) ; 
    boolean  isStarted ( ) ; 
    void  close ( ) ; 
    C  getConfig ( ) ; 
    Socket  setConfig ( C  config) ; 
    boolean  isServer ( ) ; 
} 
package  com. zzc. netty. infrastructure. netty ; 
import  com. zzc. netty. infrastructure. netty. config.  SocketConfig ; 
import  io. netty. channel.  ChannelHandler ; 
import  io. netty. channel.  ChannelHandlerContext ; 
import  io. netty. channel.  ChannelInboundHandlerAdapter ; 
import  io. netty. channel.  ChannelPipeline ; 
import  io. netty. handler. timeout.  IdleState ; 
import  io. netty. handler. timeout.  IdleStateEvent ; 
import  io. netty. handler. timeout.  IdleStateHandler ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  java. util. concurrent.  TimeUnit ; 
@Slf4j 
public  abstract  class  BaseSocket < C  extends  SocketConfig > implements  Socket < C > { 
    private  WebSocketChannelHandler  webSocketChannelHandler; 
    private  C  config; 
    private  boolean  server =  true ; 
    private  boolean  started =  false ; 
    public  BaseSocket ( boolean  server)  { 
        this . server =  server; 
    } 
    @Override 
    public  boolean  start ( )  { 
        C  serverConfig =  getConfig ( ) ; 
        if  ( serverConfig ==  null )  { 
            throw  new  RuntimeException ( "serverConfig is null." ) ; 
        } 
        if  ( getWebSocketChannelHandler ( )  ==  null )  { 
            throw  new  RuntimeException ( "OnChannelHandler is null." ) ; 
        } 
        boolean  result =  false ; 
        try  { 
            result =  doStart ( serverConfig) ; 
        }  catch  ( Exception  e)  { 
            throw  new  RuntimeException ( "start server error" ) ; 
        } 
        setStarted ( result) ; 
        return  result; 
    } 
    @Override 
    public  boolean  start ( C  serverConfig,  WebSocketChannelHandler  webSocketChannelHandler)  { 
        this . webSocketChannelHandler =  webSocketChannelHandler; 
        setConfig ( serverConfig) ; 
        return  start ( ) ; 
    } 
    @Override 
    public  boolean  isStarted ( )  { 
        return  started; 
    } 
    @Override 
    public  void  close ( )  { 
        if  ( isStarted ( ) )  { 
            log. info ( "stop server" ) ; 
            try  { 
                doClose ( ) ; 
            }  catch  ( Exception  e)  { 
                log. error ( "stop server error." ,  e) ; 
            } 
        } 
    } 
    @Override 
    public  C  getConfig ( )  { 
        return  config; 
    } 
    protected  WebSocketChannelHandler  getWebSocketChannelHandler ( )  { 
        return  this . webSocketChannelHandler; 
    } 
    @Override 
    public  Socket  setConfig ( C  config)  { 
        if  ( config ==  null )  { 
            throw  new  RuntimeException ( "conf is null" ) ; 
        } 
        this . config =  config; 
        return  this ; 
    } 
    @Override 
    public  boolean  isServer ( )  { 
        return  server; 
    } 
    protected  void  setStarted ( boolean  started)  { 
        this . started =  started; 
    } 
    protected  void  addNettyIdleHandler ( ChannelPipeline  pipeline)  { 
        long  readIdleTimeout =  getReadIdleTimeout ( ) ; 
        long  writeIdleTimeout =  getWriteIdleTimeout ( ) ; 
        long  allIdleTimeout =  getAllIdleTimeout ( ) ; 
        pipeline. addLast ( new  IdleStateHandler ( readIdleTimeout,  writeIdleTimeout,  allIdleTimeout,  TimeUnit . MILLISECONDS ) ) ; 
        pipeline. addLast ( new  SocketIdleStateTrigger ( ) ) ; 
    } 
    protected  long  getReadIdleTimeout ( )  { 
        C  conf =  getConfig ( ) ; 
        return  conf. getCloseTimeout ( ) ; 
    } 
    protected  long  getWriteIdleTimeout ( )  { 
        C  conf =  getConfig ( ) ; 
        long  closeTimeout =  conf. getCloseTimeout ( ) ; 
        return  Math . min ( Math . max ( closeTimeout /  4 ,  ( 15  *  1000 ) ) ,  ( closeTimeout /  2 ) ) ; 
    } 
    protected  long  getAllIdleTimeout ( )  { 
        C  conf =  getConfig ( ) ; 
        long  closeTimeout =  conf. getCloseTimeout ( ) ; 
        return  closeTimeout +  500 ; 
    } 
    protected  abstract  boolean  doStart ( C  serverConfig) ; 
    protected  abstract  void  doClose ( ) ; 
    protected  void  addNettyOtherHandler ( ChannelPipeline  pipeline)  { 
        
    } 
    @ChannelHandler.Sharable 
    class  SocketIdleStateTrigger  extends  ChannelInboundHandlerAdapter  { 
        @Override 
        public  void  userEventTriggered ( ChannelHandlerContext  ctx,  Object  evt)  throws  Exception  { 
            if  ( evt instanceof  IdleStateEvent )  { 
                IdleState  state =  ( ( IdleStateEvent )  evt) . state ( ) ; 
                
                getWebSocketChannelHandler ( ) . onIdleStateEvent ( ctx. channel ( ) ,  state) ; 
            }  else  { 
                super . userEventTriggered ( ctx,  evt) ; 
            } 
        } 
    } 
} 
package  com. zzc. netty. infrastructure. netty ; 
import  io. netty. channel.  Channel ; 
import  io. netty. handler. codec. http.  FullHttpMessage ; 
import  io. netty. handler. codec. http.  FullHttpRequest ; 
import  io. netty. handler. timeout.  IdleState ; 
import  java. util.  Map ; 
public  interface  WebSocketChannelHandler  { 
    boolean  beforeHandshake ( Channel  channel,  Map < String ,  Object > ) ; 
    boolean  afterHandshake ( Channel  channel,  Map < String ,  Object > ) ; 
    void  channelActive ( Channel  channel) ; 
    void  channelInactive ( Channel  channel) ; 
    void  channelRead ( Channel  channel,  Object  msg) ; 
    void  onException ( Channel  channel,  Throwable  throwable) ; 
    void  onIdleStateEvent ( Channel  channel,  IdleState  state) ; 
} 
package  com. zzc. netty. infrastructure. netty. config ; 
public  interface  SocketConfig  { 
    public  String  getIp ( ) ; 
    public  void  setIp ( String  ip) ; 
    public  int  getPort ( ) ; 
    public  void  setPort ( int  port) ; 
    public  long  getConnectTimeout ( ) ; 
    public  void  setConnectTimeout ( long  connectTimeout) ; 
    public  long  getWriteTimeout ( ) ; 
    public  void  setWriteTimeout ( long  writeTimeout) ; 
    public  long  getCloseTimeout ( ) ; 
    public  void  setCloseTimeout ( long  closeTimeout) ; 
} 
package  com. zzc. netty. infrastructure. netty. config ; 
public  class  BaseSocketConfig  implements  SocketConfig  { 
    long  TIMEOUT_CONNECT  =  30  *  1000 ; 
    long  TIMEOUT_WRITE  =  30  *  1000 ; 
    long  TIMEOUT_CLOSE  =  120  *  1000 ; 
    private  String  ip; 
    private  int  port =  9696 ; 
    private  long  connectTimeout =  TIMEOUT_CONNECT ; 
    private  long  writeTimeout =  TIMEOUT_WRITE ; 
    private  long  closeTimeout =  TIMEOUT_CLOSE ; 
    @Override 
    public  String  getIp ( )  { 
        return  ip; 
    } 
    @Override 
    public  void  setIp ( String  ip)  { 
        this . ip =  ip; 
    } 
    @Override 
    public  int  getPort ( )  { 
        return  port; 
    } 
    @Override 
    public  void  setPort ( int  port)  { 
        this . port =  port; 
    } 
    @Override 
    public  long  getConnectTimeout ( )  { 
        return  connectTimeout; 
    } 
    @Override 
    public  void  setConnectTimeout ( long  connectTimeout)  { 
        this . connectTimeout =  connectTimeout; 
    } 
    @Override 
    public  long  getWriteTimeout ( )  { 
        return  writeTimeout; 
    } 
    @Override 
    public  void  setWriteTimeout ( long  writeTimeout)  { 
        this . writeTimeout =  writeTimeout; 
    } 
    @Override 
    public  long  getCloseTimeout ( )  { 
        return  closeTimeout; 
    } 
    @Override 
    public  void  setCloseTimeout ( long  closeTimeout)  { 
        this . closeTimeout =  closeTimeout; 
    } 
} 
package  com. zzc. netty. infrastructure. netty. conn ; 
public  interface  Conn  { 
    
    boolean  isOpen ( ) ; 
    void  setAllowWrite ( boolean  allowWrite) ; 
    boolean  isAllowWrite ( ) ; 
    String  getConnId ( ) ; 
    void  delayRelease ( ) ; 
    void  delayRelease ( int  delayMilliSecond) ; 
    void  release ( ) ; 
    void  releaseGracefully ( Object  statusCode) ; 
    void  releaseGracefully ( ) ; 
    boolean  writeObj ( Object  msg) ; 
    void  writeObjAsyn ( Object  msg) ; 
    void  writeObjAsyn ( Object  msg,  ConnFutureListener  listener) ; 
    void  writePing ( ) ; 
    void  writePong ( ) ; 
} 
public  interface  ConnFutureListener  { 
    void  onSuccess ( Conn  conn) ; 
    void  onCancel ( ) ; 
    void  onFailed ( ) ; 
} 
package  com. zzc. netty. infrastructure. netty. conn ; 
import  com. zzc. netty. domain. protocol.  Response ; 
public  interface  WebSocketConn  extends  Conn  { 
    boolean  writeResp ( Response  response) ; 
    void  writeRespAsyn ( Response  response) ; 
} 
package  com. zzc. netty. infrastructure. netty. conn ; 
import  com. zzc. netty. domain. protocol.  Response ; 
import  io. netty. channel.  Channel ; 
import  io. netty. handler. codec. http. websocketx.  TextWebSocketFrame ; 
import  lombok. extern. slf4j.  Slf4j ; 
@Slf4j 
public  class  WebSocketConnImpl  extends  BaseConn  implements  WebSocketConn  { 
    public  WebSocketConnImpl ( Channel  channel)  { 
        super ( channel) ; 
    } 
    @Override 
    public  boolean  writeResp ( Response  response)  { 
        TextWebSocketFrame  frame =  new  TextWebSocketFrame ( response. toString ( ) ) ; 
        return  writeObj ( frame) ; 
    } 
    @Override 
    public  void  writeRespAsyn ( Response  response)  { 
        TextWebSocketFrame  frame =  new  TextWebSocketFrame ( response. toString ( ) ) ; 
        writeObjAsyn ( frame) ; 
    } 
} 
package  com. zzc. netty. infrastructure. netty. handler ; 
import  com. zzc. netty. infracore. common. utils.  HttpxUtils ; 
import  com. zzc. netty. infrastructure. netty.  WebSocketChannelHandler ; 
import  io. netty. channel.  ChannelHandler ; 
import  io. netty. channel.  ChannelHandlerContext ; 
import  io. netty. channel.  ChannelInboundHandlerAdapter ; 
import  io. netty. handler. codec. http.  FullHttpRequest ; 
import  io. netty. util.  AttributeKey ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  java. util.  Map ; 
@Slf4j 
@ChannelHandler.Sharable 
public  class  SecurityServerHandler  extends  ChannelInboundHandlerAdapter  { 
    public  static  final  AttributeKey < Map < String ,  Object > > SECURITY_CHECK_ATTRIBUTE_KEY  = 
            AttributeKey . valueOf ( "SECURITY_CHECK_ATTRIBUTE_KEY" ) ; 
    private  WebSocketChannelHandler  webSocketChannelHandler; 
    public  SecurityServerHandler ( WebSocketChannelHandler  webSocketChannelHandler)  { 
        this . webSocketChannelHandler =  webSocketChannelHandler; 
    } 
    
    @Override 
    public  void  channelRead ( ChannelHandlerContext  ctx,  Object  msg)  throws  Exception  { 
        if  ( msg instanceof  FullHttpRequest )  { 
            
            FullHttpRequest  request =  ( FullHttpRequest )  msg; 
            Map < String ,  Object > =  HttpxUtils . urlQueryParams ( request. uri ( ) ) ; 
            boolean  valid =  webSocketChannelHandler. beforeHandshake ( ctx. channel ( ) ,  params) ; 
            log. info ( "channelRead valid:{}" ,  valid) ; 
            if  ( valid)  { 
                request. setUri ( "/ws" ) ; 
                ctx. channel ( ) . attr ( SECURITY_CHECK_ATTRIBUTE_KEY ) . set ( params) ; 
                ctx. fireUserEventTriggered ( params) ; 
                ctx. pipeline ( ) . remove ( this ) ; 
            }  else  { 
                ctx. close ( ) ; 
                return ; 
            } 
            super . channelRead ( ctx,  msg) ; 
        } 
    } 
} 
package  com. zzc. netty. infrastructure. netty. handler ; 
import  com. zzc. netty. infrastructure. netty.  WebSocketChannelHandler ; 
import  io. netty. channel.  ChannelHandler ; 
import  io. netty. channel.  ChannelHandlerContext ; 
import  io. netty. channel.  SimpleChannelInboundHandler ; 
import  io. netty. handler. codec. http. websocketx.  WebSocketServerProtocolHandler ; 
import  io. netty. util.  AttributeKey ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  java. util.  Map ; 
@Slf4j 
@ChannelHandler.Sharable 
public  class  WebSocketFrameHandler  extends  SimpleChannelInboundHandler  { 
    private  WebSocketChannelHandler  webSocketChannelHandler; 
    public  WebSocketFrameHandler ( WebSocketChannelHandler  webSocketChannelHandler)  { 
        this . webSocketChannelHandler =  webSocketChannelHandler; 
    } 
    @Override 
    public  void  channelRegistered ( ChannelHandlerContext  ctx)  throws  Exception  { 
        super . channelRegistered ( ctx) ; 
    } 
    @Override 
    public  void  channelUnregistered ( ChannelHandlerContext  ctx)  throws  Exception  { 
        super . channelUnregistered ( ctx) ; 
    } 
    @Override 
    public  void  channelActive ( ChannelHandlerContext  ctx)  throws  Exception  { 
        webSocketChannelHandler. channelActive ( ctx. channel ( ) ) ; 
        super . channelActive ( ctx) ; 
    } 
    @Override 
    public  void  channelInactive ( ChannelHandlerContext  ctx)  throws  Exception  { 
        webSocketChannelHandler. channelInactive ( ctx. channel ( ) ) ; 
        super . channelInactive ( ctx) ; 
    } 
    @Override 
    public  void  channelReadComplete ( ChannelHandlerContext  ctx)  throws  Exception  { 
        super . channelReadComplete ( ctx) ; 
    } 
    @Override 
    public  void  channelWritabilityChanged ( ChannelHandlerContext  ctx)  throws  Exception  { 
        super . channelWritabilityChanged ( ctx) ; 
    } 
    @Override 
    public  void  exceptionCaught ( ChannelHandlerContext  ctx,  Throwable  cause)  throws  Exception  { 
        webSocketChannelHandler. onException ( ctx. channel ( ) ,  cause) ; 
        super . exceptionCaught ( ctx,  cause) ; 
    } 
    @Override 
    public  void  userEventTriggered ( ChannelHandlerContext  ctx,  Object  evt)  throws  Exception  { 
         if  ( evt instanceof  WebSocketServerProtocolHandler. HandshakeComplete )  { 
            Map < String ,  Object > =  ctx. channel ( ) . attr ( SecurityServerHandler . SECURITY_CHECK_ATTRIBUTE_KEY ) . get ( ) ; 
            log. info ( "userEventTriggered params:{}" ,  params) ; 
            webSocketChannelHandler. afterHandshake ( ctx. channel ( ) ,  params) ; 
        } 
        super . userEventTriggered ( ctx,  evt) ; 
    } 
    @Override 
    protected  void  channelRead0 ( ChannelHandlerContext  ctx,  Object  msg)  throws  Exception  { 
        webSocketChannelHandler. channelRead ( ctx. channel ( ) ,  msg) ; 
    } 
} 
package  com. zzc. netty. infrastructure. netty. server ; 
import  com. zzc. netty. infrastructure. netty. config.  BaseSocketConfig ; 
public  class  ServerSocketConfig  extends  BaseSocketConfig  { 
    private  int  bossThreads =  2 ; 
    private  int  workThreads =  Runtime . getRuntime ( ) . availableProcessors ( )  *  2 ; 
    public  ServerSocketConfig ( )  { 
        super ( ) ; 
    } 
    public  ServerSocketConfig ( int  bossThreads,  int  workThreads)  { 
        this . bossThreads =  bossThreads; 
        this . workThreads =  workThreads; 
    } 
    public  int  getBossThreads ( )  { 
        return  bossThreads; 
    } 
    public  int  getWorkThreads ( )  { 
        return  workThreads; 
    } 
} 
package  com. zzc. netty. infrastructure. netty. server ; 
import  com. zzc. netty. infrastructure. netty.  Socket ; 
public  interface  ServerSocket  extends  Socket < ServerSocketConfig > { 
} 
package  com. zzc. netty. infrastructure. netty. server ; 
import  com. zzc. netty. infracore. common. utils.  HttpxUtils ; 
import  com. zzc. netty. infracore. common. utils.  ThreadPoolUtils ; 
import  com. zzc. netty. infrastructure. netty.  BaseSocket ; 
import  com. zzc. netty. infrastructure. netty. handler.  SecurityServerHandler ; 
import  com. zzc. netty. infrastructure. netty. handler.  WebSocketFrameHandler ; 
import  io. netty. bootstrap.  ServerBootstrap ; 
import  io. netty. buffer.  PooledByteBufAllocator ; 
import  io. netty. channel.  AdaptiveRecvByteBufAllocator ; 
import  io. netty. channel.  Channel ; 
import  io. netty. channel.  ChannelHandler ; 
import  io. netty. channel.  ChannelHandlerContext ; 
import  io. netty. channel.  ChannelInboundHandlerAdapter ; 
import  io. netty. channel.  ChannelInitializer ; 
import  io. netty. channel.  ChannelOption ; 
import  io. netty. channel.  ChannelPipeline ; 
import  io. netty. channel.  EventLoopGroup ; 
import  io. netty. channel.  SimpleChannelInboundHandler ; 
import  io. netty. channel. nio.  NioEventLoopGroup ; 
import  io. netty. channel. socket. nio.  NioServerSocketChannel ; 
import  io. netty. channel. socket. nio.  NioSocketChannel ; 
import  io. netty. handler. codec. http.  FullHttpRequest ; 
import  io. netty. handler. codec. http.  HttpHeaderNames ; 
import  io. netty. handler. codec. http.  HttpObjectAggregator ; 
import  io. netty. handler. codec. http.  HttpRequest ; 
import  io. netty. handler. codec. http.  HttpServerCodec ; 
import  io. netty. handler. codec. http. websocketx.  WebSocketServerProtocolHandler ; 
import  io. netty. handler. codec. http. websocketx. extensions. compression.  WebSocketServerCompressionHandler ; 
import  io. netty. handler. ssl.  SslHandler ; 
import  io. netty. handler. stream.  ChunkedWriteHandler ; 
import  io. netty. util.  AttributeKey ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  java. util.  Map ; 
import  java. util. concurrent.  TimeUnit ; 
@Slf4j 
public  class  ServerSocketImpl  extends  BaseSocket < ServerSocketConfig > implements  ServerSocket  { 
    private  final  static  String  THREAD_PREFIX_BOSS  =  "websocketBossServer" ; 
    private  final  static  String  THREAD_PREFIX_WORK  =  "websocketWorkServer" ; 
    ServerBootstrap  bootstrap =  null ; 
    EventLoopGroup  bossGroup =  null ; 
    EventLoopGroup  workGroup =  null ; 
    public  ServerSocketImpl ( )  { 
        super ( true ) ; 
    } 
    @Override 
    protected  boolean  doStart ( ServerSocketConfig  serverConfig)  { 
        long  currentTimeMillis =  System . currentTimeMillis ( ) ; 
        boolean  listenResuset =  false ; 
        try  { 
            bootstrap =  new  ServerBootstrap ( ) ; 
            bossGroup =  new  NioEventLoopGroup ( serverConfig. getBossThreads ( ) ,  ThreadPoolUtils . newThreadFactory ( THREAD_PREFIX_BOSS ) ) ; 
            workGroup =  new  NioEventLoopGroup ( serverConfig. getWorkThreads ( ) ,  ThreadPoolUtils . newThreadFactory ( THREAD_PREFIX_WORK ) ) ; 
            bootstrap. group ( bossGroup,  workGroup) 
                    . channel ( NioServerSocketChannel . class ) 
                    . childOption ( ChannelOption . TCP_NODELAY ,  true ) 
                    . childOption ( ChannelOption . SO_REUSEADDR ,  true ) 
                    . childOption ( ChannelOption . ALLOCATOR ,  PooledByteBufAllocator . DEFAULT ) 
                    . childOption ( ChannelOption . RCVBUF_ALLOCATOR ,  new  AdaptiveRecvByteBufAllocator ( 64 ,  1024 ,  65535 ) ) 
                    . childHandler ( new  ChannelInitializer < NioSocketChannel > ( )  { 
                        @Override 
                        protected  void  initChannel ( NioSocketChannel  channel)  throws  Exception  { 
                            ChannelPipeline  pipeline =  channel. pipeline ( ) ; 
                            addNettyIdleHandler ( pipeline) ; 
                            addNettyOtherHandler ( pipeline) ; 
                        } 
                    } ) ; 
            listenResuset =  bootstrap. bind ( serverConfig. getPort ( ) ) . await ( serverConfig. getConnectTimeout ( ) ,  TimeUnit . MICROSECONDS ) ; 
        }  catch  ( Exception  e)  { 
            log. error ( "listen server timeout." ,  e) ; 
            return  false ; 
        }  finally  { 
            if  ( listenResuset)  { 
                log. info ( "listen server, result:{}, spendTime:{}" ,  listenResuset,  ( System . currentTimeMillis ( )  -  currentTimeMillis) ) ; 
            }  else  { 
                log. error ( "listen server error, result:{}, spendTime:{}" ,  listenResuset,  ( System . currentTimeMillis ( )  -  currentTimeMillis) ) ; 
            } 
        } 
        return  listenResuset; 
    } 
    @Override 
    protected  void  doClose ( )  { 
        if  ( workGroup !=  null )  { 
            workGroup. shutdownGracefully ( ) ; 
        } 
        if  ( bossGroup !=  null )  { 
            bossGroup. shutdownGracefully ( ) ; 
        } 
    } 
    @Override 
    protected  void  addNettyOtherHandler ( ChannelPipeline  pipeline)  { 
        pipeline. addLast ( "http-codec" ,  new  HttpServerCodec ( ) ) ; 
        pipeline. addLast ( "aggregator" ,  new  HttpObjectAggregator ( 65536 ) ) ; 
        pipeline. addLast ( "http-chunked" ,  new  ChunkedWriteHandler ( ) ) ; 
        pipeline. addLast ( "security-handler" ,  new  SecurityServerHandler ( getWebSocketChannelHandler ( ) ) ) ; 
        pipeline. addLast ( "websocket-compression" ,  new  WebSocketServerCompressionHandler ( ) ) ; 
        pipeline. addLast ( "handler" ,  new  WebSocketServerProtocolHandler ( "/ws" ,  null ,  true ,  1024  *  1024 ,  true ) ) ; 
        pipeline. addLast ( "websocket-handler" ,  new  WebSocketFrameHandler ( getWebSocketChannelHandler ( ) ) ) ; 
    } 
} 
package  com. zzc. netty. application. service ; 
import  com. zzc. netty. domain.  Constant ; 
import  com. zzc. netty. infracore. common. utils.  JWTUtils ; 
import  com. zzc. netty. infracore. common. utils.  StrUtils ; 
import  com. zzc. netty. infrastructure. netty.  WebSocketChannelHandler ; 
import  com. zzc. netty. infrastructure. netty. conn.  WebSocketConn ; 
import  com. zzc. netty. infrastructure. netty. conn.  WebSocketConnImpl ; 
import  com. zzc. netty. domain. enums.  DisconReason ; 
import  io. netty. channel.  Channel ; 
import  io. netty. handler. timeout.  IdleState ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  java. util.  Map ; 
import  java. util. concurrent.  ConcurrentHashMap ; 
@Slf4j 
public  abstract  class  AbstractWebSocketChannelHandler  implements  WebSocketChannelHandler  { 
    private  static  Map < String ,  WebSocketConn > =  new  ConcurrentHashMap < > ( ) ; 
    @Override 
    public  boolean  beforeHandshake ( Channel  channel,  Map < String ,  Object > )  { 
        if  ( params ==  null )  { 
            return  false ; 
        } 
        String  appId =  ( String )  params. get ( Constant . KEY_APP_ID ) ; 
        String  userId =  ( String )  params. get ( Constant . KEY_USERID ) ; 
        String  username =  ( String )  params. get ( Constant . KEY_USERNAME ) ; 
        String  platform =  ( String )  params. get ( Constant . KEY_PLATFORM ) ; 
        String  token =  ( String )  params. get ( Constant . KEY_TOKEN ) ; 
        String  ts =  ( String )  params. get ( Constant . KEY_TS ) ; 
        if  ( StrUtils . isBlank ( appId,  userId,  username,  platform,  token,  ts) )  { 
            log. info ( "beforeHandshake param exist null. appId:{}, userId:{}, username:{}, platform:{}, ts:{}" ,  appId,  userId,  username,  platform,  ts) ; 
            return  false ; 
        } 
        boolean  verify =  JWTUtils . verify ( token) ; 
        return  verify; 
    } 
    @Override 
    public  boolean  afterHandshake ( Channel  channel,  Map < String ,  Object > )  { 
        log. info ( "afterHandshake params:{}" ,  params) ; 
        String  connId =  getConnId ( channel) ; 
        WebSocketConn  conn =  new  WebSocketConnImpl ( channel) ; 
        boolean  connected =  connected ( conn,  params) ; 
        
        if  ( connected)  { 
            addConn ( connId,  conn) ; 
        } 
        return  connected; 
    } 
    @Override 
    public  void  channelActive ( Channel  channel)  { 
        log. info ( "channelActive connId:{}" ,  getConnId ( channel) ) ; 
    } 
    @Override 
    public  void  channelInactive ( Channel  channel)  { 
        String  connId =  getConnId ( channel) ; 
        WebSocketConn  conn =  getConn ( connId) ; 
        if  ( conn !=  null )  { 
            log. info ( "remove conn. connId:{}" ,  connId) ; 
            try  { 
                disconnected ( conn,  DisconReason . NORMAL ) ; 
            }  finally  { 
                ensureRelease ( conn,  channel) ; 
            } 
        } 
    } 
    @Override 
    public  void  channelRead ( Channel  channel,  Object  msg)  { 
        String  connId =  getConnId ( channel) ; 
        WebSocketConn  conn =  getConn ( connId) ; 
        if  ( conn ==  null )  { 
            log. error ( "channelRead error, local cache is null. connId:{}, msg:{}" ,  connId,  msg) ; 
            return ; 
        } 
        receiver ( conn,  msg) ; 
    } 
    @Override 
    public  void  onException ( Channel  channel,  Throwable  throwable)  { 
        String  connId =  getConnId ( channel) ; 
        WebSocketConn  conn =  getConn ( connId) ; 
        log. info ( "onException connId:{}" ,  connId) ; 
        try  { 
            disconnected ( conn,  DisconReason . EXCEPTION ) ; 
        }  finally  { 
            ensureRelease ( conn,  channel) ; 
        } 
    } 
    @Override 
    public  void  onIdleStateEvent ( Channel  channel,  IdleState  state)  { 
        if  ( state ==  IdleState . WRITER_IDLE )  { 
        }  else  if  ( state ==  IdleState . READER_IDLE )  { 
        }  else  if  ( state ==  IdleState . ALL_IDLE )  { 
            
            String  connId =  getConnId ( channel) ; 
            WebSocketConn  conn =  getConn ( connId) ; 
            log. info ( "onIdleStateEvent connId:{}" ,  connId) ; 
            try  { 
                disconnected ( conn,  DisconReason . IDLE ) ; 
            }  finally  { 
                ensureRelease ( conn,  channel) ; 
            } 
        } 
    } 
    private  String  getConnId ( Channel  channel)  { 
        if  ( channel !=  null )  { 
            return  channel. id ( ) . asLongText ( ) ; 
        } 
        return  null ; 
    } 
    private  WebSocketConn  getConn ( String  connId)  { 
        return  children. get ( connId) ; 
    } 
    private  void  addConn ( String  connId,  WebSocketConn  conn)  { 
        children. put ( connId,  conn) ; 
    } 
    private  void  removeConn ( String  connId)  { 
        children. remove ( connId) ; 
    } 
    private  void  ensureRelease ( WebSocketConn  conn,  Channel  channel)  { 
        if  ( conn ==  null )  { 
            channel. close ( ) ; 
            log. warn ( "ensureRelease conn is null." ) ; 
            return ; 
        } 
        try  { 
            conn. release ( ) ; 
        }  finally  { 
            removeConn ( conn. getConnId ( ) ) ; 
        } 
    } 
    
    protected  abstract  boolean  connected ( WebSocketConn  conn,  Map < String ,  Object > ) ; 
    
    protected  abstract  void  disconnected ( WebSocketConn  conn,  DisconReason  reason) ; 
    
    protected  abstract  void  receiver ( WebSocketConn  conn,  Object  msg) ; 
} 
package  com. zzc. netty. application. service ; 
import  com. alibaba. fastjson2.  JSON ; 
import  com. zzc. netty. application.  CommandFactory ; 
import  com. zzc. netty. domain. command.  CommandContext ; 
import  com. zzc. netty. application.  CommandHandler ; 
import  com. zzc. netty. infracore. api.  CommonCode ; 
import  com. zzc. netty. domain. protocol.  Response ; 
import  com. zzc. netty. infracore. common. utils.  ThreadPoolUtils ; 
import  com. zzc. netty. infrastructure. netty. conn.  WebSocketConn ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  org. springframework. stereotype.  Service ; 
import  java. util. concurrent.  ThreadPoolExecutor ; 
import  java. util. concurrent.  TimeUnit ; 
@Slf4j 
@Service 
public  class  CommandInvoker  { 
    private  static  ThreadPoolExecutor  executor =  ThreadPoolUtils . newThreadPoolExecutorDirectAndAsy ( 
            "command-handler" , 
            2  *  Runtime . getRuntime ( ) . availableProcessors ( ) , 
            4  *  Runtime . getRuntime ( ) . availableProcessors ( ) , 
            120 , 
            TimeUnit . SECONDS , 
            30 ) ; 
    public  void  action ( WebSocketConn  conn,  String  jsonStr)  { 
        executor. submit ( new  Runnable ( )  { 
            @Override 
            public  void  run ( )  { 
                try  { 
                    CommandContext  ctx =  JSON . parseObject ( jsonStr,  CommandContext . class ) ; 
                    ctx. setResultCode ( CommonCode . SUCCESS ) ; 
                    log. info ( "action ctx:{}, resultCode:{}" ,  JSON . toJSONString ( ctx) ,  JSON . toJSONString ( ctx. getResultCode ( ) ) ) ; 
                    ctx. setWebSocketConn ( conn) ; 
                    String  command =  ctx. getCommand ( ) ; 
                    CommandHandler  handler =  CommandFactory . getHandler ( command) ; 
                    handler. execute ( ctx) ; 
                }  catch  ( Exception  e)  { 
                    conn. writeRespAsyn ( Response . error ( CommonCode . SYSTEM_ERROR ) ) ; 
                    log. error ( "receiver cmd error." ,  e) ; 
                } 
            } 
        } ) ; 
    } 
} 
package  com. zzc. netty. application ; 
import  com. zzc. netty. domain. command.  CommandContext ; 
import  com. zzc. netty. infracore. api.  CommonCode ; 
import  com. zzc. netty. infracore. exception.  DomainException ; 
import  lombok. extern. slf4j.  Slf4j ; 
@Slf4j 
public  abstract  class  CommandHandler < T > { 
    enum  State  { 
        ACK , 
        BEFORE , 
        PROCESS , 
        AFTER , 
        FINISH 
    } 
    protected  abstract  boolean  ack ( CommandContext < T > ) ; 
    protected  abstract  boolean  beforeHandler ( CommandContext < T > ) ; 
    protected  abstract  boolean  handler ( CommandContext < T > ) ; 
    protected  abstract  boolean  afterHandler ( CommandContext < T > ) ; 
    protected  abstract  boolean  answer ( CommandContext < T > ) ; 
    public  void  execute ( CommandContext < T > )  { 
        State  state =  State . ACK ; 
        switch  ( state)  { 
            case  ACK : 
                if  ( ! ack ( ctx) )  { 
                    break ; 
                } 
                state =  State . BEFORE ; 
            case  BEFORE : 
                if  ( ! beforeHandler ( ctx) )  { 
                    answer ( ctx) ; 
                    break ; 
                } 
                state =  State . PROCESS ; 
            case  PROCESS : 
                if  ( ! handler ( ctx) )  { 
                    answer ( ctx) ; 
                    break ; 
                } 
                state =  State . AFTER ; 
            case  AFTER : 
                if  ( ! afterHandler ( ctx) )  { 
                    answer ( ctx) ; 
                    break ; 
                } 
                state =  State . FINISH ; 
            case  FINISH : 
                answer ( ctx) ; 
                return ; 
            default : 
                throw  new  DomainException ( CommonCode . SYSTEM_ERROR ) ; 
        } 
    } 
} 
package  com. zzc. netty. application. service ; 
import  com. zzc. netty. application. dto.  CloseWsCommand ; 
import  com. zzc. netty. domain.  Constant ; 
import  com. zzc. netty. domain. protocol.  ProtocolFactory ; 
import  com. zzc. netty. domain. command.  CommandEnums ; 
import  com. zzc. netty. domain. session.  UserSession ; 
import  com. zzc. netty. domain. session.  UserSessionService ; 
import  com. zzc. netty. infracore. api.  CommonCode ; 
import  com. zzc. netty. domain. protocol.  Response ; 
import  com. zzc. netty. infrastructure. netty. conn.  WebSocketConn ; 
import  com. zzc. netty. domain. enums.  DisconReason ; 
import  com. zzc. netty. infrastructure. netty. server.  ServerSocketConfig ; 
import  com. zzc. netty. infrastructure. netty. server.  ServerSocketImpl ; 
import  io. netty. handler. codec. http. websocketx.  TextWebSocketFrame ; 
import  lombok. extern. slf4j.  Slf4j ; 
import  org. springframework. beans. factory. annotation.  Autowired ; 
import  org. springframework. stereotype.  Service ; 
import  javax. annotation.  PostConstruct ; 
import  java. util.  Map ; 
@Slf4j 
@Service 
public  class  SignalServiceImpl  extends  AbstractWebSocketChannelHandler  { 
    @Autowired 
    private  UserSessionService  userSessionService; 
    @Autowired 
    private  CommandInvoker  commandInvoker; 
    @PostConstruct 
    public  void  init ( )  { 
        ServerSocketImpl  serverSocket =  new  ServerSocketImpl ( ) ; 
        serverSocket. start ( new  ServerSocketConfig ( ) ,  this ) ; 
    } 
    @Override 
    protected  boolean  connected ( WebSocketConn  conn,  Map < String ,  Object > )  { 
        String  sessionId =  conn. getConnId ( ) ; 
        if  ( ! conn. isOpen ( ) )  { 
            throw  new  RuntimeException ( "connected conn is close. connId:"  +  sessionId) ; 
        } 
        if  ( params ==  null  ||  params. isEmpty ( ) )  { 
            conn. writeRespAsyn ( Response . error ( CommonCode . PARAM_NULL ) ) ; 
            return  false ; 
        } 
        String  appId =  ( String )  params. get ( Constant . KEY_APP_ID ) ; 
        String  userId =  ( String )  params. get ( Constant . KEY_USERID ) ; 
        String  platform =  ( String )  params. get ( Constant . KEY_PLATFORM ) ; 
        String  ts =  ( String )  params. get ( Constant . KEY_TS ) ; 
        String  username =  ( String )  params. get ( Constant . KEY_USERNAME ) ; 
        
        UserSession  userSession =  UserSession. Builder ( ) 
                . appId ( appId) 
                . sessionId ( sessionId) 
                . userId ( userId) 
                . platform ( platform) 
                . username ( username) 
                . ts ( ts) 
                . build ( ) ; 
        userSessionService. addUserSession ( userSession) ; 
        return  conn. writeResp ( ProtocolFactory . createResp ( CommandEnums . OPENWS . getCommand ( ) ,  sessionId,  ts) ) ; 
    } 
    @Override 
    protected  void  disconnected ( WebSocketConn  conn,  DisconReason  reason)  { 
        if  ( conn ==  null )  { 
            log. error ( "disconnected error, conn is null." ) ; 
            return ; 
        } 
        log. info ( "disconnected connId:{}" ,  conn. getConnId ( ) ) ; 
        UserSession  userSession =  userSessionService. removeUserSession ( conn. getConnId ( ) ) ; 
        if  ( userSession !=  null )  { 
            CloseWsCommand  closeWs =  new  CloseWsCommand ( ) ; 
            closeWs. setReason ( reason. getReason ( ) ) ; 
            conn. writeRespAsyn ( ProtocolFactory . createResp ( closeWs,  CommandEnums . CLOSEWS . getCommand ( ) ,  userSession. getSessionId ( ) ,  userSession. getTs ( ) ) ) ; 
        }  else  { 
            log. warn ( "usersession is null. connId:{}" ,  conn. getConnId ( ) ) ; 
        } 
    } 
    @Override 
    protected  void  receiver ( WebSocketConn  conn,  Object  msg)  { 
        if  ( msg instanceof  TextWebSocketFrame )  { 
            String  jsonData =  ( ( TextWebSocketFrame )  msg) . text ( ) ; 
            log. info ( "receiver rev msg:{}" ,  jsonData) ; 
            commandInvoker. action ( conn,  jsonData) ; 
        }  else  { 
            log. warn ( "receiver other data. class:{}" ,  msg. getClass ( ) ) ; 
        } 
    } 
} 
package  com. zzc. netty. application ; 
import  com. zzc. netty. adapter. handler.  AddRoomHandler ; 
import  com. zzc. netty. domain. command.  CommandEnums ; 
import  com. zzc. netty. infracore. common. utils.  SpringBeansUtil ; 
import  java. util.  HashMap ; 
import  java. util.  Map ; 
public  class  CommandFactory  { 
    private  static  final  Map < String ,  CommandHandler > =  new  HashMap < > ( ) ; 
    static  { 
    	
    	
        commandHandlers. put ( CommandEnums . ADDROOM . getCommand ( ) ,  new  AddRoomHandler ( ) ) ; 
    } 
    public  static  CommandHandler  getHandler ( String  command)  { 
        return  commandHandlers. get ( command) ; 
    } 
}