WebSocket
Java 中使用 WebSocket 的方式一般下面几种:
- 使用 Java 提供的 @ServerEndpoint 注解实现
- 使用 Spring 提供的低层级 WebSocket API 实现
- 使用 STOMP 协议实现(STOMP 是一个构建在 websockets 之上的框架,ws 是其实现的底层机制)
WebSocket 协议定义了两种类型的消息 (文本和二进制),但它们的内容未定义。 STOMP 协议定义了一种机制,供客户端和服务器协商子协议(即更高级别的消息传递协议),以便在 WebSocket 之上使用,它定义了以下内容:
- 每次发送什么信息
- 格式是什么
- 每条消息的内容
企业级项目一般使用 stomp 比较多,它有以下优点:
- 解耦通信模型:不需要手动维护每个 WebSocketSession,前端只要订阅某个主题即可,后端也只需“发送消息”
- 支持订阅/广播/私聊机制
- 配合消息代理中间件:能直接对接 RabbitMQ、ActiveMQ、Kafka,实现多节点消息广播
- 便于维护和扩展:不用管理连接池、消息队列等复杂逻辑,逻辑更清晰、更规范
- Spring 原生支持好:使用 @MessageMapping 像写 Controller 一样接收消息
- 可与 Spring Security 无缝整合:登录用户可以直接关联 WebSocket 会话,实现权限管理和用户推送
其中最后两条是使用 Spring 生态时的优势,其它胶水框架的 ws 实现可能略有不同,但肯定是和自家生态无缝集成的。
使用场景
聊天室系统、实时系统通知、股票/数据行情推送、实时协作系统(文档、白板)、后台管理系统告警 常使用 stomp; 而实时游戏通信、IoT 设备通讯等一些需要高性能和定制化协议的场景则常使用 ws 原生协议,它比较轻量,可定制化程度高。
使用 Java 注解
这也是常用的一种方式:
@ServerEndpoint:暴露出去的 ws 服务访问的路径,有点类似我们经常用的 @RequestMapping; @OnOpen:当 websocket 建立连接成功后会触发这个注解修饰的方法,它有一个 Session 参数; @OnClose:当 websocket 建立的连接断开后会触发这个注解修饰的方法,它有一个 Session 参数; @OnMessage:当客户端发送消息到服务端时,会触发该注解的方法体执行,它有一个 String 入参表明客户端传入的值; @OnError:当 websocket 建立连接时出现异常会触发这个注解修饰的方法,它有一个 Session 参数;
以上几个注解都在
javax.websocket包下,那么对于 Java 17+ 你懂得。
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@ServerEndpoint("/ws/test")
public class TestWebSocketEndpoint {
@OnMessage
public void handleMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText("requtest text: " + message);
}
...
}
@Configuration
@EnableWebSocket
public class WebSocketConfig{
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
使用 Spring 的 WS 实现
通过继承 TextWebSocketHandler 类并进行方法覆写,可以对 websocket 的事件进行处理:
afterConnectionEstablished() 方法在 socket 连接成功后被触发,同 Java 注解里的 @OnOpen 功能;
afterConnectionClosed()方法在 socket 连接关闭后被触发,同 Java 注解里的 @OnClose 功能;
handleTextMessage() 方法是在客户端发送信息时触发,同原生注解里的 @OnMessage 功能。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.text.MessageFormat;
/**
* 连接处理类,也可以实现 WebSocketHandler 接口,这里直接使用了 TextWebSocketHandler
*/
public class MyTextHandler extends TextWebSocketHandler {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.debug("Opened new session in instance " + this);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
session.sendMessage(new TextMessage("request: " + message.toString()));
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
session.close(CloseStatus.SERVER_ERROR);
logger.debug("Info: WebSocket connection closed.");
}
}
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myTextHandler(), "/ws");
registry.addHandler(myTextHandler(), "/customertest_sockjs").withSockJS();
// 类似 mvc 添加 interceptor
}
@Bean
public WebSocketHandler myTextHandler(){
return new MyTextHandler();
}
}
public class WsInterceptor implements HandshakeInterceptor {
/**
* 连接之前 do somthing
* @param request 请求
* @param response 响应
* @param wsHandler handler
* @param attributes 属性
* @return true 放行全部连接,false 拒绝连接
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
return false;
}
/**
* 连接之后 do something
* @param request 请求
* @param response 响应
* @param wsHandler hdnaler
* @param exception 异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
使用 STOMP
STOMP(Simple Text Oriented Messaging Protocol),就是在 WebSocket 基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP 消息基于 Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持 STOMP,比如:RabbitMQ、 ActiveMQ 等。
通过实现 WebSocketMessageBrokerConfigurer 接口并加上 @EnableWebSocketMessageBroker 注解来进行 stomp 的配置与注解扫描。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
/**
* 暴露的 stomp 的路径
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 配置客户端尝试连接地址
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
}
/**
* 节点配置
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 设置广播节点
registry.enableSimpleBroker("/topic", "/user");
// 客户端向服务端发送消息需有 /api 前缀
registry.setApplicationDestinationPrefixes("/api");
// 指定用户发送(一对一)的前缀 /user/
registry.setUserDestinationPrefix("/user/");
}
@Autowired
private CustomerInterceptor customerInterceptor;
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(customerInterceptor);
}
}
/**
* 使用自定义拦截器打印一下 ws 断开的通知信息
*/
@Component
public class CustomerInterceptor implements ChannelInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
//用户已经断开连接
if(StompCommand.DISCONNECT.equals(command)){
String user = "";
Principal principal = accessor.getUser();
if(principal != null && StringUtils.isNoneBlank(principal.getName())){
user = principal.getName();
}else{
user = accessor.getSessionId();
}
logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
}
}
}
@Controller
public class WsController {
/**
* 如果服务端主动发送消息可以通过 SimpMessagingTemplate 类的 convertAndSend 方法
*/
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MessageMapping 注解用于监听指定路径的客户端消息,比如客户端需要访问 ws:ip:port/api/hello,api 是 ws config 中配置的前缀
* @SendTo 注解则用于服务端将消息发送监听了该路径的客户端
*/
@MessageMapping("/hello")
@SendTo("/topic/ws")
public String test(String message) {
System.out.println("接收消息:" + message);
return "Hello," + messgae + "!";
}
}
除了上面的几种之外,还有其它大神编写的 ws 实现库,例如 TIO 等,它的使用方法相差不大。

说些什么吧!