1 背景
在互联网业务的实际应用场景中,消息的批量处理是非常必要的,因为我们时刻面临着大量数据的并发执行。
例如,我们在一个业务交互的时候会有大量的分支行为需要异步去处理,但是这些动作又是在不同的业务粒度上的,所以我们需要多次调用MQ写入消息,可能有多次的连接和消息发送。
这个写MySQL数据库是一样的,多次建连和写入,跟一次建连和批量数据库,性能是完全不能比的。
所以我们需要有MQ有批量消息的能力来对我们的业务数据进行快速处理。
2 批量消息实现过程
Rocket MQ的批量消息,可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,同时对消息服务的稳定性也有帮助。
2.1 批量消息的特点
- 批量消息具有相同的topic。
- 批量消息具有相同的waitStoreMsgOK属性。
- 批量消息不支持延迟消息。
- 批量消息的大小不超过4M(4.4版本之后要求不超过1M)。
2.2 批量消息的使用场景
- 消息的吞吐能力和处理效率:通过将多条消息打包成一批进行发送,可以减少网络传输开销和消息处理的时间,从而提高整体的消息处理效率。
- 下游系统的API调用频率:通过将多条消息合并成一条批量消息进行发送,可以减少下游系统接收和处理消息的次数,从而降低API调用频率,减轻下游系统的负载压力。
2.3 批量消息的发送示例
Rocket MQ提供了批量发送消息的功能,可以通过调用DefaultMQProducer的send()方法,将多条消息以列表的形式发送给指定的topic。
以下是一个简单的示例代码:
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName_1");
String topic = "BatchSendTest_1";
producer.start();
List<Message> msgs = new ArrayList<>();
msgs.add(new Message(topic, "Tag1", "OrderID-063105013", "Hello world".getBytes()));
msgs.add(new Message(topic, "Tag1", "OrderID-063105014", "Brand".getBytes()));
msgs.add(new Message(topic, "Tag1", "OrderID-063105015", "handsome boy ".getBytes()));
try {
producer.send(msgs);
} catch (Exception e) {
e.printStackTrace();
// 处理异常
}
finally {
// 如果不再发送消息,关闭生产者Producer
producer.shutdown();
}
在以上示例代码中,创建了一个DefaultMQProducer实例,并调用其start()方法启动生产者。
然后构造了一个包含三条消息的列表,通过调用producer的send()方法将列表中的消息发送给指定的topic。
如果消息的总长度可能大于1MB时,这时候最好把消息进行分割,参考下面的代码:
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}
可以看出来,Rocket MQ的批量消息可以提高消息的吞吐能力和处理效率,降低下游系统的API调用频率,是一种优化消息传输和处理的有效手段。
3 总结
- 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
- 批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。