WebSocket
Java 中使用 WebSocket 的方式一般下面几种:
- 使用 Java 提供的 @ServerEndpoint 注解实现
- 使用 Spring 提供的低层级 WebSocket API 实现
- 使用 STOMP 消息实现
使用 Java 注解
这也是常用的一种方式:
@ServerEndpoint:暴露出去的 ws 服务访问的路径,有点类似我们经常用的 @RequestMapping;
@OnOpen:当 websocket 建立连接成功后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnClose:当 websocket 建立的连接断开后会触发这个注解修饰的方法,它有一个 Session 参数;
@OnMessage:当客户端发送消息到服务端时,会触发该注解的方法体执行,它有一个 String 入参表明客户端传入的值;
@OnError:当 websocket 建立连接时出现异常会触发这个注解修饰的方法,它有一个 Session 参数;
以上几个注解都在 javax.websocket
包下,那么对于 Java 17+ 你懂得。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import javax.websocket.OnMessage; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; @ServerEndpoint("/ws/test") public class TestWebSocketEndpoint {
@OnMessage public void handleMessage(Session session, String message) throws IOException { session.getBasicRemote().sendText("requtest text: " + message); } ... }
|
1 2 3 4 5 6 7 8 9
| @Configuration @EnableWebSocket public class WebSocketConfig{
@Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
|
使用 Spring 的 WS 实现
通过继承 TextWebSocketHandler
类并进行方法覆写,可以对 websocket 的事件进行处理:
afterConnectionEstablished()
方法在 socket 连接成功后被触发,同 Java 注解里的 @OnOpen 功能;
afterConnectionClosed()
方法在 socket 连接关闭后被触发,同 Java 注解里的 @OnClose 功能;
handleTextMessage()
方法是在客户端发送信息时触发,同原生注解里的 @OnMessage 功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.text.MessageFormat;
public class MyTextHandler extends TextWebSocketHandler{ private final Logger logger = LoggerFactory.getLogger(getClass());
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.debug("Opened new session in instance " + this); }
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { session.sendMessage(new TextMessage("request: " + message.toString())); }
@Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { session.close(CloseStatus.SERVER_ERROR); logger.debug("Info: WebSocket connection closed."); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer{ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(echoWebSocketHandler(), "/customertest"); registry.addHandler(echoWebSocketHandler(), "/customertest_sockjs").withSockJS(); } @Bean public WebSocketHandler myTextHandler(){ return new MyTextHandler(); } }
|
使用 STOMP
STOMP(Simple Text Oriented Messaging Protocol),就是在 WebSocket 基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP 消息基于 Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持 STOMP,比如:RabbitMQ、 ActiveMQ 等。
通过实现 WebSocketMessageBrokerConfigurer
接口并加上 @EnableWebSocketMessageBroker 来进行 stomp 的配置与注解扫描。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/user"); registry.setApplicationDestinationPrefixes("/api"); registry.setUserDestinationPrefix("/user/"); }
@Autowired private CustomerInterceptor customerInterceptor; @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(customerInterceptor); } }
@Component public class CustomerInterceptor implements ChannelInterceptor { private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if(StompCommand.DISCONNECT.equals(command)){ String user = ""; Principal principal = accessor.getUser(); if(principal != null && StringUtils.isNoneBlank(principal.getName())){ user = principal.getName(); }else{ user = accessor.getSessionId(); } logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user)); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Controller public class WsController {
@Autowired private SimpMessagingTemplate simpMessagingTemplate;
@MessageMapping("/hello") @SendTo("/topic/ws") public String test(String message) { System.out.println("接收消息:" + message); return "Hello," + messgae + "!"; } }
|
除了上面的几种之外,还有其它大神编写的 ws 实现库,例如 TIO 等,它的时候方法相差不大。