前言
在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。
顺序消息
顾名思义,就是消息按照发送的顺序进行消费。队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认情况下生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列,消费者又从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?
全局有序消息
在RocketMQ中,顺序消息分为全局有序和局部有序两种。全局有序是指一个topic下的所有消息都要保证顺序:如果要保证消息全局顺序消费,就需要只使用一个队列存放消息,并且只由一个消费者从这一个队列消费消息,
这样就能保证顺序,即:单线程执行。可以通过 producer.setDefaultTopicQueueNums(1); 来指定队列数量。
下面我们使用一个订单来模拟顺序消息,订单状态有:创建 ,支付,发货。需要按照顺序发送和消费消息
订单实体
/**
 * Simple mutable order bean used as the message payload in the ordered-message demos.
 *
 * <p>Holds an identifier, a display name and the current status text. A public
 * no-argument constructor and getter/setter pairs are provided for every field.
 */
public class Order {

    /** Order identifier; {@code null} until assigned. */
    private Long id;

    /** Human-readable order name; {@code null} until assigned. */
    private String name;

    /** Current status text of the order; {@code null} until assigned. */
    private String status;

    /** Creates an order with all fields unset ({@code null}). */
    public Order() {
        this(null, null, null);
    }

    /**
     * Creates a fully populated order.
     *
     * @param id     order identifier
     * @param name   order name
     * @param status initial status text
     */
    public Order(Long id, String name, String status) {
        this.id = id;
        this.name = name;
        this.status = status;
    }

    /** @return the order identifier, possibly {@code null} */
    public Long getId() {
        return this.id;
    }

    /** @param id the new order identifier */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the order name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @param name the new order name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the current status text, possibly {@code null} */
    public String getStatus() {
        return this.status;
    }

    /** @param status the new status text */
    public void setStatus(String status) {
        this.status = status;
    }
}
发送者
生产者通过 producer.setDefaultTopicQueueNums(1);
把队列数量设置成1,然后正常发送消息
/**
 * Demo producer for globally ordered messages.
 *
 * <p>For each of four orders it synchronously sends three messages (one per
 * status, in the order listed in {@link #STATUSES}) to topic {@code order-topic}
 * with tag {@code order}, printing every {@code SendResult}.
 */
public class Producer {

    /** Order statuses, in the sequence in which they are sent. */
    private static final String[] STATUSES = {"创建", "支付", "发货"};

    /**
     * Starts the producer, sends the demo messages synchronously and shuts the producer down.
     *
     * @param args unused
     * @throws InterruptedException if a synchronous send is interrupted
     * @throws RemotingException    propagated from the client on a send failure
     * @throws MQClientException    propagated from the client on start or send failure
     * @throws MQBrokerException    propagated from the client on a send failure
     */
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("order-producer");
        // Name server address.
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Use a single queue so that all messages of the topic share one FIFO queue.
        // NOTE(review): this setting presumably only applies when the topic is
        // auto-created by this producer — confirm against the broker configuration.
        producer.setDefaultTopicQueueNums(1);
        producer.start();
        try {
            for (long i = 0; i < 4; i++) {
                Order order = new Order(i, "订单" + i, STATUSES[0]);
                for (String status : STATUSES) {
                    order.setStatus(status);
                    sendOrder(producer, order);
                }
            }
        } finally {
            // Always release the client, even when a send fails.
            producer.shutdown();
        }
    }

    /**
     * Serializes the order to JSON, sends it synchronously and prints the send result.
     *
     * @param producer a started producer
     * @param order    the order snapshot to publish; its id is used to build the message key
     */
    private static void sendOrder(DefaultMQProducer producer, Order order)
            throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        byte[] bytes = JSON.toJSONString(order).getBytes(CharsetUtil.UTF_8);
        Message message = new Message("order-topic", "order", bytes);
        message.setKeys("key-" + order.getId());
        SendResult result = producer.send(message);
        System.out.println(result);
    }
}
发送结果如下
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC450000, offsetMsgId=AC1028C700002A9F0000000000638E1D, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=48]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4E0001, offsetMsgId=AC1028C700002A9F0000000000638EF8, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=49]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4F0002, offsetMsgId=AC1028C700002A9F0000000000638FD3, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC510003, offsetMsgId=AC1028C700002A9F00000000006390AE, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=51]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC550004, offsetMsgId=AC1028C700002A9F0000000000639189, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=52]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC560005, offsetMsgId=AC1028C700002A9F0000000000639264, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=53]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC580006, offsetMsgId=AC1028C700002A9F000000000063933F, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=54]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC590007, offsetMsgId=AC1028C700002A9F000000000063941A, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=55]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5B0008, offsetMsgId=AC1028C700002A9F00000000006394F5, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=56]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5D0009, offsetMsgId=AC1028C700002A9F00000000006395D0, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=57]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5F000A, offsetMsgId=AC1028C700002A9F00000000006396AB, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=58]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC62000B, offsetMsgId=AC1028C700002A9F0000000000639786, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=59]
消费者
消费者设置 MessageListenerOrderly 进行顺序消费
/**
 * Demo consumer for the globally ordered messages.
 *
 * <p>Subscribes to topic {@code order-topic} with tag {@code order}, registers a
 * {@code MessageListenerOrderly} and prints every received message together
 * with its body decoded as UTF-8.
 */
public class Consumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     * @throws MQClientException propagated from subscribing or starting the consumer
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer");
        // Name server address.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // Start from the first offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // No thread-pool or pull-batch overrides are applied here.
        consumer.subscribe("order-topic","order");

        MessageListenerOrderly listener = new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody(), CharsetUtil.UTF_8);
                    System.out.println(message+" ; "+body);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        };
        consumer.registerMessageListener(listener);
        consumer.start();
    }
}
消费结果如下
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=48, sysFlag=0, bornTimestamp=1632010570822, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570826, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638E1D, commitLogOffset=6524445, bodyCRC=543694636, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=49, KEYS=key-0, CONSUME_START_TIME=1632010570828, UNIQ_KEY=7F000001244418B4AAC25E78BC450000, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=49, sysFlag=0, bornTimestamp=1632010570830, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570830, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638EF8, commitLogOffset=6524664, bodyCRC=400232688, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=50, KEYS=key-0, CONSUME_START_TIME=1632010570832, UNIQ_KEY=7F000001244418B4AAC25E78BC4E0001, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=50, sysFlag=0, bornTimestamp=1632010570831, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570832, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638FD3, commitLogOffset=6524883, bodyCRC=1884939776, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, KEYS=key-0, CONSUME_START_TIME=1632010570835, UNIQ_KEY=7F000001244418B4AAC25E78BC4F0002, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":0,"name":"订单0","status":"发货"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=51, sysFlag=0, bornTimestamp=1632010570833, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570836, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006390AE, commitLogOffset=6525102, bodyCRC=1061325741, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570839, UNIQ_KEY=7F000001244418B4AAC25E78BC510003, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=52, sysFlag=0, bornTimestamp=1632010570837, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570837, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639189, commitLogOffset=6525321, bodyCRC=150045809, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570841, UNIQ_KEY=7F000001244418B4AAC25E78BC550004, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=53, sysFlag=0, bornTimestamp=1632010570838, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570839, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639264, commitLogOffset=6525540, bodyCRC=1869836929, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=55, KEYS=key-1, CONSUME_START_TIME=1632010570844, UNIQ_KEY=7F000001244418B4AAC25E78BC560005, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":1,"name":"订单1","status":"发货"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=54, sysFlag=0, bornTimestamp=1632010570840, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570840, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063933F, commitLogOffset=6525759, bodyCRC=507328046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=56, KEYS=key-2, CONSUME_START_TIME=1632010570845, UNIQ_KEY=7F000001244418B4AAC25E78BC580006, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=55, sysFlag=0, bornTimestamp=1632010570841, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570842, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063941A, commitLogOffset=6525978, bodyCRC=697186802, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=57, KEYS=key-2, CONSUME_START_TIME=1632010570847, UNIQ_KEY=7F000001244418B4AAC25E78BC590007, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=56, sysFlag=0, bornTimestamp=1632010570843, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570844, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006394F5, commitLogOffset=6526197, bodyCRC=1309462274, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=58, KEYS=key-2, CONSUME_START_TIME=1632010570850, UNIQ_KEY=7F000001244418B4AAC25E78BC5B0008, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":2,"name":"订单2","status":"发货"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=57, sysFlag=0, bornTimestamp=1632010570845, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570846, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006395D0, commitLogOffset=6526416, bodyCRC=18326191, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=59, KEYS=key-3, CONSUME_START_TIME=1632010570851, UNIQ_KEY=7F000001244418B4AAC25E78BC5D0009, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"创建"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=58, sysFlag=0, bornTimestamp=1632010570847, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570848, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006396AB, commitLogOffset=6526635, bodyCRC=916761971, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570853, UNIQ_KEY=7F000001244418B4AAC25E78BC5F000A, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"支付"}
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=59, sysFlag=0, bornTimestamp=1632010570850, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570850, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639786, commitLogOffset=6526854, bodyCRC=1361468291, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='order-topic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570855, UNIQ_KEY=7F000001244418B4AAC25E78BC62000B, WAIT=true, TAGS=order}, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'}] ; {"id":3,"name":"订单3","status":"发货"}
局部有序消息
还有一种就是分区有序或者部分有序,部分顺序消息只要保证某一组消息被顺序消费,即:只需要保证一个队列中的消息有序消费即可。
比如:只要保证同一个订单ID的创建、支付、发货消息按照顺序消费即可。实现原理:
-
把同一个订单ID的消息放入同一个MessageQueue
-
保证这个MessageQueue同一时刻只由一个消费者处理,不被并发消费(消费端使用 MessageListenerOrderly)。其中第一点需要在发送端使用 MessageQueueSelector 来保证同一个订单的消息进入同一个队列
发送者
使用 MessageQueueSelector 消息队列选择器来把消息路由到不同的队列,下面案例就是把同一个订单的消息(创建、支付、发货)路由到同一个队列,达到局部有序消费的目的。
//演示消息同步发送
/**
 * Demo producer for partition-ordered messages.
 *
 * <p>For each of four orders it synchronously sends three messages (statuses
 * 创建, 支付, 发货, in that sequence) to topic {@code order-topic} with tag
 * {@code order2}. A single {@code MessageQueueSelector} picks the target queue
 * from the order id, so every message of one order goes to the same queue.
 *
 * @param args unused
 * @throws InterruptedException if a synchronous send is interrupted
 * @throws RemotingException    propagated from the client on a send failure
 * @throws MQClientException    propagated from the client on start or send failure
 * @throws MQBrokerException    propagated from the client on a send failure
 */
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    DefaultMQProducer producer = new DefaultMQProducer("order-producer2");
    // Name server address.
    producer.setNamesrvAddr("127.0.0.1:9876");
    // Send timeout passed to the client (value kept from the original demo).
    producer.setSendMsgTimeout(1000);
    producer.start();
    try {
        // One selector shared by all sends: queue index = order id modulo queue count.
        MessageQueueSelector selector = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                long id = (Long) arg;
                // floorMod keeps the index non-negative even for a negative id.
                int index = (int) Math.floorMod(id, (long) mqs.size());
                return mqs.get(index);
            }
        };
        // Statuses in the sequence in which they must be published.
        String[] statuses = {"创建", "支付", "发货"};

        for (long i = 0; i < 4; i++) {
            Order order = new Order(i, "订单" + i, statuses[0]);
            for (String status : statuses) {
                order.setStatus(status);
                byte[] bytes = JSON.toJSONString(order).getBytes(CharsetUtil.UTF_8);
                Message message = new Message("order-topic", "order2", bytes);
                message.setKeys("key-" + i);
                // The order id is handed to the selector as its routing argument.
                SendResult result = producer.send(message, selector, order.getId());
                System.out.println(result);
            }
        }
    } finally {
        // Always release the client, even when a send fails.
        producer.shutdown();
    }
}
消费者
消费者一样可通过 MessageListenerOrderly 进行顺序消费
/**
 * Demo consumer for the partition-ordered messages.
 *
 * <p>Subscribes to topic {@code order-topic} with tag {@code order2} — the tag
 * used by the partition-ordered producer — registers a
 * {@code MessageListenerOrderly} and prints every received message together
 * with its body decoded as UTF-8.
 */
public class Consumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     * @throws MQClientException propagated from subscribing or starting the consumer
     */
    public static void main(String[] args) throws MQClientException {
        // A dedicated group is used because this consumer's subscription (tag "order2")
        // differs from the one used by the "order-consumer" group (tag "order").
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer2");
        // Name server address.
        defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
        // Start from the first offset.
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Must match the tag the partition-ordered producer publishes with ("order2");
        // subscribing to "order" would filter all of those messages out.
        defaultMQPushConsumer.subscribe("order-topic","order2");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt message : msgs) {
                    System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        defaultMQPushConsumer.start();
    }
}
文章到这就结束了,点赞还是要求一下的,万一屏幕面前的大帅哥,或者大漂亮一不小心就一键三连了啦,那我就是熬夜到头发掉光,也出下章。