进一步了解 RabbitMQ 跨语言通讯
开发中遇到了 RabbitMQ 丢失的情况,另外一端一直说是我的 Java 端的问题,无奈做了验证,本文的主要内容也就是这个:如何验证消息发送成功。

进一步了解

开发中遇到了 RabbitMQ 丢失的情况,另外一端一直说是我的 Java 端的问题,无奈做了验证,本文的主要内容也就是这个:如何验证消息发送成功。

配置

pom.xml 文件中添加依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml 文件中添加配置开启确认。

spring:
  rabbitmq:
    host: 192.168.0.102
    port: 5672
    username: admin
    password: admin
    virtual-host: sl
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

重写方法

RabbitTemplate.ConfirmCallback 确认消息是否到达交换机, RabbitTemplate.ReturnCallback 确认消息是否到达队列中。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class CustomRabbitSend implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRabbitSend.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        //到达Exchange交换机,会调用此方法
        if (!b) {
            LOG.error("消息发送异常!");
        } else {
            LOG.info("发送者收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
        }
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        //没有到达Queue队列中,会调用此方法
        LOG.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", i, s, s1, s2);
    }
}

交换机和队列绑定

Exchange 交换机需要通过 routingKey 才能将消息传递到 Queue 队列中(本文只用到 direct 类型,其实无需绑定,fanouttopic 类型才需要)。

import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;


@Configuration
public class DirectRabbitConfig {

    //队列
    @Bean
    public Queue customDirectQueue() {
        return new Queue("testQueue");
    }

    //Direct交换机
    @Bean
    public DirectExchange customDirectExchange() {
        return new DirectExchange("testDirectExchange", true, false);
    }

    //将队列和交换机绑定
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(customDirectQueue()).to(customDirectExchange()).with("testRoutingKey");
    }

}

发送

传递 CorrelationData 类,包含消息唯一标识(AmqpTemplate 类无法传递,之前一直用的这个,死活找不到方法)。


@Component
public class TestScheduled {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Scheduled(fixedDelay = 3000)
    public void errorSpiderRerun(){
        String content = "test rabbit and now " + LocalDateTime.now().toString();
        CorrelationData cId = new CorrelationData("unique key");
        rabbitTemplate.convertAndSend("testDirectExchange", "testRoutingKey", content, cId);
    }
}

接收

Message 类接收。目前我用的是 String 传输,可以配置成 JSON 传输更好些。


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class CustomRabbitReceiver {

    private static final Logger LOG = LoggerFactory.getLogger(CustomRabbitReceiver.class);

    @RabbitListener(queues = "testQueue")
    public void listen(Message message) {
        String body = new String(message.getBody());
        LOG.info("receiver from python:" + body);
    }
}

谁的锅

加了验证后,Java 端没有丢失,另一端消息也确实拿掉了,研究可能是多线程的问题,之前用的是 pika,官方说多线程不安全,后续可以改为 rabbitpy 更安全。

关联阅读


Last modified on 2020-07-28