MQ的Hellow World程序
2.MQ的消息应答
3.MQ的持久化
1.MQ的Hellow World程序根据上一篇文章系统学习消息队列——RabbitMQ的基础概念,我们学习了mq的基础理论,理论固然重要,实践也尤其重要,我们先来编写一个RabbitMq的Hello World程序,来感受一下队列。
在这里我们创建一个消息生产者,两个消息消费者,轮询进行分发消息。
pom:
dependency> groupId>com.rabbitmqgroupId> artifactId>amqp-clientartifactId> version>5.8.0version> dependency>
连接工具类:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author sulingfeng * @title: RabbitMqUtils * @date 2022/6/14 9:33 */publicclassRabbitMqUtils {//得到一个连接的工具类publicstatic Channel getChannel() throws Exception {//创建一个连接工厂ConnectionFactoryfactory=newConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest");Connectionconnection= factory.newConnection();Channelchannel= connection.createChannel();return channel; }}
生产者:
import com.rabbitmq.client.Channel;import java.util.Scanner;/** * @author sulingfeng * @title: Producer * @date 2022/6/13 19:55 */publicclassProducer {privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args) throws Exception {//channel 实现了自动 close 接口 自动关闭 不需要自己进行关闭try (Channelchannel= RabbitMqUtils.getChannel()) {/** * 声明一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true就是排它队列,只能由消费者声明,false是不排他队列 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);//从控制台当中接受信息Scannerscanner=newScanner(System.in);while (scanner.hasNext()){Stringmessage= scanner.next();/** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } }}
消费者:
import com.rabbitmq.client.*;/** * @author sulingfeng * @title: Consumer * @date 2022/6/13 20:06 */publicclassConsumer {privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args) throws Exception {Channelchannel= RabbitMqUtils.getChannel(); System.out.println("等待接收消息.........");//消息如何进行消费的业务逻辑DeliverCallbackdeliverCallback= (consumerTag, delivery)-> {Stringmessage=newString(delivery.getBody()); System.out.println(message); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallbackcancelCallback= (consumerTag) -> { System.out.println("消息消费被中断"); };/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费成功的回调 * 4.消费者未成功消费的回调 */ channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); }}
效果:
生产者:消费者1:
消费者2:
2.MQ的消息应答
我们设想这样一个问题。Rabbitmq是如何保证消息被稳定消费的?假设消费一个消息要很长一段时间,但是消费者消费到一半就挂掉了,这个时候Rabbitmq就丢失了这条消息。为了解决这样的问题,rabbitmq引入了消息应答机制,消息应答就是:消费者在接受到消息并且消费完毕之后,告诉mq它已经处理了,rabbitmq可以把消息删除了。
2.1)自动应答自动应答就是消息发送之后就可以被认为发送成功,这种模式需要在高吞吐量和数据传输安全性方面作出权衡,它没有对消息是否成功消费,发送消息数量进行限制,可能会导致消息丢失,消息大量积压,所以这种模式仅适用在消费者可以高效并以某种速率处理这些消息的情况下使用。
2.2)手动应答每一条消息都需要消费者手动应答,如果消费者因为某种原因失去连接,导致消息未发送ack确认,Rabbitmq知道了消费未被处理,会将其重新插入队列,如果此时有其它的消费者可以处理,就将消息发送给另外一个消费者。保证了消息被确认消费的稳定性,也可以防止消费者消息的积压。
手动应答代码演示:
生产者:
public class Consumer {private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息.........");//消息如何进行消费的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery)-> { String message = new String(delivery.getBody());try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message);/** * 1.消息tag * 2.是否批量应答 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),true); };//取消消费的一个回调接口 如在消费的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); };/** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费成功的回调 * 4.消费者未成功消费的回调 */ Boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback); }}
消费者:
publicclassProducer {privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] args) throws Exception {//channel 实现了自动 close 接口 自动关闭 不需要自己进行关闭try (Channelchannel= RabbitMqUtils.getChannel()) {/** * 声明一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);//从控制台当中接受信息Scannerscanner=newScanner(System.in);while (scanner.hasNext()){Stringmessage= scanner.next();/** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } }}
我们现在开启两个消费者,然后生产者给消费者发送消息,等发送了足够多的消息之后,关掉一个消费者,让其中一个正在消费的消息丢失,看看队列会不会再进行重发。
生产者一共发了14条消息:
消费者2消费到一半,停止服务:消费者1帮消费者2消费了消费者2未消费完的数据:
消息确认的api:
Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了Channel.basicNack(用于否定确认) Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,代表不处理该消息了直接拒绝,可以将其丢弃了
我们在上面编写代码的时候,还发现了一个是否批量应答的标志位:
Multiple的true和false代表的含义不同:
true:代表批量应答
false:代表单个应答
2.3)消息的不公平分发我们发现发送消息的时候,消费者处理的速度有快有慢,在这种场景下,轮询发送的效率好像不是最优的,所以我们可以进行不公平的分发,哪个消费者处理地快,就先发送给哪个消费者。
int prefetchCount =1;channel.basicQos(prefetchCount);
有了这个值,rabbitmq就会把该任务分配给没有那么忙的消费者,谁处理地快,谁就消费。
注意,这个配置要在生产者和消费者同时配置。
我们发送二十条消息:
消费者1:消费者2:
此时就是按照消费者的能力消费消息了。
同时我们也要注意一下,这个值还有一个概念,那就是预取值:
消息在发送的时候,本质上是异步消费的,因此消费者这里肯定会有一个消息的缓冲区,开发人员希望限制该缓冲区的大小,以避免缓冲区里面无限制存放未确认消息的问题。这个时候,就可以使用basic.qos方法设置"预取值",该值就定义了允许未确认消息的最大数量。例如取值为4,则rabbitMQ会在未确认信息为4之后,就不发送信息。找到一个合适的值是一个反复试探的过程,大概在100~300之间,取值为1最保守,但是这样吞吐量会变得很低。

3.MQ的持久化
刚刚我们处理了消费者如何不丢失消费的情况,但是我们无法避免Rabbitmq本身挂掉的情况,如何保证Rabbitmq服务停掉后,本身存储在Rabbotmq里面的消息不丢失,我们需要把队列和消息都标记为持久化。
3.1)队列如何持久化
我们之前创建的队列都是非持久化的,所以如果要创建同名的持久化队列,我们需要把以前的队列删除了才可以。

在声明队列的时候,同时要把durable参数设置为true.
/** * 声明一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ Boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
我们会发现队列已经持久化了:
3.2)消息如何持久化要想让消息持久化,需要在发送消息的时候,增加MessageProperties.PERSISTENT_BASIC参数。
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_BASIC,message.getBytes()); System.out.println("发送消息完成:"+message);
其实将消息标记为持久化并不能保证完全不丢消息,可能Rabbitmq在将消息保存到磁盘的时候,消息还没存储完,但是队列就挂了,如果需要更有效的持久化机制,可以参考后面的发布确认章节。

版权声明:本文内容来自个人博客segmentfault:苏凌峰,遵循CC 4.0 BY-SA版权协议上原文接及本声明。
本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行可。
原文链接:https://segmentfault.com/a/1190000042293539
如有涉及到侵权,请联系,将立即予以删除处理。
在此特别鸣谢原作者的创作。
此篇文章的所有版权归原作者所有,与本公众号无关,商业转载建议请联系原作者,非商业转载请注明出处。