欢迎来到【万博娱乐源码】【质押存币生息源码】【打洛口岸指标源码】rabbitmq源码分析持久化-皮皮网网站!!!

皮皮网

【万博娱乐源码】【质押存币生息源码】【打洛口岸指标源码】rabbitmq源码分析持久化-皮皮网 扫描左侧二维码访问本站手机端

【万博娱乐源码】【质押存币生息源码】【打洛口岸指标源码】rabbitmq源码分析持久化

2025-01-27 13:38:12 来源:{typename type="name"/} 分类:{typename type="name"/}

1.RabbitMQ的源码持久化
2.详细讲解!RabbitMQ如何防止数据丢失,分析看这篇就够了

rabbitmq源码分析持久化

RabbitMQ的持久持久化

       RabbitMQ的持久化特性涉及交换机、队列及消息三个层面,源码确保系统在异常或重启后能自动恢复。分析

       当使用php-amqplib实现RabbitMQ时,持久万博娱乐源码可直接调用相关函数,源码无需额外配置。分析

       1、持久交换机持久化:通过exchange_declare方法创建交换机,源码设置durable参数为true可使交换机在RabbitMQ重启后自动重建,分析反之则不会重建。持久

       2、源码队列持久化:队列创建时,分析queue_declare方法的持久durable参数决定队列是否在RabbitMQ重启后自动恢复。

       3、消息持久化:消息持久化依赖队列持久化,通过在创建消息时设置delivery_mode属性为2,可确保消息在磁盘中持久保存,即使队列消失,消息也不会丢失。质押存币生息源码

       消息持久化与交换机、队列持久化不同之处在于,它侧重于消息的存储方式,确保消息在系统重启后仍然可用。

       在配置客户端时,合理设置参数有助于提升开发效率,利用客户端对象的管理功能如索引、集群管理等,方便IDE搜索与管理。

       总体而言,打洛口岸指标源码RabbitMQ的持久化机制通过在关键组件上启用持久性,确保了系统的稳定性和数据完整性,是构建可靠消息队列系统的重要保障。

详细讲解!RabbitMQ如何防止数据丢失,看这篇就够了

       思维导图

       一、分析数据丢失的原因

       分析RabbitMQ消息丢失的情况,不妨先看看一条消息从生产者发送到消费者消费的过程:

       可以看出,一条消息整个过程要经历两次的网络传输:从生产者发送到RabbitMQ服务器,从RabbitMQ服务器发送到消费者。乱斗弹弹堂源码在消费者未消费前存储在队列(Queue)中。所以可以知道,有三个场景下是会发生消息丢失的:

       针对以上三种场景,RabbitMQ提供了三种解决的方式,分别是消息持久化,confirm机制,ACK事务机制。

       二、消息持久化

       RabbitMQ是支持消息持久化的,消息持久化需要设置:Exchange为持久化和Queue持久化,kw智能开关源码这样当消息发送到RabbitMQ服务器时,消息就会持久化。首先看Exchange交换机的类图:

       看这个类图其实是要说明上一篇文章介绍的四种交换机都是AbstractExchange抽象类的子类,所以根据java的特性,创建子类的实例会先调用父类的构造器,父类也就是AbstractExchange的构造器是怎么样的呢?

       从上面的注释可以看到durable参数表示是否持久化。默认是持久化(true)。创建持久化的Exchange可以这样写:

       @Bean public DirectExchange rabbitmqDemoDirectExchange() { //Direct交换机 return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false); }

       接着是Queue队列,我们先看看Queue的构造器是怎么样的:

       也是通过durable参数设置是否持久化,默认是true。所以创建时可以不指定:

       @Bean public Queue fanoutExchangeQueueA() { //只需要指定名称,默认是持久化的 return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A); }

       这就完成了消息持久化的设置,接下来启动项目,发送几条消息,我们可以看到:

       怎么证明是已经持久化了呢,实际上可以找到对应的文件:

       找到对应磁盘中的目录:

       消息持久化可以防止消息在RabbitMQ Server中不会因为宕机重启而丢失。

       三、消息确认机制

       3.1 confirm机制

       在生产者发送到RabbitMQ Server时有可能因为网络问题导致投递失败,从而丢失数据。我们可以使用confirm模式防止数据丢失。工作流程是怎么样的呢,看以下图解:

       从上图中可以看到是通过两个回调函数confirm()、returnedMessage()进行通知。一条消息从生产者发送到RabbitMQ,首先会发送到Exchange,对应回调函数confirm()。第二步从Exchange路由分配到Queue中,对应回调函数则是returnedMessage()。

       代码怎么实现呢,请看演示:

       首先在application.yml配置文件中加上如下配置:

       spring: rabbitmq: publisher-confirms: true # publisher-returns: true template: mandatory: true # publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者 # publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息 # spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。

       接着我们需要定义回调方法:

       @Component public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class); /** * 监听消息是否到达Exchange * * @param correlationData 包含消息的唯一标识的对象 * @param ack true 标识 ack,false 标识 nack * @param cause nack 投递失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息投递成功~消息Id:{ }", correlationData.getId()); } else { logger.error("消息投递失败,Id:{ },错误提示:{ }", correlationData.getId(), cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息没有路由到队列,获得返回的消息"); Map map = byteToObject(message.getBody(), Map.class); logger.info("message body: { }", map == null ? "" : map.toString()); logger.info("replyCode: { }", replyCode); logger.info("replyText: { }", replyText); logger.info("exchange: { }", exchange); logger.info("routingKey: { }", exchange); logger.info("------------> end <------------"); } @SuppressWarnings("unchecked") private T byteToObject(byte[] bytes, Class clazz) { T t; try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bis)) { t = (T) ois.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } return t; } }

       ...