1.消息发布确认的方案
2.消息的回退
3.备份交换机
1.消息发布确认的方案在前面的文章中,系统学习消息队列——RabbitMQ的消息发布确认,我们一定程度上学习了消息的发布确认的基础,但是在生产环境中,由于RabbitMq的重启,RabbitMQ在重启过程中投递失败,导致消息丢失,需要手动处理和恢复。那么我们该如何保证当RabbitMQ不可用的时候,消息的稳定投递呢?
我们采取下面的方案:
我们将要发送消息做一个持久化,发送消息的时候,我们持久化一份到数据库或者缓存中,当发送消息失败的时候,我们进行一次重新发送。所以在发送消息的时候,我们要进行代码业务逻辑的处理:
yml:
server:port:11000spring:rabbitmq:host:127.0.0.1port:5672username:guestpassword:guestpublisher-confirm-type:correlated
publisher-confirm-type这个参数一共有三种配置方法:
NONE:禁用发布确认,是默认值。CORRELATED:发布消息后,交换机会触发回调方法。SIMPLE:有两种效果:1:和CORRELATED一样会触发回调方法2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
回调方法类:
@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback {/** * 交换机是否收到消息的回调方法 * CorrelationData 消息相关数据 * ack 交换机是否收到消息 * cause 交换机未收到消息的原因 */@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("交换机已经收到 id 为:{}的消息", correlationData.getId()); } else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause); } }}
队列配置类:
@ConfigurationpublicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";@AutowiredprivateMyCallBack myCallBack;@AutowiredprivateRabbitTemplate rabbitTemplate;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublicvoidinit() { rabbitTemplate.setConfirmCallback(myCallBack); }//声明业务 Exchange@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){returnnewDirectExchange(CONFIRM_EXCHANGE_NAME); }// 声明确认队列@Bean("confirmQueue")publicQueueconfirmQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }// 声明确认队列绑定关系@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("key1"); }}
生产者:
@RestController@RequestMapping("/confirm")@Slf4jpublic class ProducerController { public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; @Autowiredprivate RabbitTemplate rabbitTemplate; @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message) {//指定消息 id 为 1 CorrelationData correlationData1 = new CorrelationData("1");//这个key1是有交换机的key,会发送成功 String routingKey = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);//这个交换机不存在,会发送失败 CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME+"1", routingKey, message + routingKey, correlationData2); CorrelationData correlationData3 = new CorrelationData("3");//这个key2是没有交换机的key,会发送失败 routingKey = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData3); log.info("发送消息内容:{}", message); }}
消费者:
@Component@Slf4jpublicclassConfirmConsumer {publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)publicvoidreceiveMsg(Message message){String msg=newString(message.getBody()); log.info("接受到队列 confirm.queue 消息:{}",msg); }}
我们发送信息:http://localhost:11000/confir...可以啊
我们发送三条消息:一条是有交换机有队列的消息二条是没有交换机的消息三条是有交换机没有队列的消息
结果如下:
我们可以看出:第一条消息正常消费第二条消息找不到交换机,抛异常了第三条消息绑定键找不到队列,这条消息直接被抛弃了
2.消息的回退
我们发现第三条消息的反馈并不是很好,在仅仅开启了生产者确认机制的情况下,交换机收到消息后,会直接给生产者发送确认消息,如果该消息不可路由,那么消息会直接被抛弃,此时生产者是不知道这条消息被丢弃的。所以我们这里要引入消息的回退机制,如果消息不能路由到队列,就会有一个通知,通过设置mandatory参数可以将不可抵达队列的消息返回给生产者。
回调处理逻辑:
@Component@Slf4jpublicclassMyCallBackimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/** * 交换机是否收到消息的回调方法 * CorrelationData 消息相关数据 * ack 交换机是否收到消息 * cause 交换机未收到消息的原因 */@Overridepublicvoidconfirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) { log.info("交换机已经收到 id 为:{}的消息", correlationData.getId()); } else { log.info("交换机还未收到 id 为:{}消息,由于原因:{}", correlationData.getId(), cause); } }@OverridepublicvoidreturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}",newString(message.getBody()), exchange, replyText, routingKey); }}
修改一下前面那个配置类的方法:
//依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(myCallBack);/** * true: * 交换机无法将消息进行路由时,会将该消息返回给生产者 * false: * 如果发现消息无法进行路由,则直接丢弃 */ rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理 rabbitTemplate.setReturnCallback(myCallBack); }
继续发送消息:http://localhost:11000/confir...可以啊
我们发现,交换机路由不到的队列,也会有反馈了:
3.备份交换机
有了前面那个mandatory参数和回退消息,我们对于无法投递到目的地的消息,可以进行处理了。但是我们在处理这些日志的时候,顶多就是打印了一下日志,然后触发报警,接着手动进行处理。通过日志收集这些无法到达路由的消息非常不优雅,而且手动复制日志非常容易出错。而且mandatory参数设置,还得增加配置类,增加了复杂性。
如果我们不想丢失消息,又不想增加配置类,该怎么做呢?在前面学习死信队列的时候系统学习消息队列——RabbitMQ的死信队列,我们可以为队列设置死信交换机来处理那些失败的消息。
RabbitMQ中有备份交换机这种存在,它就像死信交换机一样,可以用来处理那些路由不到的消息,当交换机接收到一份不可路由的消息的时候,我们就会把这条消息转发到备份交换机中,由备份交换机进行统一处理。

@ConfigurationpublicclassConfirmQueueConfig {publicstatic final StringCONFIRM_EXCHANGE_NAME = "confirm.exchange";publicstatic final StringCONFIRM_QUEUE_NAME = "confirm.queue";publicstatic final StringBACKUP_EXCHANGE_NAME = "backup.exchange";publicstatic final StringBACKUP_QUEUE_NAME = "backup.queue";publicstatic final StringWARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")publicQueueconfirmQueue(){returnQueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }//声明确认队列绑定关系@BeanpublicBindingqueueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("key1"); }//声明备份 Exchange@Bean("backupExchange")publicFanoutExchangebackupExchange(){returnnewFanoutExchange(BACKUP_EXCHANGE_NAME); }//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")publicDirectExchangeconfirmExchange(){//设置该交换机的备份交换机ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .durable(true) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); return (DirectExchange)exchangeBuilder.build(); }// 声明警告队列@Bean("warningQueue")publicQueuewarningQueue(){returnQueueBuilder.durable(WARNING_QUEUE_NAME).build(); }// 声明报警队列绑定关系@BeanpublicBindingwarningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){returnBindingBuilder.bind(queue).to(backupExchange); }// 声明备份队列@Bean("backQueue")publicQueuebackQueue(){returnQueueBuilder.durable(BACKUP_QUEUE_NAME).build(); }// 声明备份队列绑定关系@BeanpublicBindingbackupBinding(@Qualifier("backQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){returnBindingBuilder.bind(queue).to(backupExchange); }}

我们发现,不可路由的消息被发现后,就被送到了报警的备份队列里面。
而且这种配置的优先级,比mandatory参数更高。

版权声明:本文内容来自个人博客segmentfault:苏凌峰,遵循CC 4.0 BY-SA版权协议上原文接及本声明。
本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行可。
原文链接:https://segmentfault.com/a/1190000042010063
如有涉及到侵权,请联系,将立即予以删除处理。
在此特别鸣谢原作者的创作。
此篇文章的所有版权归原作者所有,与本公众号无关,商业转载建议请联系原作者,