系统学习消息队列——RabbitMQ的延时队列

艺帆风顺 发布于 2025-04-02 16 次阅读


1.延时队列的概念

2.延时队列适用场景

3.RabbitMQ中的两种TTL

4.队列的TTL

5.消息的TTL

6.RabbitMQ插件实现延时队列

1.延时队列的概念延时队列,顾名思义,就是消息等一段时间再发送出去,最重要的属性是延时属性,它侧重于延时队列中的消息在指定的时间到了之后再取出处理。

2.延时队列适用场景

2.1)用户给自己设置了一个提醒,到点推送消息。2.2)订单在结束之后自动把钱打到买家账户。2.3)订单在几分钟内未支付自动取消。2.4)用户发起退款,一天之内没处理就自动推送给相关人员。

以上这些场景都有一个特点,那就是在某个业务逻辑完成后,在指定的特定时间完成某一项业务逻辑的处理。如:用户给自己设置了一个提醒,到点推送消息。其实如果简单地来说,是不是使用定时任务处理,每分钟甚至每秒钟轮询所有数据,符合条件的推送就行了?如果在数据量少的情况,且对于时间处理不是特别严格,可以拥有一定的延时的情况,是可以这么处理的,但是一旦数据量变得非常大,并且时效性非常强的业务场景,定时任务就会处理不过来,可能无法在几秒钟或者一分钟大量处理数据,同时也会给数据库很大压力,既不满足业务要求,而且性能也很低下。

3.RabbitMQ中的两种TTLTTL是RabbitMQ中,一个消息或者一个队列的属性,表明一条消息或者队列中所有的消息的最大存活时间,单位是毫秒。

如果一条消息设置了TTL属性或者消息进入了设置TTL属性的队列,在这个时间到了的情况下还没被消费,那么就会成为死信。如果同时配置了两个TTL属性,那么较小的值将会被使用。

4.队列的TTL

pom:

        dependency>            groupId>org.projectlombokgroupId>            artifactId>lombokartifactId>        dependency>        dependency>            groupId>com.rabbitmqgroupId>            artifactId>amqp-clientartifactId>            version>5.8.0version>        dependency>        dependency>            groupId>org.springframework.bootgroupId>            artifactId>spring-boot-starter-amqpartifactId>        dependency>

yml:

server:port:11000spring:rabbitmq:host:127.0.0.1port:5672username:guestpassword:guest

在写队列之前,我们先画一张队列与交换机的关系图,就可以更顺利地编写代码,我们这里要写两个有过期时间的队列,过期时间不同。

这次我们不使用原生API,我们使用springBoot整合好的API:

声明队列和交换机:

@ConfigurationpublicclassQueueConfig {privateString xExchange = "x";privateString queueA = "QA";privateString queueB = "QB";privateString yDeadLetterExchange = "Y";privateString deadLetterQueue = "QD";@Bean("xExchange")publicDirectExchangexExchange(){returnnewDirectExchange(xExchange);    }@Bean("yExchange")publicDirectExchangeyExchange(){returnnewDirectExchange(yDeadLetterExchange);    }@Bean("queueA")publicQueuequeueA(){MapString, Object> args = newHashMap(3);//声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", yDeadLetterExchange);//声明当前队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");//声明队列的 TTL        args.put("x-message-ttl", 10000);returnQueueBuilder.durable(queueA).withArguments(args).build();    }@BeanpublicBindingqueueABindX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){returnBindingBuilder.bind(queueA).to(xExchange).with("XA");    }@Bean("queueB")publicQueuequeueB(){MapString, Object> args = newHashMap(3);//声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", yDeadLetterExchange);//声明当前队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");//声明队列的 TTL        args.put("x-message-ttl", 40000);returnQueueBuilder.durable(queueB).withArguments(args).build();    }@BeanpublicBindingqueueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){returnBindingBuilder.bind(queueB).to(xExchange).with("XB");    }//声明死信队列 QD@Bean("queueD")publicQueuequeueD(){returnnewQueue(deadLetterQueue);    }//声明死信队列 QD 绑定关系@BeanpublicBindingdeadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){returnBindingBuilder.bind(queueD).to(yExchange).with("YD");    }}

消息生产者代码:

@Slf4j@RequestMapping("/ttl")@RestControllerpublic class SendMsgController {@Autowired    private RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")    public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);rabbitTemplate.convertAndSend("x", "XA", "消息来自 ttl 为 10S 的队列: " + message);rabbitTemplate.convertAndSend("x", "XB", "消息来自 ttl 为 40S 的队列: " + message);    }}

消息消费者代码:

@Slf4j@Componentpublic class DeadLetterQueueConsumer {    @RabbitListener(queues = "QD")    public void receiveD(Message message, Channel channel) throws IOException {        String msg = new String(message.getBody());        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);    }}

我们发送两条消息:http://localhost:11000/ttl/se...

我们发现,第一条消息在10s之后变成了死信消息,第二条消息在40s之后变成了死信消息,然后被消费,一个延时队列就完成了。

但是这种架构,扩展性不强。如果每次增加一个新的需求,有不同的延时时间要求,那么就要增加一个队列。

5.消息的TTL为了解决上面的问题,我们就需要给消息设置一个过期时间,这样就可以防止队列的无序扩张,消息到期后自动发出去就可以了。

声明队列:

privateString queueC = "QC";//声明队列 C 死信交换机@Bean("queueC")publicQueuequeueC(){MapString, Object> args = newHashMap(3);//声明当前队列绑定的死信交换机        args.put("x-dead-letter-exchange", yDeadLetterExchange);//声明当前队列的死信路由 key        args.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性returnQueueBuilder.durable(queueC).withArguments(args).build();    }@BeanpublicBindingqueueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){returnBindingBuilder.bind(queueC).to(xExchange).with("XC");    }

生产者:

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {rabbitTemplate.convertAndSend("x", "XC", message,                correlationData -> {correlationData.getMessageProperties().setExpiration(ttlTime);returncorrelationData;                });log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);    }

发送请求:

http://localhost:11000/ttl/se...可以啊10000/10000

http://localhost:11000/ttl/se...可以啊20000/20000

可以看出,等待消息过期后,就会自动把消息发出,不过这么做有一个bug:

RabbitMQ只会检查第一个消息是否过期,如果过期则转发至死信交换机,如果第一个消息的过期时间很长很长,而第二个消息的过期时间很短很短,那么在第一个消息发送成功之前,第二个消息不会先得到执行。

6.RabbitMQ插件实现延时队列

上面提到的问题,如果我们的队列受困于第一条消息的过期时间,那么这个消息并不是一个完整的消息队列,那我们该如何解决呢?

我们需要安装一个延时队列的插件:

https://github.com/rabbitmq/r...

到这个网站去下载,要注意一下rabbitMQ的版本,我的是3.8.3,所以要下载对应版本,放在rabbitMQ的plugIn文件夹下

在该文件夹下执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange,或者重启服务,就可以拥有插件延时队列了。

架构图:

声明队列:

publicstatic final StringDELAYED_QUEUE_NAME = "delayed.queue";publicstatic final StringDELAYED_EXCHANGE_NAME = "delayed.exchange";publicstatic final StringDELAYED_ROUTING_KEY = "delayed.routingkey";@BeanpublicQueuedelayedQueue() {returnnewQueue(DELAYED_QUEUE_NAME);    }@BeanpublicCustomExchangedelayedExchange() {MapString, Object> args = newHashMap();//自定义交换机的类型        args.put("x-delayed-type", "direct");returnnewCustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);    }@BeanpublicBindingbindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange                                               delayedExchange) {returnBindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();    }

生产者:

publicstatic final StringDELAYED_EXCHANGE_NAME = "delayed.exchange";publicstatic final StringDELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")publicvoidsendMsg(@PathVariableString message, @PathVariable Integer delayTime) {        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,                correlationData -> {                    correlationData.getMessageProperties().setDelay(delayTime);return correlationData;                });        log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", newDate(), delayTime, message);    }

消费者:

publicstatic final StringDELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)publicvoidreceiveDelayedQueue(Message message) {String msg = newString(message.getBody());        log.info("当前时间:{},收到延时队列的消息:{}", newDate().toString(), msg);    }

我们先发送一个过期时间比较长的消息,再发送一条过期时间比较短的消息:

http://localhost:11000/ttl/se...可以啊20000/20000

http://localhost:11000/ttl/se...可以啊10000/10000

我们发现消息的消费时符合我们的预期的,使用插件完美解决了消息发送的问题。

    版权声明:本文内容来自个人博客segmentfault:苏凌峰,遵循CC 4.0 BY-SA版权协议上原文接及本声明。本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行可。原文链接:https://segmentfault.com/a/1190000042001616如有涉及到侵权,请联系,将立即予以删除处理。在此特别鸣谢原作者的创作。此篇文章的所有版权归原作者所有,与本公众号无关,商业转载建议请联系原作者,非商业转载请注明出处。