
若我们服务端一次性最大处理的字节数是1M,而客户端发来了2M的数据,此时服务端的数据就要被切割成两次传输解码。Http协议中有分块传输,而在Websocket也可以分块处理超大的消息体。在jsr356标准中使用javax.websocket.MessageHandler.Partial可以分块处理这种数据。
 interface Partial<T> extends MessageHandler {
        /**
         * Called when part of a message is available to be processed.
         *
         * @param messagePart   The message part
         * @param last          <code>true</code> if this is the last part of
         *                      this message, else <code>false</code>
         */
        void onMessage(T messagePart, boolean last);
    }
Partial接口中的参数last就是表示是否是最后一块分块数据。
我们即可以给WsContainer全局设置消息体的缓冲池大小,也可以给每个session单独设置消息体的缓冲池大小。发送消息体一旦超过了此大小,数据就会在服务端被分块传输解码。
 //全局设置消息体的缓冲池大小
    @Bean  
    public WebSocketContainerFactoryBean webSocketContainer(){
        WebSocketContainerFactoryBean factoryBean = new WebSocketContainerFactoryBean();
        factoryBean.setMaxTextMessageBufferSize(20);
//        factoryBean.setMaxSessionIdleTimeout(10*1000);
//        factoryBean.setMaxBinaryMessageBufferSize(1000);
        return factoryBean;
    }
	//session级别消息体的缓冲池大小
	 @OnOpen
    public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
        session.setMaxTextMessageBufferSize(20);
        //....
  }      
使用JSR356注解实现消息体分块传输
@ServerEndpoint(value = "/ws/{token}")
@Component
public class WebsocketHandler2 {
    private final static Logger log = LoggerFactory.getLogger(WebsocketHandler2.class);
    private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token, EndpointConfig config) throws IOException {
        session.setMaxTextMessageBufferSize(20);
    }
    @OnMessage
    public void onMessage(String partialMsg, Session session, boolean isLast) throws IOException {
        StringBuilder stringJoiner = dataCache.get(session.getId());
        if (isLast) {
            log.info("receive client(id={}) partial last msg=>{}", session.getId(), partialMsg);
            if (stringJoiner == null) {
                String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), partialMsg);
                session.getBasicRemote().sendText(msg);
            } else {
                dataCache.remove(session.getId());
                stringJoiner.append(partialMsg);
                String msg = String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner);
                session.getBasicRemote().sendText(msg);
            }
        } else {
            log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), partialMsg);
            if (stringJoiner == null) {
                stringJoiner = new StringBuilder(partialMsg);
                dataCache.put(session.getId(), stringJoiner);
            } else {
                stringJoiner.append(partialMsg);
            }
        }
    }
使用spring的WebSocketHandler实现消息体分块传输
@Component
public class WebsocketHandler1 extends TextWebSocketHandler {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final Map<String, StringBuilder> dataCache = new ConcurrentHashMap<>();
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
   		 session.setTextMessageSizeLimit(20);
    }
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        StringBuilder stringJoiner = dataCache.get(session.getId());
        if (message.isLast()) {
            log.info("receive client(id={}) partial last msg=>{}", session.getId(), message.getPayload());
            if (stringJoiner == null) {
                TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), message.getPayload()));
                session.sendMessage(msg);
            } else {
                dataCache.remove(session.getId());
                stringJoiner.append(message.getPayload());
                TextMessage msg = new TextMessage(String.format("reply your(id=%s) msg=>%s", session.getId(), stringJoiner));
                session.sendMessage(msg);
            }
        } else {
            log.info("receive client(id={}) partial non_last msg=>{}", session.getId(), message.getPayload());
            if (stringJoiner == null) {
                stringJoiner = new StringBuilder(message.getPayload());
                dataCache.put(session.getId(), stringJoiner);
            } else {
                stringJoiner.append(message.getPayload());
            }
        }
    }
    @Override
    public boolean supportsPartialMessages() {
        return true;
    }
注意上边的WebsocketHandler1 实现的抽象方法supportsPartialMessages其返回值必须是true,否则处理大消息体时会报错。这是因为StandardWebSocketHandlerAdapter会根据supportsPartialMessages方法返回值将我们的WebSocketHandler适配成MessageHandler.Partial或MessageHandler.Whole,而supportsPartialMessages返回值是false就会适配成MessageHandler.Whole,MessageHandler.Whole是无法处理分块消息体的。
	//StandardWebSocketHandlerAdapter
	@Override
	public void onOpen(final javax.websocket.Session session, EndpointConfig config) {
		this.wsSession.initializeNativeSession(session);
		// The following inner classes need to remain since lambdas would not retain their
		// declared generic types (which need to be seen by the underlying WebSocket engine)
		if (this.handler.supportsPartialMessages()) {
			session.addMessageHandler(new MessageHandler.Partial<String>() {
				@Override
				public void onMessage(String message, boolean isLast) {
					handleTextMessage(session, message, isLast);
				}
			});
			session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
				@Override
				public void onMessage(ByteBuffer message, boolean isLast) {
					handleBinaryMessage(session, message, isLast);
				}
			});
		}
		else {
			session.addMessageHandler(new MessageHandler.Whole<String>() {
				@Override
				public void onMessage(String message) {
					handleTextMessage(session, message, true);
				}
			});
			session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
				@Override
				public void onMessage(ByteBuffer message) {
					handleBinaryMessage(session, message, true);
				}
			});
		}
		//......
	}
websocket底层是怎么处理分块数据的?我们在方法org.apache.tomcat.websocket.WsFrameBase#processDataText可以看到其具体的处理逻辑。
 首先将接收到的二进制数据尝试解码成文本数据,若发现接收缓冲区messageBufferText容量不足则查到分块处理器,若存在分块处理器泽调用sendMessageTex(false),先处理部分数据,若不存在则直接抛出异常。
//org.apache.tomcat.websocket.server.WsFrameBase
	private boolean processDataText() throws IOException {
        // Copy the available data to the buffer
        TransformationResult tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);
        while (!TransformationResult.END_OF_FRAME.equals(tr)) {
             //...
        }
        messageBufferBinary.flip();
        boolean last = false;
        // Frame is fully received
        // Convert bytes to UTF-8
        while (true) {
            CoderResult cr = utf8DecoderMessage.decode(messageBufferBinary, messageBufferText,
                    last);
            if (cr.isError()) {
                throw new WsIOException(new CloseReason(
                        CloseCodes.NOT_CONSISTENT,
                        sm.getString("wsFrame.invalidUtf8")));
            } else if (cr.isOverflow()) {
                // Ran out of space in text buffer - flush it
                //尝试解码时发现接收缓冲区messageBufferText容量不足
                //调用sendMessageTex(false),先处理部分数据。
                if (usePartial()) {   //查找分块处理器
                    messageBufferText.flip();
                    sendMessageText(false);
                    messageBufferText.clear();
                } else { //没有分块处理器,就会抛出异常
                    throw new WsIOException(new CloseReason(
                            CloseCodes.TOO_BIG,
                            sm.getString("wsFrame.textMessageTooBig")));
                }
            } else if (cr.isUnderflow() && !last) {
                // End of frame and possible message as well.
                if (continuationExpected) {
                    // If partial messages are supported, send what we have
                    // managed to decode
                    if (usePartial()) {
                        messageBufferText.flip();
                        sendMessageText(false);
                        messageBufferText.clear();
                    }
                    messageBufferBinary.compact();
                    newFrame();
                    // Process next frame
                    return true;
                } else {
                    // Make sure coder has flushed all output
                    last = true;
                }
            } else {
                // End of message
                messageBufferText.flip();
                //处理最后一块消息
                sendMessageText(true);
                newMessage();
                return true;
            }
        }
	}      
	//确定是否支持分块处理
	private boolean usePartial() {
        if (Util.isControl(opCode)) {
            return false;
        } else if (textMessage) {
            return textMsgHandler instanceof MessageHandler.Partial;
        } else {
            // Must be binary
            return binaryMsgHandler instanceof MessageHandler.Partial;
        }
    }


















