时间:2022-10-04 10:51:47 | 栏目:JAVA代码 | 点击:次
1. 存在的问题
再生产环境中由于一些不明原因导致rabbitmq
重启,在RabbitMQ
重启期间生产者消息投递失败,会导致消息丢失。
当消息不能正常被接收的时候,我们需要将消息存放在缓存中。
spring.rabbitmq.host=192.168.123.129 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123 spring.rabbitmq.publisher-confirm-type=correlated
NONE
:禁用发布确认模式,是默认值。CORRELATED
:发布消息成功到交换机会触发回调方方法。CORRELATED
:就是发布一个就确认一个。import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回调接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * 交换机接受失败后进行回调 * 1. 保存消息的ID及相关消息 * 2. 是否接收成功 * 3. 接受失败的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交换机已经收到id为:{}的消息",id); }else{ log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s); } } }
import com.xiao.springbootrabbitmq.utils.MyCallBack; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("发送得内容是:{}",message); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("接收到队列" + CONFIRM_QUEUE_NAME + "消息:{}",msg); } }
1. 第一种情况
ID
为1
的消息正常送达,ID
为2
的消息由于RoutingKey
的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收。
2. 第二种情况
我们再上一种情况下修改了ID
为1
的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息。
spring.rabbitmq.publisher-returns=true
需要在配置文件种开启返回回调
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); log.info("发送得内容是:{}",message + routingKey1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("发送得内容是:{}",message + routingKey2); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回调接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * 交换机接受失败后进行回调 * 1. 保存消息的ID及相关消息 * 2. 是否接收成功 * 3. 接受失败的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交换机已经收到id为:{}的消息",id); }else{ log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { Message message = returnedMessage.getMessage(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); String replyText = returnedMessage.getReplyText(); log.error("消息{},被交换机{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey); } }
其他类的代码与上一小节案例相同
ID
为2
的消息由于RoutingKey
不可路由,但是还是被回调函数处理了。
这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange"; public static final String BACKUP_QUEUE_NAME = "backup_queue"; public static final String WARNING_QUEUE_NAME = "warning_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } @Bean public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue){ return BindingBuilder.bind(warningQueue).to(backupExchange); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning_queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("报警发现不可路由的消息内容为:{}",msg); } }
mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高。