
Java 实现Websocket的几种方式
红猫三代部署项目中,后端代码调运维接口构建部署包,之后运维侧需要回调后端接口,给出请求的状态(成功或者失败)以及构建日志(方便开发人员进行判断构建异常的原因),在此接口中引入WebSocket技术,前端页面上不需要一直刷新去获取状态,一旦收到运维侧回调结果,后端向前端页面推送消息展示结果。
WebSocket 详解
WebSocket 可提供一个在单一TCP连接全双工双向 通信协议。
【全双工】意味着客户端和服务器可以独立发送消息给对方
【双向】意味着客户端可以向服务器发送消息,反之亦然。
WebSocket 协议
WebSocket 协议有两个部分:握手和传输。
客户端通过向服务端Url发送握手请求来建立连接。
握手与现有的基于HTTP的基础结构相兼容。
Web服务器将其解释为升级版的HTTP连接请求
WebSocket 连接
一个客户端建立连接的握手请求:
GET /path/to/websocket/endpoint HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==
Origin: http://localhost
Sec-WebSocket-Version: 13
一个服务端响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=
一个WebSocket连接的建立,需要客户端和服务器共同来维护一个key作为该连接的连接凭证
客户端向服务端发送WebSocketKey
服务器根据WebSocketKey 生成WebSocketAccpet 返回给客户端
客户端对WebSocketKey的值再进行相同的操作,如果服务器返回的Accept的值相匹配,就表示握手成功。之后就可以互相返送消息
WebSocket 消息类型
支持 文本消息和二进制消息。
WebSocket的控制消息由Close、Ping、Pong组成。
ping和pong也可能包含应用程序信息
WebSocket 连接形式:
ws://host:port/path?query
wss://host:port/path?query
ws表示未加密的连接
wss表示加密的连接
SpringBoot中实现WebSocket的几种方式
- 使用Java提供的@ServerEndpoint注解实现
- 使用Spring提供的低层级WebSocket API 实现
- 使用STOMP 消息实现
1、@ServerEndpoint
1.WebSocket 配置
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
2.使用@ServerEndpoint 注解监听一个WebSocket 请求路径
监听/buildCallback/{requestId} 接口 requestId 是三代项目中每一次操作的唯一id
OnOpen
和 OnClose
是在连接断开WebSocket连接的时候可以做的一些操作
在运维侧回调接口中注入并调用
sendMessage()
方法,即可向前端页面发送消息
@Slf4j
@Component
@ServerEndpoint("/buildCallback/{requestId}")
public class BuildCallBackSocket {
private static final ConcurrentHashMap<Long, Session> webSocketMap = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("requestId") Long requestId) {
webSocketMap.put(requestId, session);
log.info("ws connect, reqId: {}", requestId);
}
public void sendMessage(Long requestId, String state) {
Session session = webSocketMap.get(requestId);
if (session != null) {
try {
session.getAsyncRemote().sendText(state);
session.close();
} catch (Exception ex) {
log.info("there has an exception when closing the ws,ex msg: {}, reqId: {}", ex, requestId);
}
} else {
log.info("ws connection lost,reqId: {}", requestId);
}
}
@OnClose
public void onClose(@PathParam("requestId") Long requestId) {
webSocketMap.remove(requestId);
log.info("ws disconnect, reqId: {}", requestId);
}
}
2.Spring WebSocket API
Spring为WebSocket通信提供了支持,包括:
- 发送和接收消息的低层级API
- 发送和接收消息的高级API
- 用来发送消息的模板
- 支持SockJS,用来解决浏览器端、服务器以及代理不支持WebSocket的问题
1.添加一个WebsocketHandler
定义一个继承了AbstractWebSocketHandler类的消息处理类,然后自定义对“建立连接”、“接收/发送消息”、“异常情况”等情况进行处理
AbstractWebSocketHandler 还有几个子类 :
BinaryWebSocketHandler - 二进制
TextWebSocketHandler - 文本消息
SockJsWebSocketHandler
@Component
public class PipelineSocketHandler extends TextWebSocketHandler {
public static final String PATH_PATTERN = "/modules/{moduleId}/publish/{requestId}/pipelines";
private final ConcurrentHashMap<Long, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
private final AntPathMatcher pathMatcher;
private final Logger logger = LoggerFactory.getLogger(getClass());
public PipelineSocketHandler(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
this.pathMatcher = new AntPathMatcher();
}
@Override
public void afterConnectionEstablished(WebSocketSession session) {
Map<String, String> pathVariables = this.pathMatcher.extractUriTemplateVariables(PATH_PATTERN, Objects.requireNonNull(session.getUri()).getPath());
Long requestId = Long.valueOf(pathVariables.get("requestId"));
session.getAttributes().put("requestId", requestId);
this.sessions.put(requestId, session);
logger.info("ws connected, requestId: {}", requestId);
this.sendMessage(requestId, new PublishStateModel("uploading"));
}
public void sendMessage(Long requestId, PublishStateModel model) {
WebSocketSession session = sessions.get(requestId);
if (session != null) {
try {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(model)));
if ("done".equals(model.getPhase()) || "error".equals(model.getPhase())) {
session.close();
}
} catch (IOException ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}
@Override
public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) {
Long requestId = (Long) session.getAttributes().get("requestId");
sessions.remove(requestId);
logger.info("ws connection closed, requestId: {}", requestId);
}
}
- 相关配置:
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final PipelineSocketHandler handler;
public WebSocketConfig(PipelineSocketHandler handler) {
this.handler = handler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler, PipelineSocketHandler.PATH_PATTERN).setAllowedOrigins("*");
}
}
我们还可以在registerWebSocketHandlers
中再添加一条:
registry.addHandler(handler, PipelineSocketHandler.PATH_PATTERN).setAllowedOrigins("*").withSockJS();
来表示如果浏览器不支持WebSocket技术是的代替方案
3.使用STOMP消息实现
所谓STOMP(Simple Text Oriented Messaging Protocol),就是在WebSocket基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP消息基于Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持STOMP,比如:RabbitMQ、 ActiveMQ等。
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private MyChannelInterceptor myChannelInterceptor;
public WebSocketConfig(MyChannelInterceptor myChannelInterceptor) {
this.myChannelInterceptor = myChannelInterceptor;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp-websocket").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//客户端需要把消息发送到/message/xxx地址
registry.setApplicationDestinationPrefixes("/message");
//服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息
registry.enableSimpleBroker("/topic");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(myChannelInterceptor);
}
}
- 首先注册了一个名为
/stomp-websocket
的端点,也就是STOMP客户端的连接地址。 - 此外,定义了服务端处理WebSocket消息的前缀是
message
,这个地址用于客户端向服务端发送消息(比如客户端向/message/hello
地址发送消息,那么服务端通过@MessageMapping("/hello")
这个注解来接收并处理消息) - 最后,定义了一个简单的消息代理,也就是服务端广播消息的路径前缀(比如客户端监听
/topic/gretting
这个地址,那么服务端就可以通过@SendTo("/topic/greeting")
这个注解向客户端发送STOMP消息)。
需要注意的是,上面代码中还添加了一个名为MyChannelInterceptor
的拦截器,目的是为了在客户端断开连接后打印一下日志。相关代码如下
@Component
public class MyChannelInterceptor implements ChannelInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getCommand();
//用户已经断开连接
if (StompCommand.DISCONNECT.equals(command)) {
String user = "";
Principal principal = accessor.getUser();
if (principal != null && StringUtils.isNoneBlank(principal.getName())) {
user = principal.getName();
} else {
user = accessor.getSessionId();
}
logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
}
}
}
@MessageMapping
注解用于监听指定路径的客户端消息,而@SendTo
注解用于将服务端的消息发送给监听了该路径的客户端
@Controller
public class GreetingController {
@MessageMapping("/hello")
@SendTo("/topic/greeting")
public HelloMessage greeting(Greeting greeting) {
return new HelloMessage("Hello," + greeting.getName() + "!");
}
}