前言
现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例
SpringBoot集成RocketMQ
导入依赖
这里使用整合RocketMQ的基础依赖:rocketmq-spring-boot-starter
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
配置文件
rocketmq:
name-server: 127.0.0.1:9876
#生产者配置
producer:
#生产者组名字
group: "service-producer"
# 消息最大长度 默认 1024 * 4 (4M)
max-message-size: 4096
# 发送消息超时时间,默认 3000
send-message-timeout: 3000
# 发送消息失败重试次数,默认2
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
简单消息发送
生产者
使用 RocketMQTemplate 发送消息,使用@Autowared注入RocketMQTemplate即可使用,其中包含的方法有
- public void sendOneWay(String destination, Object payload) :单向消息
- public SendResult syncSend(String destination, Message<?> message) :同步消息
- public void asyncSend(String destination, Message<?> message, SendCallback sendCallback):异步消息
- public TransactionSendResult sendMessageInTransaction :事务消息
destination指的是消息的目的地,格式为: topicName:tags ,发送单向消息案例如:
rocketMQTemplate.sendOneWay( "message:sms", "我是短信消息"));
同步消息是有发送结果的,同步消息发送如:
SendResult result = rocketMQTemplate.syncSend("message:sms", "我是短信消息");
//打印结果
System.out.println(result);
异步消息需要指定发送回调,SendCallback,异步消息发送如:
rocketMQTemplate.asyncSend("message:sms", "我是短信消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
}
);
消费者端
通过 RocketMQListener 监听器来监听消息,@RocketMQMessageListener注解来指定消费者组以及topic和tags。
@Slf4j
@Component
@RocketMQMessageListener(topic = "message",
selectorExpression="sms" //tags
,consumerGroup = "service-consumer"
,messageModel = MessageModel.CLUSTERING )
public class MessageConsumer implements RocketMQListener<MessageExt> {
@Override
@Transactional
public void onMessage(MessageExt message) {
String msg = new String(message.getBody(), CharsetUtil.UTF_8);
log.info("消费者 {} ",msg);
}
}
onMessage方法是自动ack消息,如果方法中出现异常,ack失败,消息将会重试消费。
事务消息
对事务的理解见上一篇《事务消息》
事务监听器
通过实现 RocketMQLocalTransactionListener 编写本地事务监听器
@Component
//订单事务组
@RocketMQTransactionListener(txProducerGroup = "TX_GROUP")
@Slf4j
public class MyTransactionListener implements RocketMQLocalTransactionListener {
//执行本地事务,返回commit事务消息才会被消费者消费,我们可以在该方法中对数据库做写操作保存本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("执行本地事务,",msg,arg);
//本地事务执行成功,返回commit
return RocketMQLocalTransactionState.COMMIT;
//本地事务执行失败,返回rollback ,事务消息不会被消费
//return RocketMQLocalTransactionState.ROLLBACK;
}
//检查本地事务状态,MQ通过该方法回查本地事务状态
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
byte[] payload = (byte[]) msg.getPayload();
//回查本地事务状态,如果成功,返回commit
return RocketMQLocalTransactionState.COMMIT;
//回查本地事务状态,如果失败,返回rollback,事务消息不会被消费
//return RocketMQLocalTransactionState.ROLLBACK;
}
}
事务消息发送方
//构建消息
Message message = MessageBuilder.withPayload("事务消息").build();
//发送下单的事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"TX-GROUP", //事务组名字,需要和事务监听器的事务组名字一样。
"txtopic:txtags",
message, null);
if(result.getSendStatus() == SendStatus.SEND_OK){
//消息发送成功
}
这里的message会传递给事务监听器的executeLocalTransaction方法中
事务消息消费方
消费者就是普通的消费者即可
@Slf4j
@Component
@RocketMQMessageListener(topic = "txtopic",
selectorExpression="txtags" ,consumerGroup = "service-consumer"
,messageModel = MessageModel.CLUSTERING
)
public class MessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String msg = new String(message.getBody(), CharsetUtil.UTF_8);
//消费消息
}
}
文章结束希望对于你有所帮助,喜欢的话给个三连
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。