WebSocket

Java 中使用 WebSocket 的方式一般下面几种:

  • 使用 Java 提供的 @ServerEndpoint 注解实现
  • 使用 Spring 提供的低层级 WebSocket API 实现
  • 使用 STOMP 消息实现

使用 Java 注解

这也是常用的一种方式:

@ServerEndpoint:暴露出去的 ws 服务访问的路径,有点类似我们经常用的 @RequestMapping;
@OnOpen:当 websocket 建立连接成功后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnClose:当 websocket 建立的连接断开后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnMessage:当客户端发送消息到服务端时,会触发该注解的方法体执行,它有一个 String 入参表明客户端传入的值;
@OnError:当 websocket 建立连接时出现异常会触发这个注解修饰的方法,它有一个 Session 参数;

以上几个注解都在 javax.websocket 包下,那么对于 Java 17+ 你懂得。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

@ServerEndpoint("/ws/test")
public class TestWebSocketEndpoint {

@OnMessage
public void handleMessage(Session session, String message) throws IOException {
session.getBasicRemote().sendText("requtest text: " + message);
}
...
}
1
2
3
4
5
6
7
8
9
@Configuration
@EnableWebSocket
public class WebSocketConfig{

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

使用 Spring 的 WS 实现

通过继承 TextWebSocketHandler 类并进行方法覆写,可以对 websocket 的事件进行处理:

afterConnectionEstablished() 方法在 socket 连接成功后被触发,同 Java 注解里的 @OnOpen 功能;
afterConnectionClosed()方法在 socket 连接关闭后被触发,同 Java 注解里的 @OnClose 功能;
handleTextMessage() 方法是在客户端发送信息时触发,同原生注解里的 @OnMessage 功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.text.MessageFormat;

public class MyTextHandler extends TextWebSocketHandler{
private final Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
logger.debug("Opened new session in instance " + this);
}

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
session.sendMessage(new TextMessage("request: " + message.toString()));
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
session.close(CloseStatus.SERVER_ERROR);
logger.debug("Info: WebSocket connection closed.");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(echoWebSocketHandler(), "/customertest");
registry.addHandler(echoWebSocketHandler(), "/customertest_sockjs").withSockJS();
}

@Bean
public WebSocketHandler myTextHandler(){
return new MyTextHandler();
}
}

使用 STOMP

STOMP(Simple Text Oriented Messaging Protocol),就是在 WebSocket 基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP 消息基于 Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持 STOMP,比如:RabbitMQ、 ActiveMQ 等。

通过实现 WebSocketMessageBrokerConfigurer 接口并加上 @EnableWebSocketMessageBroker 来进行 stomp 的配置与注解扫描。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{

/**
* 暴露的 stomp 的路径
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 配置客户端尝试连接地址
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
}

/**
* 节点配置
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 设置广播节点
registry.enableSimpleBroker("/topic", "/user");
// 客户端向服务端发送消息需有 /api 前缀
registry.setApplicationDestinationPrefixes("/api");
// 指定用户发送(一对一)的前缀 /user/
registry.setUserDestinationPrefix("/user/");
}

@Autowired
private CustomerInterceptor customerInterceptor;

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(customerInterceptor);
}
}

/**
* 使用自定义拦截器打印一下 ws 断开的通知信息
*/
@Component
public class CustomerInterceptor implements ChannelInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());

@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();

//用户已经断开连接
if(StompCommand.DISCONNECT.equals(command)){
String user = "";
Principal principal = accessor.getUser();
if(principal != null && StringUtils.isNoneBlank(principal.getName())){
user = principal.getName();
}else{
user = accessor.getSessionId();
}
logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Controller
public class WsController {

/**
* 如果服务端主动发送消息可以通过 SimpMessagingTemplate 类的 convertAndSend 方法
*/
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* @MessageMapping 注解用于监听指定路径的客户端消息,比如客户端需要访问 ws:ip:port/api/hello,api 是 ws config 中配置的前缀
* @SendTo 注解则用于服务端将消息发送监听了该路径的客户端
*/
@MessageMapping("/hello")
@SendTo("/topic/ws")
public String test(String message) {
System.out.println("接收消息:" + message);
return "Hello," + messgae + "!";
}
}

除了上面的几种之外,还有其它大神编写的 ws 实现库,例如 TIO 等,它的时候方法相差不大。


本站由 江湖浪子 使用 Stellar 1.29.1 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。