系统学习消息队列——RabbitMQ的消息发布确认

艺帆风顺 发布于 2025-04-02 15 次阅读


1.MQ发送信息的时候产生的问题

2.MQ的发布确认原理

3.MQ的发布确认策略

1.MQ发送信息的时候产生的问题

我们在前一篇博客系统学习消息队列——RabbitMQ的消息应答和持久化中学过,当消费者挂掉的时候,有消息重发,当队列挂掉的时候,有消息持久化,但是我们却无法保证生产者发送到队列的消息能否确定发送成功,这个时候就有了消息的发布确认。

2.MQ的发布确认原理当我们的信道被设置成发布确认(confirm)模式,那么所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息成功投递,broker就会发送一个确认给生产者,生产者此时就知道消息已经投递成功,生产者就会把这条消息进行删除。

confirm模式可以是同步的,也可以是异步的,同步的情况下是发送之后马上进行确认,异步的话生产者可以无需等待确认只管发送消息,如果某些消息得到确认,生产者将就可以通过回调方法来确认消息。

3.MQ的发布确认策略

3.1)开启确认发布发布确认模式默认是没有开启的,我们需要调用方法将它打开。

        Channel channel = connection.createChannel();        //开启发布确认        channel.confirmSelect();

3.2)单个确认发布

这是一种简单的同步确认方式,发送一条消息,确认一条消息,后续的消息才能继续发送。

优点:简单易懂。缺点:发布速度过慢,如果前面的消息没有得到确认,后面的消息就不得发送,容易阻塞。

public class ProducerSingle {private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {//channel 实现了自动 close 接口 自动关闭 不需要自己进行关闭try (Channel channel = RabbitMqUtils.getChannel()) {/**             * 声明一个队列             * 1.队列名称             * 2.队列里面的消息是否持久化 默认消息存储在内存中             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除             * 5.其他参数             */            channel.queueDeclare(QUEUE_NAME, true, false, false, null);//开始时间            long begin = System.currentTimeMillis();for (int i = 0; i 1000; i++) {                String message = i + "";                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//确认是否发送成功,服务端返回 false 或超时时间内未返回,生产者可以消息重发                boolean flag = channel.waitForConfirms();if (flag) {                    System.out.println("消息发送成功");                }            }//发送结束时间            long end = System.currentTimeMillis();            System.out.println("发布" + 1000 + "个单独确认消息,耗时" + (end - begin) + "ms");        }    }}

3.3)批量确认发布单个确认发布的速度非常慢,其实我们可以先发送一批,然后确认一批,再发布一批。优点:比单个确认发布速度快,吞吐量大。缺点:当其中一个消息出问题的时候,不知道是哪个消息出现了问题,我们必须将整个批处理消息保存在内存里,以记录重要的消息后重新发布消息。这种方法也是阻塞的,一样阻塞消息的发布。

public class ProducerMulti {private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {//channel 实现了自动 close 接口 自动关闭 不需要自己进行关闭try (Channel channel = RabbitMqUtils.getChannel()) {/**             * 声明一个队列             * 1.队列名称             * 2.队列里面的消息是否持久化 默认消息存储在内存中             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除             * 5.其他参数             */            channel.queueDeclare(QUEUE_NAME, true, false, false, null);//批量确认消息大小int batchSize = 100;//未确认消息个数int unConfirmMessageNum = 0;//开始发送时间            long begin = System.currentTimeMillis();for (int i = 0; i 1000; i++) {                String message = i + "";                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());//发送一条消息,未确认书+1                unConfirmMessageNum++;//如果位确认数到达批量确认大小if (unConfirmMessageNum == batchSize) {//等待进行批量确认                    channel.waitForConfirms();                    unConfirmMessageNum = 0;                }            }//为了确保还有剩余没有确认消息 再次确认if (unConfirmMessageNum > 0) {                channel.waitForConfirms();            }//结束时间            long end = System.currentTimeMillis();            System.out.println("发布" + 1000 + "个单独确认消息,耗时" + (end - begin) + "ms");        }    }}

3.4)异步确认发布异步确认不需要阻塞,生产者只管发送信息就好,队列通过回调函数通知生产者发送成功。

优点:保证了效率和可靠性

缺点:编程逻辑复杂

public class ProducerAsyn {private final static String QUEUE_NAME = "hello";    public static void main(String[] args) throws Exception {//channel 实现了自动 close 接口 自动关闭 不需要自己进行关闭try (Channel channel = RabbitMqUtils.getChannel()) {/**             * 声明一个队列             * 1.队列名称             * 2.队列里面的消息是否持久化 默认消息存储在内存中             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除             * 5.其他参数             */            channel.queueDeclare(QUEUE_NAME, true, false, false, null);/**             * 用于回调函数确认发布的哈希表,线程安全,适用于并发情况             * 1.可以将序列号和消息进行关联             * 2.可以批量删除已经确认的消息             * 3.支持并发访问             */            ConcurrentSkipListMap confirmsMap = new ConcurrentSkipListMap();/**             * 确认收到消息的一个回调             * 1.消息序列号             * 2.true 可以确认小于等于当前序列号的消息             *   false 确认当前序列号消息             */            ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {if (multiple) {//把小于当前序列号的全部消息取出//返回的是小于等于当前序列号的未确认消息 是一个 map                    ConcurrentNavigableMap confirmed =                            confirmsMap.headMap(sequenceNumber, true);//清除该部分未确认消息                    confirmed.clear();                }else{//只清除当前序列号的消息                    confirmsMap.remove(sequenceNumber);                }            };//未被确认的回调            ConfirmCallback nackCallback = (sequenceNumber, multiple) ->            {String message = confirmsMap.get(sequenceNumber);                System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);            };/**             * 添加一个异步确认的监听器             * 1.确认收到消息的回调             * 2.未收到消息的回调             */            channel.addConfirmListener(ackCallback, nackCallback);//发送开始时间            long begin = System.currentTimeMillis();for (int i = 0; i 1000; i++) {                String message = i + "";//在map里面设置消息id和内容                confirmsMap.put(channel.getNextPublishSeqNo(), message);                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());            }//发送结束时间            long end = System.currentTimeMillis();            System.out.println("发布" + 1000 + "个单独确认消息,耗时" + (end - begin) + "ms");        }    }}

3.5)确认发布速度对比

    版权声明:本文内容来自个人博客segmentfault:苏凌峰,遵循CC 4.0 BY-SA版权协议上原文接及本声明。本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行可。原文链接:https://segmentfault.com/a/1190000041987998如有涉及到侵权,请联系,将立即予以删除处理。在此特别鸣谢原作者的创作。此篇文章的所有版权归原作者所有,与本公众号无关,商业转载建议请联系原作者,非商业转载请注明出处。