目录  
 1.启用Websocket功能 2.封装操作websocket session的工具 3.保存websocket session的接口 4.保存websocket session的类 5.定义websocket 端点 6.创建定时任务 ping websocket 客户端   
 
package  com. xxx. robot. config ; 
import  org. springframework. context. annotation.  Bean ; 
import  org. springframework. context. annotation.  Configuration ; 
import  org. springframework. web. socket. config. annotation.  EnableWebSocket ; 
import  org. springframework. web. socket. server. standard.  ServerEndpointExporter ; 
@Configuration 
@EnableWebSocket 
public  class  WebSocketConfig  { 
    @Bean 
    public  ServerEndpointExporter  serverEndpoint ( )  { 
        return  new  ServerEndpointExporter ( ) ; 
    } 
} 
package  com. xxx. robot. websocket. util ; 
import  java. util.  Map ; 
import  javax. websocket.  Session ; 
import  org. apache. tomcat. websocket.  Constants ; 
import  org. springframework. security. authentication.  UsernamePasswordAuthenticationToken ; 
import  com. xxx. framework. security. config.  MyUserDetails ; 
import  com. xxx. framework. security. entity.  LoginUser ; 
import  com. xxx. user. entity.  User ; 
public  final  class  WebSocketSessionUtils  { 
    private  WebSocketSessionUtils ( )  { } 
	public  static  final  int  WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE =  8  *  1024  *  1024 ; 
    
    public  static  final  int  WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE =  8  *  1024  *  1024 ; 
    
    
    public  static  final  long  WEBSOCKET_BLOCKING_SEND_TIMEOUT =  10  *  1000 ; 
	
    public  static  User  findUser ( Session  session)  { 
        UsernamePasswordAuthenticationToken  uToken =  ( UsernamePasswordAuthenticationToken )  session. getUserPrincipal ( ) ; 
        MyUserDetails  userDetails =  ( MyUserDetails )  uToken. getPrincipal ( ) ; 
        LoginUser  loginUser =  ( LoginUser )  userDetails. getUserData ( ) ; 
        return  ( User )  loginUser. getAdditionalInfo ( ) ; 
    } 
    
    
    public  static  void  setProperties ( Session  session)  { 
    	
        session. setMaxTextMessageBufferSize ( WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE) ; 
        
        session. setMaxBinaryMessageBufferSize ( WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE) ; 
        Map < String ,  Object > =  session. getUserProperties ( ) ; 
        
        userProperties. put ( Constants . BLOCKING_SEND_TIMEOUT_PROPERTY,  WEBSOCKET_BLOCKING_SEND_TIMEOUT) ; 
    } 
} 
package  com. xxx. robot. websocket ; 
import  java. io.  IOException ; 
import  java. nio.  ByteBuffer ; 
import  java. util.  List ; 
import  javax. websocket.  Session ; 
import  org. slf4j.  Logger ; 
import  org. slf4j.  LoggerFactory ; 
public  interface  WebSocketSessionManager  { 
    Logger  log =  LoggerFactory . getLogger ( WebSocketSessionManager . class ) ; 
    
    String  PING =  "ping" ; 
    String  PONG =  "pong" ; 
    
    Session  get ( String  key) ; 
    
    List < String > keys ( ) ; 
    void  add ( String  key,  Session  session) ; 
    
    Session  remove ( String  key) ; 
    
    
    default  void  pingBatch ( )  { 
        List < String > =  keys ( ) ; 
        log. info ( "WebSocket: {} 数量为:{}" ,  this . getClass ( ) . getSimpleName ( ) ,  keyList. size ( ) ) ; 
        for  ( String  key :  keyList)  { 
            if  ( key !=  null )  { 
                Session  session =  get ( key) ; 
                if  ( session !=  null )  { 
                    try  { 
                        session. getBasicRemote ( ) . sendPing ( ByteBuffer . wrap ( PING. getBytes ( ) ) ) ; 
                        try  { 
                            Thread . sleep ( 10 ) ; 
                        }  catch  ( InterruptedException  e1)  { 
                        } 
                    }  catch  ( Exception  e)  { 
                        log. error ( "WebSocket-ping异常" ,  e) ; 
                    } 
                } 
            } 
        } 
    } 
    
    
    default  void  clearAllSession ( )  { 
        List < String > =  keys ( ) ; 
        int  i =  0 ; 
        for  ( String  key :  keyList)  { 
            if  ( key !=  null )  { 
                Session  session =  get ( key) ; 
                if  ( session !=  null )  { 
                    try  { 
                        remove ( key) ; 
                        i++ ; 
                        session. close ( ) ; 
                    }  catch  ( IOException  e1)  { 
                        log. error ( "WebSocket-移除并关闭session异常" ,  e1) ; 
                    } 
                    if  ( i %  10  ==  0 )  { 
                        try  { 
                            Thread . sleep ( 0 ) ; 
                        }  catch  ( InterruptedException  e1)  { 
                        } 
                    } 
                } 
            } 
        } 
        log. info ( "WebSocket-移除并关闭session数量为:{}" ,  i) ; 
    } 
} 
package  com. xxx. robot. websocket. robot. manager ; 
import  java. io.  IOException ; 
import  java. util.  ArrayList ; 
import  java. util.  List ; 
import  java. util.  NavigableSet ; 
import  java. util. concurrent.  ConcurrentNavigableMap ; 
import  java. util. concurrent.  ConcurrentSkipListMap ; 
import  javax. websocket.  Session ; 
import  org. apache. commons. lang3.  StringUtils ; 
import  org. springframework. stereotype.  Component ; 
import  com. xxx. robot. websocket.  WebSocketSessionManager ; 
@Component 
public  class  RobotSessionManager  implements  WebSocketSessionManager  { 
    
    
    private  static  final  ConcurrentSkipListMap < String ,  Session > =  new  ConcurrentSkipListMap < > ( ) ; 
    
    public  static  final  String  joinKey ( String  userId,  String  managerId)  { 
        return  userId +  '-'  +  managerId; 
    } 
    public  static  final  String  joinKey ( Long  userId,  String  managerId)  { 
        return  userId. toString ( )  +  '-'  +  managerId; 
    } 
    
    public  static  final  String [ ]  splitKey ( String  key)  { 
        return  StringUtils . split ( key,  '-' ) ; 
    } 
    @Override 
    public  Session  get ( String  key)  { 
        return  SESSION_POOL. get ( key) ; 
    } 
    
    
    public  List < String > keysByUserId ( String  userId,  String  excludeManagerId)  { 
    	
        ConcurrentNavigableMap < String ,  Session > =  SESSION_POOL. subMap ( userId +  '-' ,  userId +  '.' ) ; 
        NavigableSet < String > =  subMap. navigableKeySet ( ) ; 
        List < String > =  new  ArrayList < > ( ) ; 
        if  ( StringUtils . isBlank ( excludeManagerId) )  { 
            for  ( String  key :  keySet)  { 
                if  ( key !=  null )  { 
                    list. add ( key) ; 
                } 
            } 
        }  else  { 
            for  ( String  key :  keySet)  { 
                if  ( key !=  null  &&  ! key. equals ( excludeManagerId) )  { 
                    list. add ( key) ; 
                } 
            } 
        } 
        return  list; 
    } 
    @Override 
    public  List < String > keys ( )  { 
        NavigableSet < String > =  SESSION_POOL. navigableKeySet ( ) ; 
        List < String > =  new  ArrayList < > ( ) ; 
        for  ( String  key :  keySet)  { 
            if  ( key !=  null )  { 
                list. add ( key) ; 
            } 
        } 
        return  list; 
    } 
    @Override 
    public  synchronized  void  add ( String  key,  Session  session)  { 
        removeAndClose ( key) ; 
        SESSION_POOL. put ( key,  session) ; 
    } 
    @Override 
    public  synchronized  Session  remove ( String  key)  { 
        return  SESSION_POOL. remove ( key) ; 
    } 
    
    
    public  synchronized  void  remove ( String  key,  Session  session)  { 
        SESSION_POOL. remove ( key,  session) ; 
    } 
    
    private  void  removeAndClose ( String  key)  { 
        Session  session =  remove ( key) ; 
        if  ( session !=  null )  { 
            try  { 
                session. close ( ) ; 
            }  catch  ( IOException  e)  { 
            } 
        } 
    } 
} 
package  com. xxx. robot. websocket. robot. endpoint ; 
import  java. util.  Map ; 
import  javax. websocket.  OnClose ; 
import  javax. websocket.  OnError ; 
import  javax. websocket.  OnMessage ; 
import  javax. websocket.  OnOpen ; 
import  javax. websocket.  Session ; 
import  javax. websocket. server.  PathParam ; 
import  javax. websocket. server.  ServerEndpoint ; 
import  org. springframework. stereotype.  Component ; 
import  com. fasterxml. jackson. databind.  JsonNode ; 
import  com. xxx. framework. util.  SpringBeanUtils ; 
import  com. xxx. user. entity.  User ; 
import  com. xxx. robot. corefunc. service.  RobotCoreService ; 
import  com. xxx. robot. util. serial.  BaseJsonUtils ; 
import  com. xxx. robot. websocket.  WebSocketSessionManager ; 
import  com. xxx. robot. websocket. robot. manager.  RobotSessionManager ; 
import  com. xxx. robot. websocket. util.  WebSocketSessionUtils ; 
import  lombok. extern. slf4j.  Slf4j ; 
@Slf4j 
@Component 
@ServerEndpoint ( value =  "/robot/{id}" ) 
public  class  RobotWebSocketServer  { 
    
    private  volatile  User  user; 
    
    private  volatile  String  id; 
    
    private  volatile  Session  session; 
    
    private  volatile  Map < String ,  RobotCoreService > ; 
    
    @OnOpen 
    public  void  onOpen ( @PathParam ( "id" )  String  id,  Session  session)  { 
        WebSocketSessionUtils . setProperties ( session) ; 
        this . user =  WebSocketSessionUtils . findUser ( session) ; 
        this . id =  id; 
        this . session =  session; 
        log. info ( "连接成功:{}, {}" ,  id,  this . user. getUserCode ( ) ) ; 
        
        
        robotCoreServiceMap =  SpringBeanUtils . getApplicationContext ( ) . getBeansOfType ( RobotCoreService . class ) ; 
        RobotSessionManager  robotSessionManager =  SpringBeanUtils . getBean ( RobotSessionManager . class ) ; 
        
        robotSessionManager. add ( RobotSessionManager . joinKey ( this . user. getId ( ) ,  id) ,  session) ; 
    } 
    
    @OnClose 
    public  void  onClose ( )  { 
        log. info ( "连接关闭:{}, {}" ,  this . id,  this . user. getUserCode ( ) ) ; 
        RobotSessionManager  robotSessionManager =  SpringBeanUtils . getBean ( RobotSessionManager . class ) ; 
        
        robotSessionManager. remove ( RobotSessionManager . joinKey ( this . user. getId ( ) ,  this . id) ,  this . session) ; 
    } 
    
    @OnError 
    public  void  onError ( Throwable  error)  { 
        log. error ( "onError:id = {}, {}, {}" ,  this . id,  this . session. getId ( ) ,  this . user. getUserCode ( ) ,  error) ; 
        RobotSessionManager  robotSessionManager =  SpringBeanUtils . getBean ( RobotSessionManager . class ) ; 
        
        
        robotSessionManager. remove ( RobotSessionManager . joinKey ( this . user. getId ( ) ,  this . id) ,  this . session) ; 
    } 
    
    @OnMessage 
    public  void  onMessage ( String  message)  { 
        log. info ( "onMessage:id = {}, {}, {}" ,  this . id,  this . user. getUserCode ( ) ,  message) ; 
        if  ( WebSocketSessionManager . PING. equals ( message) )  { 
        	
            this . session. getAsyncRemote ( ) . sendText ( WebSocketSessionManager . PONG) ; 
            return ; 
        } 
        
        try  { 
        	
            JsonNode  root =  BaseJsonUtils . readTree ( message) ; 
            String  apiType =  root. at ( "/apiType" ) . asText ( ) ; 
            
            robotCoreServiceMap. get ( apiType +  "Service" ) . receiveFrontMessage ( this . user,  RobotSessionManager . joinKey ( this . user. getId ( ) ,  this . id) ,  root) ; 
        }  catch  ( Exception  e)  { 
            log. error ( "处理消息错误" ,  e) ; 
        } 
    } 
    
} 
 
package  com. xxx. robot. config ; 
import  org. springframework. context. annotation.  Bean ; 
import  org. springframework. context. annotation.  Configuration ; 
import  org. springframework. scheduling. annotation.  EnableScheduling ; 
import  org. springframework. scheduling. concurrent.  ThreadPoolTaskExecutor ; 
@Configuration 
@EnableScheduling 
public  class  TaskExecutorConfig  { 
    @Bean 
    public  ThreadPoolTaskExecutor  taskExecutor ( )  { 
        ThreadPoolTaskExecutor  executor =  new  ThreadPoolTaskExecutor ( ) ; 
        executor. setCorePoolSize ( 5 ) ; 
        executor. setMaxPoolSize ( 5 ) ; 
        executor. setQueueCapacity ( 10 ) ; 
        executor. setKeepAliveSeconds ( 60 ) ; 
        executor. setThreadNamePrefix ( "scheduler-executor-" ) ; 
        return  executor; 
    } 
} 
package  com. xxx. robot. websocket ; 
import  java. util.  List ; 
import  org. springframework. beans. factory. annotation.  Autowired ; 
import  org. springframework. scheduling. annotation.  Scheduled ; 
import  org. springframework. stereotype.  Component ; 
import  lombok. extern. slf4j.  Slf4j ; 
@Slf4j 
@Component 
public  class  WebSocketSchedulerTask  { 
    
    
    @Autowired 
    private  List < WebSocketSessionManager > ; 
	
    @Scheduled ( initialDelay =  60000 ,  fixedDelay =  30000 ) 
    public  void  clearInvalidSession ( )  { 
        try  { 
            log. info ( "pingBatch 开始。。。" ) ; 
            for  ( WebSocketSessionManager  webSocketSessionManager :  webSocketSessionManagers)  { 
                webSocketSessionManager. pingBatch ( ) ; 
            } 
            log. info ( "pingBatch 完成。。。" ) ; 
        }  catch  ( Exception  e)  { 
            log. error ( "pingBatch异常" ,  e) ; 
        } 
    } 
}