2024-01-19
Original author: 墨家巨子@俏如来 Original article: https://blog.csdn.net/u014494148/article/details/120376904

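Preface

Some business scenarios require messages to be consumed in order. For example, the credit and debit operations on an account must run in chronological order; otherwise a debit can fail because the balance is insufficient.

As the name suggests, ordered messages are messages consumed in the order in which they were sent. A queue is by nature a first-in, first-out data structure, and in principle RocketMQ follows the same mechanism. By default, however, a producer distributes messages across different queues (partitions) in round-robin fashion, and consumers consume from multiple queues, so ordering cannot be guaranteed in that setup. How, then, does RocketMQ guarantee message order?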
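
The effect of round-robin distribution can be illustrated without a broker. The following is a minimal plain-JDK simulation, not RocketMQ code: the class and method names are hypothetical, the queue count of 2 is an assumption, and draining the queues one after another is just one of many possible consumption interleavings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinOrderDemo {

    /** Distributes messages over queueCount queues in round-robin fashion. */
    static List<List<String>> dispatch(List<String> messages, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            queues.get(i % queueCount).add(messages.get(i));
        }
        return queues;
    }

    /** Reads queue 0 completely, then queue 1, and so on (one possible interleaving). */
    static List<String> drainQueueByQueue(List<List<String>> queues) {
        List<String> consumed = new ArrayList<>();
        for (List<String> queue : queues) {
            consumed.addAll(queue);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("create", "pay", "ship", "close");
        List<List<String>> queues = dispatch(sent, 2);
        System.out.println(queues);                    // [[create, ship], [pay, close]]
        System.out.println(drainQueueByQueue(queues)); // [create, ship, pay, close]
    }
}
```

Each queue is still FIFO on its own, but the sequence seen across queues no longer matches the send order, which is why the rest of the article pins related messages to a single queue.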

202401191955408771.png

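Globally ordered messages

RocketMQ distinguishes between globally ordered and partially ordered messages. Global ordering means that all messages under a topic must keep their order. To consume messages in global order, store them in a single queue and have a single consumer read from that queue, that is, process them on a single thread. The queue count can be set with producer.setDefaultTopicQueueNums(1); note that this setting only applies when the topic is created automatically on first send, and a topic that already exists keeps the queue count it was created with.

Below we use an order to simulate ordered messages. An order passes through three states: created, paid, and shipped. The messages must be sent and consumed in that order.

Order entity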

    public class Order {
        private Long id;
        private String name;
        private String status;
    
        public Order() {
        }
    
        public Order(Long id, String name, String status) {
            this.id = id;
            this.name = name;
            this.status = status;
        }
    
        public Long getId() {
            return id;
        }
    
        public void setId(Long id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getStatus() {
            return status;
        }
    
        public void setStatus(String status) {
            this.status = status;
        }
    }

Producer

The producer sets the queue count to 1 with producer.setDefaultTopicQueueNums(1); and then sends messages as usual.

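    // Imports assume RocketMQ 4.x, fastjson (JSON) and Netty (CharsetUtil) on the classpath
    import com.alibaba.fastjson.JSON;
    import io.netty.util.CharsetUtil;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
    
        // Demonstrates synchronous message sending
        public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
            // Producer
            DefaultMQProducer producer = new DefaultMQProducer("order-producer");
    
            // Set the name server address
            producer.setNamesrvAddr("127.0.0.1:9876");
            // Queue count: 1 (applies when the topic is auto-created)
            producer.setDefaultTopicQueueNums(1);
            // Start the producer
            producer.start();
    
            for (long i = 0 ; i < 4 ; i++){
    
                // Status "创建" = created
                Order order = new Order(i,"订单"+i,"创建");
    
                // Build the message body
                byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
    
                Message message = new Message("order-topic","order",bytes);
    
                message.setKeys("key-"+i);
    
                // Send the first message
                SendResult result = producer.send(message);
    
                System.out.println(result);
    
                //====================================================================
                // Status "支付" = paid
                order.setStatus("支付");
                // Build the message body
                bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
                message = new Message("order-topic","order",bytes);
                message.setKeys("key-"+i);
    
                // Send
                result = producer.send(message);
    
                System.out.println(result);
    
                //====================================================================
                // Status "发货" = shipped
                order.setStatus("发货");
                // Build the message body
                bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
                message = new Message("order-topic","order",bytes);
                message.setKeys("key-"+i);
    
                // Send
                result = producer.send(message);
                // Print the result
                System.out.println(result);
            }
    
            producer.shutdown();
        }
    }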

The send results are as follows

    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC450000, offsetMsgId=AC1028C700002A9F0000000000638E1D, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=48]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4E0001, offsetMsgId=AC1028C700002A9F0000000000638EF8, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=49]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4F0002, offsetMsgId=AC1028C700002A9F0000000000638FD3, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=50]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC510003, offsetMsgId=AC1028C700002A9F00000000006390AE, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=51]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC550004, offsetMsgId=AC1028C700002A9F0000000000639189, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=52]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC560005, offsetMsgId=AC1028C700002A9F0000000000639264, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=53]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC580006, offsetMsgId=AC1028C700002A9F000000000063933F, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=54]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC590007, offsetMsgId=AC1028C700002A9F000000000063941A, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=55]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5B0008, offsetMsgId=AC1028C700002A9F00000000006394F5, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=56]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5D0009, offsetMsgId=AC1028C700002A9F00000000006395D0, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=57]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5F000A, offsetMsgId=AC1028C700002A9F00000000006396AB, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=58]
    SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC62000B, offsetMsgId=AC1028C700002A9F0000000000639786, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=59]

Consumer

The consumer registers a MessageListenerOrderly to consume messages in order.

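    // Imports assume RocketMQ 4.x and Netty (CharsetUtil) on the classpath
    import java.util.List;
    import io.netty.util.CharsetUtil;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            // Create the consumer
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
            // Set the name server address
            defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
            // Consume from the first offset
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // The message model is left at its default (clustering)
            // Optional: at most one consumer thread
            //defaultMQPushConsumer.setConsumeThreadMax(1);
            //defaultMQPushConsumer.setConsumeThreadMin(1);
            // Optional: pull only one message at a time
            //defaultMQPushConsumer.setPullBatchSize(1);
    
            // Subscribe to the topic and tag
            defaultMQPushConsumer.subscribe("order-topic","order");
    
            // An orderly listener processes each queue sequentially
            defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    msgs.forEach(message->{
                        System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                    });
    
                    return ConsumeOrderlyStatus.SUCCESS;
                }
    
            });
    
            defaultMQPushConsumer.start();
        }
    }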

The consumption results are as follows

    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=48, sysFlag=0, bornTimestamp=1632010570822, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570826, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638E1D, commitLogOffset=6524445, bodyCRC=543694636, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=49, KEYS=key-0, CONSUME_START_TIME=1632010570828, UNIQ_KEY=7F000001244418B4AAC25E78BC450000, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"创建"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=49, sysFlag=0, bornTimestamp=1632010570830, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570830, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638EF8, commitLogOffset=6524664, bodyCRC=400232688, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=50, KEYS=key-0, CONSUME_START_TIME=1632010570832, UNIQ_KEY=7F000001244418B4AAC25E78BC4E0001, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"支付"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=50, sysFlag=0, bornTimestamp=1632010570831, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570832, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638FD3, commitLogOffset=6524883, bodyCRC=1884939776, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, KEYS=key-0, CONSUME_START_TIME=1632010570835, UNIQ_KEY=7F000001244418B4AAC25E78BC4F0002, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"发货"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=51, sysFlag=0, bornTimestamp=1632010570833, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570836, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006390AE, commitLogOffset=6525102, bodyCRC=1061325741, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570839, UNIQ_KEY=7F000001244418B4AAC25E78BC510003, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"创建"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=52, sysFlag=0, bornTimestamp=1632010570837, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570837, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639189, commitLogOffset=6525321, bodyCRC=150045809, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570841, UNIQ_KEY=7F000001244418B4AAC25E78BC550004, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"支付"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=53, sysFlag=0, bornTimestamp=1632010570838, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570839, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639264, commitLogOffset=6525540, bodyCRC=1869836929, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=55, KEYS=key-1, CONSUME_START_TIME=1632010570844, UNIQ_KEY=7F000001244418B4AAC25E78BC560005, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"发货"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=54, sysFlag=0, bornTimestamp=1632010570840, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570840, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063933F, commitLogOffset=6525759, bodyCRC=507328046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=56, KEYS=key-2, CONSUME_START_TIME=1632010570845, UNIQ_KEY=7F000001244418B4AAC25E78BC580006, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"创建"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=55, sysFlag=0, bornTimestamp=1632010570841, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570842, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063941A, commitLogOffset=6525978, bodyCRC=697186802, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=57, KEYS=key-2, CONSUME_START_TIME=1632010570847, UNIQ_KEY=7F000001244418B4AAC25E78BC590007, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"支付"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=56, sysFlag=0, bornTimestamp=1632010570843, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570844, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006394F5, commitLogOffset=6526197, bodyCRC=1309462274, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=58, KEYS=key-2, CONSUME_START_TIME=1632010570850, UNIQ_KEY=7F000001244418B4AAC25E78BC5B0008, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"发货"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=57, sysFlag=0, bornTimestamp=1632010570845, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570846, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006395D0, commitLogOffset=6526416, bodyCRC=18326191, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=59, KEYS=key-3, CONSUME_START_TIME=1632010570851, UNIQ_KEY=7F000001244418B4AAC25E78BC5D0009, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"创建"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=58, sysFlag=0, bornTimestamp=1632010570847, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570848, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006396AB, commitLogOffset=6526635, bodyCRC=916761971, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570853, UNIQ_KEY=7F000001244418B4AAC25E78BC5F000A, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"支付"}
    MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=59, sysFlag=0, bornTimestamp=1632010570850, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570850, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639786, commitLogOffset=6526854, bodyCRC=1361468291, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570855, UNIQ_KEY=7F000001244418B4AAC25E78BC62000B, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"发货"}

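Partially ordered messages

The other kind is partition ordering, also called partial ordering. Partially ordered messages only require that a given group of messages be consumed in order, that is, only the messages within one queue need to be consumed in order.

For example, it is enough for the create, pay, and ship messages of the same order ID to be consumed in order. How it works:

  • Put the messages of the same order ID into the same MessageQueue; a MessageQueueSelector on the producer side guarantees this

  • Make sure this MessageQueue is handled by only one consumer at a time and is never processed concurrently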

    202401191955414912.png

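Producer

Use a MessageQueueSelector to route messages to different queues. The example below routes the create, pay, and ship messages of the same order to the same queue, which achieves partially ordered consumption.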

    // Demonstrates synchronous sending with a queue selector.
    // In addition to the classes used by the first producer, this needs java.util.List,
    // org.apache.rocketmq.client.producer.MessageQueueSelector and
    // org.apache.rocketmq.common.message.MessageQueue.
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // Producer
        DefaultMQProducer producer = new DefaultMQProducer("order-producer2");
    
        // Set the name server address
        producer.setNamesrvAddr("127.0.0.1:9876");
    
        // Send timeout in milliseconds
        producer.setSendMsgTimeout(1000);
        // Start the producer
        producer.start();
    
        // One selector shared by all sends: the same order ID always maps to the same queue
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // arg is the order ID passed as the third argument of send(...)
                Long id = (Long) arg;
                // Use modulo to decide which queue the ID goes to
                int index = (int) (id % mqs.size());
                // index is the position of the target queue
                return mqs.get(index);
            }
        };
    
        for (long i = 0 ; i < 4 ; i++){
    
            // Status "创建" = created
            Order order = new Order(i,"订单"+i,"创建");
    
            // Build the message body
            byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
    
            Message message = new Message("order-topic","order2",bytes);
            message.setKeys("key-"+i);
    
            // Send, passing the order ID as the selector's input
            SendResult result = producer.send(message, selector, order.getId());
            // Print the result
            System.out.println(result);
    
            //====================================================================
            // Status "支付" = paid
            order.setStatus("支付");
            // Build the message body
            bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
            message = new Message("order-topic","order2",bytes);
            message.setKeys("key-"+i);
    
            // Send
            result = producer.send(message, selector, order.getId());
            // Print the result
            System.out.println(result);
    
            //====================================================================
            // Status "发货" = shipped
            order.setStatus("发货");
            // Build the message body
            bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
            message = new Message("order-topic","order2",bytes);
            message.setKeys("key-"+i);
    
            // Send
            result = producer.send(message, selector, order.getId());
            // Print the result
            System.out.println(result);
        }
    
        producer.shutdown();
    }
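
The modulo routing performed by the selector can be checked in isolation. Below is a minimal plain-JDK sketch under stated assumptions: the class and method names are hypothetical, the queue count of 4 is made up for illustration, and Math.floorMod is swapped in for the plain % operator so that a negative ID would still yield a valid index.

```java
class QueueRoutingSketch {

    /** Maps an order ID to a queue index in the range [0, queueCount). */
    static int queueIndex(long orderId, int queueCount) {
        // floorMod never returns a negative value for a positive divisor,
        // unlike %, which would give -1 for orderId = -1
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        for (long orderId = 0; orderId < 6; orderId++) {
            System.out.println("order " + orderId + " -> queue " + queueIndex(orderId, queueCount));
        }
    }
}
```

The mapping is stable only while the number of queues stays the same. If the size of the queue list passed to the selector changes, the same order ID may land on a different queue, so ordering for orders that are in flight at that moment should not be taken for granted.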

Consumer

The consumer likewise uses MessageListenerOrderly to consume in order.

    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            // Create the consumer; a separate group is used here because consumers
            // in one group are expected to share the same subscription
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer2");
            // Set the name server address
            defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
            // Consume from the first offset
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // The message model is left at its default (clustering)
            // Optional: at most one consumer thread
            //defaultMQPushConsumer.setConsumeThreadMax(1);
            //defaultMQPushConsumer.setConsumeThreadMin(1);
            // Optional: pull only one message at a time
            //defaultMQPushConsumer.setPullBatchSize(1);
    
            // Subscribe with the tag the producer above sends ("order2");
            // subscribing to "order" would filter these messages out
            defaultMQPushConsumer.subscribe("order-topic","order2");
    
            // An orderly listener processes each queue sequentially
            defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    msgs.forEach(message->{
                        System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                    });
    
                    return ConsumeOrderlyStatus.SUCCESS;
                }
    
            });
    
            defaultMQPushConsumer.start();
        }
    }
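
To see why "one queue, one consumer at a time" is enough for per-order ordering, here is a small plain-JDK simulation. It does not use RocketMQ: the class name, the "orderId:status" event format, and the thread-pool setup are all assumptions made for this sketch. Each queue is drained by exactly one task, while different queues run in parallel.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerQueueOrderSketch {

    static final String[] STATUSES = {"create", "pay", "ship"};

    /** Returns the statuses observed per order ID after parallel, per-queue consumption. */
    static Map<Long, List<String>> run(int orderCount, int queueCount) {
        // Producer side: every event of one order goes to the same queue
        List<List<String>> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            queues.add(new ArrayList<>());
        }
        for (long id = 0; id < orderCount; id++) {
            for (String status : STATUSES) {
                queues.get((int) (id % queueCount)).add(id + ":" + status);
            }
        }

        // Consumer side: one task per queue, so no queue is processed concurrently
        Map<Long, List<String>> seen = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(queueCount);
        for (List<String> queue : queues) {
            pool.execute(() -> {
                for (String event : queue) {
                    String[] parts = event.split(":");
                    seen.computeIfAbsent(Long.valueOf(parts[0]),
                            k -> Collections.synchronizedList(new ArrayList<>()))
                        .add(parts[1]);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Every order should show create, pay, ship in that sequence
        run(4, 2).forEach((id, statuses) -> System.out.println("order " + id + " -> " + statuses));
    }
}
```

The relative order between different orders is not fixed in this sketch, which matches the idea of partial ordering: only the messages that share a queue keep their sequence.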

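That's the end of the article. I will still ask for a like, though: if any of the handsome or lovely readers in front of the screen happens to like, favorite, and share it, I will publish the next chapter even if I have to stay up until my hair falls out.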
