红猫三代部署项目中,后端代码调运维接口构建部署包,之后运维侧需要回调后端接口,给出请求的状态(成功或者失败)以及构建日志(方便开发人员进行判断构建异常的原因),在此接口中引入WebSocket技术,前端页面上不需要一直刷新去获取状态,一旦收到运维侧回调结果,后端向前端页面推送消息展示结果。

WebSocket 详解

WebSocket 可提供一个在单一TCP连接全双工双向 通信协议。
【全双工】意味着客户端和服务器可以独立发送消息给对方
【双向】意味着客户端可以向服务器发送消息,反之亦然。

WebSocket 协议

WebSocket 协议有两个部分:握手和传输。
客户端通过向服务端Url发送握手请求来建立连接。
握手与现有的基于HTTP的基础结构相兼容。
Web服务器将其解释为升级版的HTTP连接请求

WebSocket 连接

一个客户端建立连接的握手请求:
GET /path/to/websocket/endpoint HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: xqBt3ImNzJbYqRINxEFlkg==
Origin: http://localhost
Sec-WebSocket-Version: 13
一个服务端响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: K7DJLdLooIwIG/MOpvWFB3y3FE8=

一个WebSocket连接的建立,需要客户端和服务器共同来维护一个key作为该连接的连接凭证
客户端向服务端发送WebSocketKey
服务器根据WebSocketKey 生成WebSocketAccpet 返回给客户端
客户端对WebSocketKey的值再进行相同的操作,如果服务器返回的Accept的值相匹配,就表示握手成功。之后就可以互相返送消息
image

WebSocket 消息类型

支持 文本消息和二进制消息。

WebSocket的控制消息由Close、Ping、Pong组成。
ping和pong也可能包含应用程序信息

WebSocket 连接形式:

ws://host:port/path?query
wss://host:port/path?query

ws表示未加密的连接
wss表示加密的连接

SpringBoot中实现WebSocket的几种方式

  1. 使用Java提供的@ServerEndpoint注解实现
  2. 使用Spring提供的低层级WebSocket API 实现
  3. 使用STOMP 消息实现

1、@ServerEndpoint

1.WebSocket 配置

@Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

2.使用@ServerEndpoint 注解监听一个WebSocket 请求路径
监听/buildCallback/{requestId} 接口 requestId 是三代项目中每一次操作的唯一id
OnOpenOnClose 是在连接断开WebSocket连接的时候可以做的一些操作

在运维侧回调接口中注入并调用sendMessage()方法,即可向前端页面发送消息

@Slf4j
@Component
@ServerEndpoint("/buildCallback/{requestId}")
public class BuildCallBackSocket {
    private static final ConcurrentHashMap<Long, Session> webSocketMap = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam("requestId") Long requestId) {
        webSocketMap.put(requestId, session);
        log.info("ws connect, reqId: {}", requestId);
    }

    public void sendMessage(Long requestId, String state) {
        Session session = webSocketMap.get(requestId);
        if (session != null) {
            try {
                session.getAsyncRemote().sendText(state);
                session.close();
            } catch (Exception ex) {
                log.info("there has an exception when closing the ws,ex msg: {}, reqId: {}", ex, requestId);
            }
        } else {
            log.info("ws connection lost,reqId: {}", requestId);
        }
    }

    @OnClose
    public void onClose(@PathParam("requestId") Long requestId) {
        webSocketMap.remove(requestId);
        log.info("ws disconnect, reqId: {}", requestId);
    }
}

2.Spring WebSocket API

   Spring为WebSocket通信提供了支持,包括:

  • 发送和接收消息的低层级API
  • 发送和接收消息的高级API
  • 用来发送消息的模板
  • 支持SockJS,用来解决浏览器端、服务器以及代理不支持WebSocket的问题

1.添加一个WebsocketHandler
定义一个继承了AbstractWebSocketHandler类的消息处理类,然后自定义对“建立连接”、“接收/发送消息”、“异常情况”等情况进行处理

AbstractWebSocketHandler 还有几个子类 :

BinaryWebSocketHandler - 二进制
TextWebSocketHandler   - 文本消息
SockJsWebSocketHandler

@Component
public class PipelineSocketHandler extends TextWebSocketHandler {

    public static final String PATH_PATTERN = "/modules/{moduleId}/publish/{requestId}/pipelines";

    private final ConcurrentHashMap<Long, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper;
    private final AntPathMatcher pathMatcher;
    private final Logger logger = LoggerFactory.getLogger(getClass());

    public PipelineSocketHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.pathMatcher = new AntPathMatcher();
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        Map<String, String> pathVariables = this.pathMatcher.extractUriTemplateVariables(PATH_PATTERN, Objects.requireNonNull(session.getUri()).getPath());
        Long requestId = Long.valueOf(pathVariables.get("requestId"));
        session.getAttributes().put("requestId", requestId);
        this.sessions.put(requestId, session);

        logger.info("ws connected, requestId: {}", requestId);
        this.sendMessage(requestId, new PublishStateModel("uploading"));
    }

    public void sendMessage(Long requestId, PublishStateModel model) {
        WebSocketSession session = sessions.get(requestId);

        if (session != null) {

            try {
                session.sendMessage(new TextMessage(objectMapper.writeValueAsString(model)));

                if ("done".equals(model.getPhase()) || "error".equals(model.getPhase())) {
                    session.close();
                }
            } catch (IOException ex) {
                throw new RuntimeException(ex.getMessage(), ex);
            }
        }
    }

    @Override
    public void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) {
        Long requestId = (Long) session.getAttributes().get("requestId");
        sessions.remove(requestId);

        logger.info("ws connection closed, requestId: {}", requestId);
    }
}
  1. 相关配置:
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final PipelineSocketHandler handler;

    public WebSocketConfig(PipelineSocketHandler handler) {
        this.handler = handler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(handler, PipelineSocketHandler.PATH_PATTERN).setAllowedOrigins("*");
    }
}

我们还可以在registerWebSocketHandlers中再添加一条:
registry.addHandler(handler, PipelineSocketHandler.PATH_PATTERN).setAllowedOrigins("*").withSockJS();来表示如果浏览器不支持WebSocket技术是的代替方案

3.使用STOMP消息实现

   所谓STOMP(Simple Text Oriented Messaging Protocol),就是在WebSocket基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP消息基于Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持STOMP,比如:RabbitMQ、 ActiveMQ等。

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private MyChannelInterceptor myChannelInterceptor;

    public WebSocketConfig(MyChannelInterceptor myChannelInterceptor) {
        this.myChannelInterceptor = myChannelInterceptor;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp-websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        //客户端需要把消息发送到/message/xxx地址
        registry.setApplicationDestinationPrefixes("/message");

        //服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息
        registry.enableSimpleBroker("/topic");

    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }
}
  • 首先注册了一个名为/stomp-websocket的端点,也就是STOMP客户端的连接地址。
  • 此外,定义了服务端处理WebSocket消息的前缀是message,这个地址用于客户端向服务端发送消息(比如客户端向/message/hello地址发送消息,那么服务端通过@MessageMapping("/hello")这个注解来接收并处理消息)
  • 最后,定义了一个简单的消息代理,也就是服务端广播消息的路径前缀(比如客户端监听/topic/gretting这个地址,那么服务端就可以通过@SendTo("/topic/greeting")这个注解向客户端发送STOMP消息)。

需要注意的是,上面代码中还添加了一个名为MyChannelInterceptor的拦截器,目的是为了在客户端断开连接后打印一下日志。相关代码如下

@Component
public class MyChannelInterceptor implements ChannelInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        
        //用户已经断开连接
        if (StompCommand.DISCONNECT.equals(command)) {
            String user = "";
            Principal principal = accessor.getUser();
            if (principal != null && StringUtils.isNoneBlank(principal.getName())) {
                user = principal.getName();
            } else {
                user = accessor.getSessionId();
            }
            logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
        }
    }
}

@MessageMapping注解用于监听指定路径的客户端消息,而@SendTo注解用于将服务端的消息发送给监听了该路径的客户端

@Controller
public class GreetingController {
    @MessageMapping("/hello")
    @SendTo("/topic/greeting")
    public HelloMessage greeting(Greeting greeting) {
        return new HelloMessage("Hello," + greeting.getName() + "!");
    }
}