进一步了解 RabbitMQ 跨语言通讯
开发中遇到了 RabbitMQ 丢失的情况,另外一端一直说是我的 Java 端的问题,无奈做了验证,本文的主要内容也就是这个:如何验证消息发送成功。
进一步了解
开发中遇到了 RabbitMQ 丢失的情况,另外一端一直说是我的 Java 端的问题,无奈做了验证,本文的主要内容也就是这个:如何验证消息发送成功。
配置
pom.xml
文件中添加依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
文件中添加配置开启确认。
spring:
rabbitmq:
host: 192.168.0.102
port: 5672
username: admin
password: admin
virtual-host: sl
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
重写方法
RabbitTemplate.ConfirmCallback
确认消息是否到达交换机, RabbitTemplate.ReturnCallback
确认消息是否到达队列中。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class CustomRabbitSend implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private static final Logger LOG = LoggerFactory.getLogger(CustomRabbitSend.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
//到达Exchange交换机,会调用此方法
if (!b) {
LOG.error("消息发送异常!");
} else {
LOG.info("发送者收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), b, s);
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
//没有到达Queue队列中,会调用此方法
LOG.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", i, s, s1, s2);
}
}
交换机和队列绑定
Exchange
交换机需要通过 routingKey
才能将消息传递到 Queue
队列中(本文只用到 direct
类型,其实无需绑定,fanout
、topic
类型才需要)。
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
@Configuration
public class DirectRabbitConfig {
//队列
@Bean
public Queue customDirectQueue() {
return new Queue("testQueue");
}
//Direct交换机
@Bean
public DirectExchange customDirectExchange() {
return new DirectExchange("testDirectExchange", true, false);
}
//将队列和交换机绑定
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(customDirectQueue()).to(customDirectExchange()).with("testRoutingKey");
}
}
发送
传递 CorrelationData
类,包含消息唯一标识(AmqpTemplate
类无法传递,之前一直用的这个,死活找不到方法)。
@Component
public class TestScheduled {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 3000)
public void errorSpiderRerun(){
String content = "test rabbit and now " + LocalDateTime.now().toString();
CorrelationData cId = new CorrelationData("unique key");
rabbitTemplate.convertAndSend("testDirectExchange", "testRoutingKey", content, cId);
}
}
接收
用 Message
类接收。目前我用的是 String 传输,可以配置成 JSON 传输更好些。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CustomRabbitReceiver {
private static final Logger LOG = LoggerFactory.getLogger(CustomRabbitReceiver.class);
@RabbitListener(queues = "testQueue")
public void listen(Message message) {
String body = new String(message.getBody());
LOG.info("receiver from python:" + body);
}
}
谁的锅
加了验证后,Java 端没有丢失,另一端消息也确实拿掉了,研究可能是多线程的问题,之前用的是 pika
,官方说多线程不安全,后续可以改为 rabbitpy
更安全。
关联阅读
Last modified on 2020-07-28