前言

  在项目中服务是双实例,导致请求发出的实例可能收不到回调的请求,导致前端请求超时失败,方案是使用RocketMq 广播模式来让所有的实例都可以收到请求。

Spring cloud Stream

根据配置:

spring:
  cloud:
    stream:
      bindings:
        build-callback:
          destination: build-callback
          group: ${spring.application.name}-build-callback
          consumer:
            broadcasting: true

 理论上述配置即可开启广播模式 但是实际使用过程中,双实例的消费者都在同一个group种,消息分组后,同一分组的消费服务,只能有一个消费者能消费到消息,很显然这不符合我们的要求。

RocketMq

生产者

@Component
public class BuildCallbackPublisher {
    private final RocketMQTemplate rocketMQTemplate;

    public BuildCallbackPublisher(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
        rocketMQTemplate.setDefaultDestination("build-callback");
    }

    public void deployComplete(PublishCallbackEvent publishCallbackEvent) {
        Message<PublishCallbackEvent> message = MessageBuilder
                .withPayload(publishCallbackEvent)
                .build();
        rocketMQTemplate.send(message);
    }
}

 注入一个RocketMQTemplate 调用send方法发送消息,setDefaultDestination设置为build-callback

消费者

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "build-callback",
        consumerGroup = "${spring.application.name}-build-callback",
        messageModel = MessageModel.BROADCASTING
)
public class BuildCallbackSubscriber implements RocketMQListener<PublishCallbackEvent> {
    private final DeployAsync deployAsync;

    public BuildCallbackSubscriber(DeployAsync deployAsync) {
        this.deployAsync = deployAsync;
    }

    @SneakyThrows
    @Override
    public void onMessage(PublishCallbackEvent publishCallbackEvent) {
        log.info("build-callback: get data from mq: " + new ObjectMapper().writeValueAsString(publishCallbackEvent));
        deployAsync.deployComplete(publishCallbackEvent);
    }
}

 使用@RocketMQMessageListener底层会帮我们用原生的Consumer消费消息,详细源码分析点此链接topicconsumerGroup配置一目了然,不需要多说,也是必须配置的内容,messageModel配置消费者消费的模式,分为:BROADCASTING广播模式和CLUSTERING集群模式

广播模式不需要更改生产者,只需要专注消费者。