系统学习消息队列——RabbitMQ的死信队列

艺帆风顺 发布于 2025-04-02 16 次阅读


1.死信队列的概念

2.死信产生的原因

3.死信队列的架构图

4.消息过期的死信

5.队列到达最大长度的死信

6.消息被拒绝的死信

1.死信队列的概念死信,就是无法被消费的消息。生产者把消息发送个broker,但是由于某些原因,queue中的消息无法被消费,这样的消息被称之为死信,而这种消息的后续处理,有专门的队列处理这类消息,称之为死信队列。

比如说订单下单后十五分钟没有支付自动取消,就可以使用死信队列来完成。

2.死信产生的原因

一般会有下面三种原因:

消息TTL时间过期。

队列到达最大长度。

消息被拒绝。

3.死信队列的架构图

4.消息过期的死信

消费者1:

publicclassConsumer1 {privatefinalstaticStringEXCHANGE_NAME="test_exchange";privatefinalstaticStringNORMAL_QUEUE="normal_queue";privatefinalstaticStringDEAD_EXCHANGE_NAME="dead_test";privatefinalstaticStringDEAD_QUEUE="dead_queue";publicstaticvoidmain(String[] args) throws Exception {Channelchannel= RabbitMqUtils.getChannel();//声明普通交换机        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        Map params = newHashMap();//正常队列设置死信交换机 参数 key 是固定值        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//正常队列设置死信 routing-key 参数 key 是固定值        params.put("x-dead-letter-routing-key", "dead");        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");        System.out.println("消费者1等待接收消息.........");//消息如何进行消费的业务逻辑DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {Stringmessage=newString(delivery.getBody());            System.out.println("消费者1控制台接收并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback= (consumerTag) -> {            System.out.println("消息消费被中断");        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);    }}

消费者2:

public class Consumer2 {private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();//声明交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");        System.out.println("死信队列等待接收消息.........");//消息如何进行消费的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("死信队列控制台接收并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };//取消消费的一个回调接口 如在消费的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("死信消息消费被中断");        };        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);    }}

生产者:

public class Producer {private final static String EXCHANGE_NAME = "test_exchange";    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/**             * 声明一个 exchange             * 1.exchange 的名称             * 2.exchange 的类型             */            channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);//设置消息的三十秒过期时间            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();            Scanner sc = new Scanner(System.in);            System.out.println("请输入信息");while (sc.hasNext()) {                String message = sc.nextLine();                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));                System.out.println("生产者发出消息" + message);            }        }    }}

生产者先发送三条消息,发现正常队列都收到了消息:

然后把消费者1关掉,模拟收不到消息,生产者发现消息过期了,就会把消息发到死信队列里面:

5.队列到达最大长度的死信

先把之前创建的队列和交换机都删掉:

消费者1:

publicclassConsumer1 {privatefinalstaticStringEXCHANGE_NAME="test_exchange";privatefinalstaticStringNORMAL_QUEUE="normal_queue";privatefinalstaticStringDEAD_EXCHANGE_NAME="dead_test";privatefinalstaticStringDEAD_QUEUE="dead_queue";publicstaticvoidmain(String[] args) throws Exception {Channelchannel= RabbitMqUtils.getChannel();//声明交换机        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        Map params = newHashMap();//正常队列设置死信交换机 参数 key 是固定值        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//正常队列设置死信 routing-key 参数 key 是固定值        params.put("x-dead-letter-routing-key", "dead");//设置队列最大长度        params.put("x-max-length", 6);        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");        System.out.println("消费者1等待接收消息.........");//消息如何进行消费的业务逻辑DeliverCallbackdeliverCallback= (consumerTag, delivery) -> {Stringmessage=newString(delivery.getBody());            System.out.println("消费者1控制台接收并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback= (consumerTag) -> {            System.out.println("消息消费被中断");        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);    }}

消费者2:

public class Consumer2 {private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();//声明交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");        System.out.println("死信队列等待接收消息.........");//消息如何进行消费的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("死信队列控制台接收并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };//取消消费的一个回调接口 如在消费的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("死信消息消费被中断");        };        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);    }}

生产者:

publicclassProducer {privatefinalstaticStringEXCHANGE_NAME="test_exchange";publicstaticvoidmain(String[] args) throws Exception {try (Channelchannel= RabbitMqUtils.getChannel()) {/**             * 声明一个 exchange             * 1.exchange 的名称             * 2.exchange 的类型             */            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//设置消息的三十秒过期时间            AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().expiration("30000").build();            System.out.println("请输入信息");//自动发信息,发送速度比较快,可造成队列到达最大长度for (inti=0; i 1000; i++) {Stringmessage= i + "";                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));                System.out.println("生产者发出消息" + message);            }        }    }}

生产者发送消息:消费者1正常消费:死信消费者2消费队列放不下的数据:

6.消息被拒绝的死信

先把之前创建的队列和交换机都删掉:

消费者1:

public class Consumer1 {private final static String EXCHANGE_NAME = "test_exchange";private final static String NORMAL_QUEUE = "normal_queue";private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();//声明交换机        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        Map params = new HashMap();//正常队列设置死信交换机 参数 key 是固定值        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//正常队列设置死信 routing-key 参数 key 是固定值        params.put("x-dead-letter-routing-key", "dead");        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");        System.out.println("消费者1等待接收消息.........");//消息如何进行消费的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());//假设现在三位数的消息,我们都设置为消费失败,退回去 requeue为true ,如果为false,直接进入死信队列if(message.length() >= 3){                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);            }else{                System.out.println("消费者1控制台接收并打印消息:"+message);                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);            }        };//取消消费的一个回调接口 如在消费的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("消息消费被中断");        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);    }}

消费者2:

public class Consumer2 {private final static String DEAD_EXCHANGE_NAME = "dead_test";private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();//声明交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");        System.out.println("死信队列等待接收消息.........");//消息如何进行消费的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("死信队列控制台接收并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };//取消消费的一个回调接口 如在消费的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("死信消息消费被中断");        };        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);    }}

生产者:

publicclassProducer {privatefinalstaticStringEXCHANGE_NAME="test_exchange";publicstaticvoidmain(String[] args) throws Exception {try (Channelchannel= RabbitMqUtils.getChannel()) {/**             * 声明一个 exchange             * 1.exchange 的名称             * 2.exchange 的类型             */            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//设置消息的三十秒过期时间            AMQP.BasicPropertiesproperties=newAMQP.BasicProperties().builder().expiration("30000").build();            System.out.println("请输入信息");for (inti=0; i 1000; i++) {Stringmessage= i + "";                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));                System.out.println("生产者发出消息" + message);            }        }    }}

生产者发送消息:

消费者1正常消费消息,并把长度不符合要求的消息退回到队列:

等到消息时间到了以后,死信队列收到消息,消费那些被拒绝消费的消息:

    版权声明:本文内容来自个人博客segmentfault:苏凌峰,遵循CC 4.0 BY-SA版权协议上原文接及本声明。本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行可。原文链接:https://segmentfault.com/a/1190000041997585如有涉及到侵权,请联系,将立即予以删除处理。在此特别鸣谢原作者的创作。此篇文章的所有版权归原作者所有,与本公众号无关,商业转载建议请联系原作者,非商业转载请注明出处。