时间:2022-11-05 12:22:14 | 栏目:JAVA代码 | 点击:次
上一篇博客 Springboot——整合RabbitMq测试TTL中,针对设置单个消息期限
或者整个队列消息期限
,进行了一些配置和说明。同时也都列举了一些区别关系。
但考虑过一个问题了没有?
不管是设置哪种方式,如果消息期限到了,队列都会将该消息进行丢弃处理。
这么做合适么?
假设是某个设备的重要信息
,或者某个重要的订单信息
,因为规定时间内未被及时消费就将其舍弃
,是否会造成很严重的后果?
有人会说,设置消息永不过期!等着消费者能够成功监听到该队列,将消息消费不就可以了嘛!
但这里需要考虑另外一个问题:
每个服务器的容量是有上限的!如果消息一直存在队列,如果一直不会被消费,岂不是很占用服务器资源?
如何解决这个问题,就是今天这篇文章需要说到的死信队列
。
说道死信
,可能大部分观众大姥爷会有懵逼的想法,什么是死信?
死信队列,俗称
DLX
,翻译过来的名称为Dead Letter Exchange
死信交换机
。
当消息限定时间内未被消费,成为
Dead Message
后,可以被重新发送
到另一个交换机
中,发挥其应有的价值!
需要测试死信队列
,则需要先梳理整体的思路,如可以采取如下方式进行配置:
从上面的逻辑图中,可以发现大致的思路:
1、消息队列分为正常交换机
、正常消息队列
;以及死信交换机
和死信队列
。
2、正常队列针对死信信息
,需要将数据
重新
发送至死信交换机
中。
结合上面的思路,编写具体的配置类。如下所示:
package cn.linkpower.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 死信队列配置 */ @Configuration public class DeadMsgMqConfig { // 定义正常交换机和正常队列信息(交换机名、队列名、路由key) public static final String queue_name = "xj_natural_queue"; public static final String exchange_name = "xj_natural_exchange"; public static final String routing_key = "xj_natural_routingKey"; // 定义死信交换机名、死信队列名、路由key public static final String queue_name_dead = "xj_dead_queue"; public static final String exchange_name_dead = "xj_dead_exchange"; public static final String routing_key_dead = "xj_dead_routingKey"; /** * 设置正常的消息队列; * 正常的消息队列具备以下几种功能: * 1、消息正常消费,需要绑定对应的消费者(这里为了测试死信,不创建消费者) * 2、当消息失效后,需要将指定的消息发送至 死信交换机 中 * @return */ @Bean(value = "getNaturalQueue") public Queue getNaturalQueue(){ return QueueBuilder.durable(queue_name) // 正常的队列,在消息失效后,需要将消息丢入 死信 交换机中 // 这里只需要针对名称进行绑定 .withArgument("x-dead-letter-exchange",exchange_name_dead) // 丢入 死信交换机,需要设定指定的 routingkey .withArgument("x-dead-letter-routing-key",routing_key_dead) // 设置正常队列中消息的存活时间为 10s,当然也可以针对单个消息进行设定不同的过期时间 .withArgument("x-message-ttl",10000) // 设定当前队列中,允许存放的最大消息数目 .withArgument("x-max-length",10) .build(); } * 设定正常的消息交换机 @Bean(value = "getNaturalExchange") public Exchange getNaturalExchange(){ // 这里为了测试,采取 direct exchange return ExchangeBuilder.directExchange(exchange_name) .durable(true) // 设定持久化 * 将正常的消息交换机和正常的消息队列进行绑定 * @param queue * @param directExchange @Bean public Binding bindNaturalExchangeAndQueue( @Qualifier(value = "getNaturalQueue") Queue queue, @Qualifier(value = "getNaturalExchange") Exchange directExchange ){ return BindingBuilder // 绑定消息队列 .bind(queue) // 至指定的消息交换机 .to(directExchange) // 匹配 routingkey .with(routing_key) // 无参数,不加会报错提示 .noargs(); * 定义死信队列 @Bean(value = "getDealQueue") public Queue getDealQueue(){ return QueueBuilder.durable(queue_name_dead).build(); * 定义死信交换机 @Bean(value = "getDeadExchange") public Exchange getDeadExchange(){ return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build(); * 将死信交换机和死信队列进行绑定 * @param deadQueue * @param directDeadExchange public Binding bindDeadExchangeAndQueue( @Qualifier(value = "getDealQueue") Queue deadQueue, @Qualifier(value = "getDeadExchange") Exchange directDeadExchange return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs(); }
默认采取rabbitTemplate.convertAndSend
方法,进行消息的发送处理。但为了保证消息生产者
能够成功将数据发送至正常交换机
,同时为了保证正常交换机
能够将数据信息,推送至正常消息队列
。需要对其增加监听。
package cn.linkpower.service; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 直接发送消息 * @param exchange * @param routingKey * @param msg */ public void sendMessage(String exchange,String routingKey,Object msg) { // 设置交换机处理失败消息的模式 true 表示消息由交换机 到达不了队列时,会将消息重新返回给生产者 // 如果不设置这个指令,则交换机向队列推送消息失败后,不会触发 setReturnCallback rabbitTemplate.setMandatory(true); //消息消费者确认收到消息后,手动ack回执 rabbitTemplate.setConfirmCallback(this); // return 配置 rabbitTemplate.setReturnCallback(this); //发送消息 rabbitTemplate.convertAndSend(exchange,routingKey,msg); } * 交换机并未将数据丢入指定的队列中时,触发 * channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes()); * 参数三:true 表示如果消息无法正常投递,则return给生产者 ;false 表示直接丢弃 * @param message 消息对象 * @param replyCode 错误码 * @param replyText 错误信息 * @param exchange 交换机 * @param routingKey 路由键 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" "); * 消息生产者发送消息至交换机时触发,用于判断交换机是否成功收到消息 * @param correlationData 相关配置信息 * @param ack exchange 交换机,判断交换机是否成功收到消息 true 表示交换机收到 * @param cause 失败原因 public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("---- confirm ----ack="+ack+" cause="+String.valueOf(cause)); log.info("correlationData -->"+correlationData.toString()); if(ack){ // 交换机接收到 log.info("---- confirm ----ack==true cause="+cause); }else{ // 没有接收到 log.info("---- confirm ----ack==false cause="+cause); } }
既然说到测试,那么需要编写一个测试类
,能够将产生的消息,推送至指定的正常消息交换机
中去。
package cn.linkpower.controller; import cn.linkpower.config.DeadMsgMqConfig; import cn.linkpower.service.RabbitmqService; import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.jni.Time; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; @Slf4j @RestController public class DeadMsgController { @Autowired private RabbitmqService rabbitmqService; @RequestMapping("/deadMsgTest") public String deadMsgTest() throws InterruptedException { // 向正常的消息队列中丢数据,测试限定时间未消费后,死信队列的情况 // 配置文件中,针对于正常队列而言,设置有10条上限大小 for (int i = 0; i < 20; i++) { String msg = "dead msg test "+i; log.info("发送消息,消息信息为:{}",msg); // 向正常的消息交换机中传递数据 rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg); TimeUnit.SECONDS.sleep(2); } return "ok"; } }
启动项目,访问指定的链接,进行数据产生和将消息发送交换机操作:
http://localhost/deadMsgTest
控制台部分日志展示:
1、队列消息长度到达限制;
2、消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
;
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
3、原队列存在消息过期设置,消息到达超时时间未被消费;
此处只是为了进行配置和测试需要,暂未定义任何正常消息队列消费者
和死信消息队列消费者
信息。
1、死信交换机和死信队列和普通的没有区别
2、当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列