2024-01-19  阅读(13)
原文作者: 墨家巨子@俏如来 原文地址: https://blog.csdn.net/u014494148/article/details/120660997

使用场景

我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。

概述

延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加如:

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d	

增加了2d 两天,这个时候总共就有19个level。

延迟消息工作原理

延迟队列工作流程图

202401191955436491.png
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走延迟队列,执行下面的流程

  1. 修改消息Topic的名字为SCHEDULE_TOPIC_XXXX

  2. 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId
    目录与consumequeue文件

  3. 修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间 。下面是CosumeQueue单个存储单元组成结构如下

    202401191955440942.png

  • Commit Log Offset:记录在CommitLog中的位置。
  • Size:记录消息的大小
  • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。
  1. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
  2. Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
  3. 在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。

延迟消息实战

消息发送方

给消息设置延迟级别,API:message.setDelayTimeLevel(3);

    public class Producer {
    
        //演示消息同步发送
        public static void main(String[] args) throws Exception {
            //生产者
            DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup-delay");
    
            //设置name server地址
            producer.setNamesrvAddr("127.0.0.1:9876");
            //启动
            producer.start();
    
            for (long i = 0 ; i < 4 ; i++){
    
    
                Order order = new Order(i,"订单"+i,"创建");
    
                //添加内容
                byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
    			//创建消息,指定:TOPIC 和 TAG
                Message message = new Message("topic-order-delay","product-order-delay",bytes);
                
    			//延迟级别 3,代表 10s延迟
                message.setDelayTimeLevel(3);
                
                message.setKeys("key-"+i);
                
                //执行发送
                SendResult result = producer.send(message);
                
                System.out.println("发送时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    
                System.out.println(result);
            }
    
            producer.shutdown();
        }
    }

消息消费方

    public class Consumer {
        public static void main(String[] args) throws Exception {
            //创建消费者
            DefaultMQPushConsumer defaultMQPushConsumer = 
            							new DefaultMQPushConsumer("syn-consumerGroup-delay");
            //设置name server 地址
            defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
            
            //从开始位置消费
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            //订阅
            defaultMQPushConsumer.subscribe("topic-order-delay","product-order-delay");
    
            defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    list.forEach(message->{
                        System.out.println("消费时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                        System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                    });
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            defaultMQPushConsumer.start();
        }
    }

从消费者消费结果时间来看,消息是延迟了10s后才收到。

文章结束,创作不易,大佬给个好评。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文