WebSocket
Java 中使用 WebSocket 的方式一般下面几种:
- 使用 Java 提供的 @ServerEndpoint 注解实现
- 使用 Spring 提供的低层级 WebSocket API 实现
- 使用 STOMP 协议实现(STOMP 是一个构建在 websockets 之上的框架,ws 是其实现的底层机制)
WebSocket 协议定义了两种类型的消息 (文本和二进制),但它们的内容未定义。
STOMP 协议定义了一种机制,供客户端和服务器协商子协议(即更高级别的消息传递协议),以便在 WebSocket 之上使用,它定义了以下内容:
企业级项目一般使用 stomp 比较多,它有以下优点:
- 解耦通信模型:不需要手动维护每个 WebSocketSession,前端只要订阅某个主题即可,后端也只需“发送消息”
- 支持订阅/广播/私聊机制
- 配合消息代理中间件:能直接对接 RabbitMQ、ActiveMQ、Kafka,实现多节点消息广播
- 便于维护和扩展:不用管理连接池、消息队列等复杂逻辑,逻辑更清晰、更规范
- Spring 原生支持好:使用 @MessageMapping 像写 Controller 一样接收消息
- 可与 Spring Security 无缝整合:登录用户可以直接关联 WebSocket 会话,实现权限管理和用户推送
其中最后两条是使用 Spring 生态时的优势,其它胶水框架的 ws 实现可能略有不同,但肯定是和自家生态无缝集成的。
使用场景
聊天室系统、实时系统通知、股票/数据行情推送、实时协作系统(文档、白板)、后台管理系统告警 常使用 stomp;
而实时游戏通信、IoT 设备通讯等一些需要高性能和定制化协议的场景则常使用 ws 原生协议,它比较轻量,可定制化程度高。
使用 Java 注解
这也是常用的一种方式:
@ServerEndpoint:暴露出去的 ws 服务访问的路径,有点类似我们经常用的 @RequestMapping;
@OnOpen:当 websocket 建立连接成功后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnClose:当 websocket 建立的连接断开后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnMessage:当客户端发送消息到服务端时,会触发该注解的方法体执行,它有一个 String 入参表明客户端传入的值;
@OnError:当 websocket 建立连接时出现异常会触发这个注解修饰的方法,它有一个 Session 参数;
以上几个注解都在 javax.websocket
包下,那么对于 Java 17+ 你懂得。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import javax.websocket.OnMessage; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; @ServerEndpoint("/ws/test") public class TestWebSocketEndpoint {
@OnMessage public void handleMessage(Session session, String message) throws IOException { session.getBasicRemote().sendText("requtest text: " + message); } ... }
|
1 2 3 4 5 6 7 8 9
| @Configuration @EnableWebSocket public class WebSocketConfig{
@Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
|
使用 Spring 的 WS 实现
通过继承 TextWebSocketHandler
类并进行方法覆写,可以对 websocket 的事件进行处理:
afterConnectionEstablished()
方法在 socket 连接成功后被触发,同 Java 注解里的 @OnOpen 功能;
afterConnectionClosed()
方法在 socket 连接关闭后被触发,同 Java 注解里的 @OnClose 功能;
handleTextMessage()
方法是在客户端发送信息时触发,同原生注解里的 @OnMessage 功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.text.MessageFormat;
public class MyTextHandler extends TextWebSocketHandler { private final Logger logger = LoggerFactory.getLogger(getClass());
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.debug("Opened new session in instance " + this); }
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { session.sendMessage(new TextMessage("request: " + message.toString())); }
@Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { session.close(CloseStatus.SERVER_ERROR); logger.debug("Info: WebSocket connection closed."); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myTextHandler(), "/ws"); registry.addHandler(myTextHandler(), "/customertest_sockjs").withSockJS(); } @Bean public WebSocketHandler myTextHandler(){ return new MyTextHandler(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class WsInterceptor implements HandshakeInterceptor {
@Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { return false; }
@Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
} }
|
使用 STOMP
STOMP(Simple Text Oriented Messaging Protocol),就是在 WebSocket 基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP 消息基于 Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持 STOMP,比如:RabbitMQ、 ActiveMQ 等。
通过实现 WebSocketMessageBrokerConfigurer
接口并加上 @EnableWebSocketMessageBroker
注解来进行 stomp 的配置与注解扫描。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/user"); registry.setApplicationDestinationPrefixes("/api"); registry.setUserDestinationPrefix("/user/"); }
@Autowired private CustomerInterceptor customerInterceptor; @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(customerInterceptor); } }
@Component public class CustomerInterceptor implements ChannelInterceptor { private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if(StompCommand.DISCONNECT.equals(command)){ String user = ""; Principal principal = accessor.getUser(); if(principal != null && StringUtils.isNoneBlank(principal.getName())){ user = principal.getName(); }else{ user = accessor.getSessionId(); } logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user)); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Controller public class WsController {
@Autowired private SimpMessagingTemplate simpMessagingTemplate;
@MessageMapping("/hello") @SendTo("/topic/ws") public String test(String message) { System.out.println("接收消息:" + message); return "Hello," + messgae + "!"; } }
|
除了上面的几种之外,还有其它大神编写的 ws 实现库,例如 TIO 等,它的时候方法相差不大。