2024-04-01
原文作者:立小言先森 原文地址: https://mingyang.blog.csdn.net/article/details/103652007

在使用任何消息中间件的过程中,难免会出现消息异常丢失的情况。对于RabbitMQ而言,可能是生产者与Broker断开了连接并且没有任何重试机制;也可能是消费者在处理消息时发生了异常,不过却提前进行了ack;甚至是交换器没有与任何队列绑定,生产者感知不到或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失,这个时候就需要有一个良好的机制来跟踪记录消息的投递过程,以此来协助开发或者运维人员快速的定位问题。

Firehose

在RabbitMQ中可以使用Firehose功能来实现消息追踪,Firehose可以记录每一次发送或者消费消息的记录,方便RabbitMQ的使用者进行调试、排错等。

Firehose的原理是将生产者投递给RabbitMQ的消息,或者RabbitMQ投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认交换器的名称是amq.rabbitmq.trace,它是一个topic类型的交换器。发送到这个交换器上的消息路由键是:

  • publish.{exchangename},其中exchangename为交换器名称,对应生产者投递到交换器的消息
  • deliver.{queuename},其中queuename为队列名称,对应消费者从队列获取的消息

开启Firehose功能命令:

    rabbitmqctl trace_on [-p vhost]

[-p vhost]是可选参数,用来指定虚拟主机vhost

关闭Firehose功能命令:

    rabbitmqctl trace_off [-p vhost]

Firehose默认情况下处于关闭状态,并且Firehose的状态也是非持久化的,会在RabbitMQ服务重启的时候还原成默认状态。Firehose开启之后多少会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储。

Firehose使用示例

首先要做一下准备工作:

  • 创建队列test_queue、queue1、queue2、queue3、queue4、queue5
  • 创建交换器test_exchange
  • 通过路由键绑定test_exchange交换器和test_queue队列
  • 将amq.rabbitmq.trace和queue1、queue2、queue3、queue4、queue5队列绑定

202404012101056031.png

RabbitMQ management管理页面示例图:

队列示例图:

202404012101059342.png

交换器示例图:

202404012101062653.png

使用客户端向交换器exchange中发送消息,然后可以观察到队列test_queue、queue1、queue2、queue3、queue4、queue5中都接收到了一条消息。

解释amq.rabbitmq.trace这个默认追踪交换器绑定队列的路由键:

  • "publish.#"匹配发送至所有交换器的消息
  • “deliver.#"匹配消费所有队列的消息
  • “#”包含“publish.#“和“deliver.#”
  • "publish.test_exchange"匹配发送到指定交换器的消息
  • “deliver.test_queue"匹配消费指定队列的消息

在Firehose开启状态,当有客户端发送或者消费 消息时,Firehose会自动封装相应的消息体,并添加详细的headers属性。看如下发送至交换器的消息体内容:

202404012101066484.png

在消费队列时,会将这条消息封装成如下图:

202404012101070635.png

headers中的exchange_name表示发送此条消息的交换器;routing_keys表示与exchange_name对应的路由键列表;properties表示消息本身的属性,比如delivery_mode设置为2表示消息需要持久化处理。

GitHub地址:https://github.com/mingyang66/spring-parent

阅读全文