1.死信队列的概念
2.死信产生的原因
3.死信队列的架构图
4.消息过期的死信
5.队列到达最大长度的死信
6.消息被拒绝的死信
1.死信队列的概念死信,就是无法被消费的消息。生产者把消息发送个broker,但是由于某些原因,queue中的消息无法被消费,这样的消息被称之为死信,而这种消息的后续处理,有专门的队列处理这类消息,称之为死信队列。
比如说订单下单后十五分钟没有支付自动取消,就可以使用死信队列来完成。
2.死信产生的原因
一般会有下面三种原因:
消息TTL时间过期。
队列到达最大长度。
消息被拒绝。
3.死信队列的架构图

4.消息过期的死信
消费者1:
publicclassConsumer1 {privatefinalstaticStringEXCHANGE_NAME="test_exchange";privatefinalstaticStringNORMAL_QUEUE="normal_queue";privatefinalstaticStringDEAD_EXCHANGE_NAME="dead_test";privatefinalstaticStringDEAD_QUEUE="dead_queue";publicstaticvoidmain(String[] args) throws Exception {Channelchannel= RabbitMqUtils.getChannel();//声明普通交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map params = newHashMap();//正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "dead"); channel.queueDeclare(NORMAL_QUEUE,true,false,false,params); channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test"); System.out.println("消费者1等待接收消息.........");//消息如何进行消费的业务逻辑DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {Stringmessage=newString(delivery.getBody()); System.out.println("消费者1控制台接收并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback= (consumerTag) -> { System.out.println("消息消费被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); }}
消费者2:
public class Consumer2 {private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();//声明交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,true,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead"); System.out.println("死信队列等待接收消息.........");//消息如何进行消费的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("死信队列控制台接收并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("死信消息消费被中断"); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); }}
生产者:
public class Producer {private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/** * 声明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//设置消息的三十秒过期时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build(); Scanner sc = new Scanner(System.in); System.out.println("请输入信息");while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } }}
生产者先发送三条消息,发现正常队列都收到了消息:
然后把消费者1关掉,模拟收不到消息,生产者发现消息过期了,就会把消息发到死信队列里面:
5.队列到达最大长度的死信
先把之前创建的队列和交换机都删掉:
消费者1:
publicclassConsumer1 {privatefinalstaticStringEXCHANGE_NAME="test_exchange";privatefinalstaticStringNORMAL_QUEUE="normal_queue";privatefinalstaticStringDEAD_EXCHANGE_NAME="dead_test";privatefinalstaticStringDEAD_QUEUE="dead_queue";publicstaticvoidmain(String[] args) throws Exception {Channelchannel= RabbitMqUtils.getChannel();//声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map params = newHashMap();//正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "dead");//设置队列最大长度 params.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE,true,false,false,params); channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test"); System.out.println("消费者1等待接收消息.........");//消息如何进行消费的业务逻辑DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {Stringmessage=newString(delivery.getBody()); System.out.println("消费者1控制台接收并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback= (consumerTag) -> { System.out.println("消息消费被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); }}
消费者2:
public class Consumer2 {private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();//声明交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,true,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead"); System.out.println("死信队列等待接收消息.........");//消息如何进行消费的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("死信队列控制台接收并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("死信消息消费被中断"); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); }}
生产者:
publicclassProducer {privatefinalstaticStringEXCHANGE_NAME="test_exchange";publicstaticvoidmain(String[] args) throws Exception {try (Channelchannel= RabbitMqUtils.getChannel()) {/** * 声明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//设置消息的三十秒过期时间 AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().expiration("30000").build(); System.out.println("请输入信息");//自动发信息,发送速度比较快,可造成队列到达最大长度for (inti=0; i 1000; i++) {Stringmessage= i + ""; channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } }}
生产者发送消息:
6.消息被拒绝的死信
先把之前创建的队列和交换机都删掉:
消费者1:
public class Consumer1 {private final static String EXCHANGE_NAME = "test_exchange";private final static String NORMAL_QUEUE = "normal_queue";private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();//声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map params = new HashMap();//正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "dead"); channel.queueDeclare(NORMAL_QUEUE,true,false,false,params); channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test"); System.out.println("消费者1等待接收消息.........");//消息如何进行消费的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody());//假设现在三位数的消息,我们都设置为消费失败,退回去 requeue为true ,如果为false,直接进入死信队列if(message.length() >= 3){ channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true); }else{ System.out.println("消费者1控制台接收并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } };//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); }}
消费者2:
public class Consumer2 {private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();//声明交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,true,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead"); System.out.println("死信队列等待接收消息.........");//消息如何进行消费的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("死信队列控制台接收并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("死信消息消费被中断"); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); }}
生产者:
publicclassProducer {privatefinalstaticStringEXCHANGE_NAME="test_exchange";publicstaticvoidmain(String[] args) throws Exception {try (Channelchannel= RabbitMqUtils.getChannel()) {/** * 声明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//设置消息的三十秒过期时间 AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().expiration("30000").build(); System.out.println("请输入信息");for (inti=0; i 1000; i++) {Stringmessage= i + ""; channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } }}
生产者发送消息:
消费者1正常消费消息,并把长度不符合要求的消息退回到队列:
等到消息时间到了以后,死信队列收到消息,消费那些被拒绝消费的消息:




版权声明:本文内容来自个人博客segmentfault:苏凌峰,遵循CC 4.0 BY-SA版权协议上原文接及本声明。
本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行可。
原文链接:https://segmentfault.com/a/1190000041997585
如有涉及到侵权,请联系,将立即予以删除处理。
在此特别鸣谢原作者的创作。
此篇文章的所有版权归原作者所有,与本公众号无关,商业转载建议请联系原作者,非商业转载请注明出处。