WebSocket

Java 中使用 WebSocket 的方式一般下面几种:

  • 使用 Java 提供的 @ServerEndpoint 注解实现
  • 使用 Spring 提供的低层级 WebSocket API 实现
  • 使用 STOMP 协议实现(STOMP 是一个构建在 websockets 之上的框架,ws 是其实现的底层机制)

WebSocket 协议定义了两种类型的消息 (文本和二进制),但它们的内容未定义。
STOMP 协议定义了一种机制,供客户端和服务器协商子协议(即更高级别的消息传递协议),以便在 WebSocket 之上使用,它定义了以下内容:

  • 每次发送什么信息
  • 格式是什么
  • 每条消息的内容

企业级项目一般使用 stomp 比较多,它有以下优点:

  • 解耦通信模型:不需要手动维护每个 WebSocketSession,前端只要订阅某个主题即可,后端也只需“发送消息”
  • 支持订阅/广播/私聊机制
  • 配合消息代理中间件:能直接对接 RabbitMQ、ActiveMQ、Kafka,实现多节点消息广播
  • 便于维护和扩展:不用管理连接池、消息队列等复杂逻辑,逻辑更清晰、更规范
  • Spring 原生支持好:使用 @MessageMapping 像写 Controller 一样接收消息
  • 可与 Spring Security 无缝整合:登录用户可以直接关联 WebSocket 会话,实现权限管理和用户推送

其中最后两条是使用 Spring 生态时的优势,其它胶水框架的 ws 实现可能略有不同,但肯定是和自家生态无缝集成的。

使用场景

聊天室系统、实时系统通知、股票/数据行情推送、实时协作系统(文档、白板)、后台管理系统告警 常使用 stomp;
而实时游戏通信、IoT 设备通讯等一些需要高性能和定制化协议的场景则常使用 ws 原生协议,它比较轻量,可定制化程度高。

使用 Java 注解

这也是常用的一种方式:

@ServerEndpoint:暴露出去的 ws 服务访问的路径,有点类似我们经常用的 @RequestMapping;
@OnOpen:当 websocket 建立连接成功后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnClose:当 websocket 建立的连接断开后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnMessage:当客户端发送消息到服务端时,会触发该注解的方法体执行,它有一个 String 入参表明客户端传入的值;
@OnError:当 websocket 建立连接时出现异常会触发这个注解修饰的方法,它有一个 Session 参数;

以上几个注解都在 javax.websocket 包下,那么对于 Java 17+ 你懂得。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

@ServerEndpoint("/ws/test")
public class TestWebSocketEndpoint {

@OnMessage
public void handleMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText("requtest text: " + message);
}
...
}
1
2
3
4
5
6
7
8
9
@Configuration
@EnableWebSocket
public class WebSocketConfig{

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

使用 Spring 的 WS 实现

通过继承 TextWebSocketHandler 类并进行方法覆写,可以对 websocket 的事件进行处理:

afterConnectionEstablished() 方法在 socket 连接成功后被触发,同 Java 注解里的 @OnOpen 功能;
afterConnectionClosed()方法在 socket 连接关闭后被触发,同 Java 注解里的 @OnClose 功能;
handleTextMessage() 方法是在客户端发送信息时触发,同原生注解里的 @OnMessage 功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.text.MessageFormat;

/**
* 连接处理类,也可以实现 WebSocketHandler 接口,这里直接使用了 TextWebSocketHandler
*/
public class MyTextHandler extends TextWebSocketHandler {
private final Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.debug("Opened new session in instance " + this);
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
session.sendMessage(new TextMessage("request: " + message.toString()));
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
session.close(CloseStatus.SERVER_ERROR);
logger.debug("Info: WebSocket connection closed.");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myTextHandler(), "/ws");
registry.addHandler(myTextHandler(), "/customertest_sockjs").withSockJS();
// 类似 mvc 添加 interceptor
}

@Bean
public WebSocketHandler myTextHandler(){
return new MyTextHandler();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class WsInterceptor implements HandshakeInterceptor {
/**
* 连接之前 do somthing
* @param request 请求
* @param response 响应
* @param wsHandler handler
* @param attributes 属性
* @return true 放行全部连接,false 拒绝连接
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
return false;
}

/**
* 连接之后 do something
* @param request 请求
* @param response 响应
* @param wsHandler hdnaler
* @param exception 异常
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

}
}

使用 STOMP

STOMP(Simple Text Oriented Messaging Protocol),就是在 WebSocket 基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP 消息基于 Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持 STOMP,比如:RabbitMQ、 ActiveMQ 等。

通过实现 WebSocketMessageBrokerConfigurer 接口并加上 @EnableWebSocketMessageBroker 注解来进行 stomp 的配置与注解扫描。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{

/**
* 暴露的 stomp 的路径
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 配置客户端尝试连接地址
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
}

/**
* 节点配置
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 设置广播节点
registry.enableSimpleBroker("/topic", "/user");
// 客户端向服务端发送消息需有 /api 前缀
registry.setApplicationDestinationPrefixes("/api");
// 指定用户发送(一对一)的前缀 /user/
registry.setUserDestinationPrefix("/user/");
}

@Autowired
private CustomerInterceptor customerInterceptor;

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(customerInterceptor);
}
}

/**
* 使用自定义拦截器打印一下 ws 断开的通知信息
*/
@Component
public class CustomerInterceptor implements ChannelInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();

//用户已经断开连接
if(StompCommand.DISCONNECT.equals(command)){
String user = "";
Principal principal = accessor.getUser();
if(principal != null && StringUtils.isNoneBlank(principal.getName())){
user = principal.getName();
}else{
user = accessor.getSessionId();
}
logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Controller
public class WsController {

/**
* 如果服务端主动发送消息可以通过 SimpMessagingTemplate 类的 convertAndSend 方法
*/
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MessageMapping 注解用于监听指定路径的客户端消息,比如客户端需要访问 ws:ip:port/api/hello,api 是 ws config 中配置的前缀
* @SendTo 注解则用于服务端将消息发送监听了该路径的客户端
*/
@MessageMapping("/hello")
@SendTo("/topic/ws")
public String test(String message) {
System.out.println("接收消息:" + message);
return "Hello," + messgae + "!";
}
}

除了上面的几种之外,还有其它大神编写的 ws 实现库,例如 TIO 等,它的时候方法相差不大。


本站由 江湖浪子 使用 Stellar 1.32.2 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。