1 背景
在互联网业务的实际应用场景中,消息的延时处理是非常必要的。例如,在金融交易系统中,某些交易的确认可能需要一段时间才能完成。又如,在物流跟踪系统中,货物的运输状态需要一段时间才能更新。而MQ作为中间件的角色专门来处理消息媒介,实际也具备了使用消息的延时处理来保证信息的及时性的能力。
这边举两个具体的例子:
- 火车票订购,提交了订单就把车票给占位了,这时候可以发送一个延时确认的消息,15m 未付款,就要把该车票释放,这样其他人就可以购买了。
- 购买电影票,可以发送一个核销检查消息,在电影开场前15分钟就无法退票了。
既然消息延迟处理的使用场景这么常见,那我们就要详细来分析下怎么使用MQ来实现,这边以RocketMQ为技术选型。
2 消息延时处理原理
RocketMQ的消息延时处理是通过预定义的消息延时级别和延时队列来实现的。在发送消息时,生产者可以设置一个延时级别,该消息将会被延迟一段时间后才能被消费者消费。RocketMQ默认提供了18个延时级别,每个级别对应不同的延迟时间。
所以延时时间并不是随意指定的,Rocket源码中指定的18种等级如下:
// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- RocketMq不支持任意时间延时,需设置固定的延时等级,从1s到2h分别对应着等级1到18
- 可以使用setDelayTimeLevel(int level) 方法设置延时等级,level 从 0 开始
在RocketMQ中,每个Broker都设置了一个延时队列,用于存储延时消息。当消息的延时时间到达时,该消息将会被自动转移到普通的消息队列中,等待消费者的消费。这种方式可以有效地避免因为网络延迟或者消费者处理速度慢而导致消息的延迟。
3 消息延时处理实战
使用RocketMQ的消息延时处理非常简单。在发送消息时,生产者只需要设置一个延时级别,然后将消息发送到RocketMQ即可。例如:
public class DelayProducerApplication {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
// 1、创建生产者producer,并指定生产者组名为 example_group_name
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
// 2、指定NameServer的地址,以获取Broker路由地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3、启动生产者producer
producer.start();
// 4、创建消息,并指定Topic,Tag和消息体
Message msg = new Message("example_topic","example_key", "试一试延迟30s发送的消息".getBytes("UTF-8"));
// 5、设置延时等级3,对应30s,所以这个消息在30秒之后发送
msg.setDelayTimeLevel(3);
// 6、发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 7、通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
// 8、如果不再发送消息,关闭生产者Producer
producer.shutdown();
}
}
在上述代码中,我们首先创建了一个生产者,然后指定了NameServer的地址,并启动了生产者。接着,我们创建了一个延时级别为3的消息,即该消息将会被延迟30秒后才能发送并被消费者消费。最后,我们发送了该消息,并关闭了生产者。
4 消息延时的优化
虽然RocketMQ的消息延时处理功能已经非常强大,但是在实际应用中,我们可能还需要根据自己的业务需求进行一些优化。以下是一些可能的优化方式:
- 调整延时队列的大小。在RocketMQ中,每个Broker都只有一个延时队列,队列太小可能导致一些延时消息被miss。可以根据实际需求调整延时队列的大小。
- 使用多个消费者来消费同一主题的消息。在RocketMQ中,可能有批量执行被设置了同样的延迟时间,这个就存在了一些风险,类似缓存的批量过期一样,稍有不慎,可能会击穿数据库。如果只有一个消费者来消费该主题的消息,可能会导致该消费者的处理速度不够快,从而影响到消息的及时性。我们可以根据实际需求增加消费者数量,以提高消息的处理速度。
- 调整RocketMQ的配置参数。RocketMQ提供了一些配置参数,可以用来调整其性能和可靠性。我们可以根据实际需求调整这些参数,以优化消息的延时处理效果。
总之,RocketMQ的消息延时处理功能非常强大,可以满足许多实际应用场景的需求。在实际应用中,我们可以根据自己的业务需求进行一些优化,以进一步提高消息的及时性和可靠性。
5 总结
本文我们介绍了RocketMQ如何使用消息延时来处理特殊的业务场景。除了上述的方法之外,我们还有一些其他方法,比如:
- 定时发送消息。在定时发送中,生产者可以指定一个未来的时间戳,在该时间戳到达时,该消息将会被发送到Broker。RocketMQ内部会维护一个定时任务,每隔一段时间检查一次待定时发送的消息,并判断是否到达了指定的时间戳。如果到达了指定的时间戳,该消息将会被发送到Broker。
- 自建环形队列来实现“延时消息” ,参考这篇:1分钟实现“延迟消息”功能,写的不错
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。