2024-01-19
原文作者: 墨家巨子@俏如来 原文地址: https://blog.csdn.net/u014494148/article/details/120999131

前言

现在开发项目都是基于SpringBoot,新项目很少使用Spring,所以我们学习一门技术除了要会原生API,还不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例

SpringBoot集成RocketMQ

导入依赖

这里使用整合RocketMQ的基础依赖:rocketmq-spring-boot-starter

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

配置文件

    rocketmq:
      name-server: 127.0.0.1:9876 
      
      #生产者配置
      producer:
        #生产者组名字
        group: "service-producer"
        # 消息最大长度 默认 1024 * 4 (4M)
        max-message-size: 4096
        # 发送消息超时时间,默认 3000
        send-message-timeout: 3000
        # 发送消息失败重试次数,默认2
        retry-times-when-send-failed: 2
        retry-times-when-send-async-failed: 2

简单消息发送

生产者

使用 RocketMQTemplate 发送消息,使用@Autowared注入RocketMQTemplate即可使用,其中包含的方法有

  • public void sendOneWay(String destination, Object payload) :单向消息
  • public SendResult syncSend(String destination, Message<?> message) :同步消息
  • public void asyncSend(String destination, Message<?> message, SendCallback sendCallback):异步消息
  • public TransactionSendResult sendMessageInTransaction :事务消息

destination指的是消息的目的地,格式为: topicName:tags ,发送单向消息案例如:

     rocketMQTemplate.sendOneWay( "message:sms", "我是短信消息"));

同步消息是有发送结果的,同步消息发送如:

    SendResult result = rocketMQTemplate.syncSend("message:sms", "我是短信消息");
    //打印结果
    System.out.println(result);

异步消息需要指定发送回调,SendCallback,异步消息发送如:

    rocketMQTemplate.asyncSend("message:sms", "我是短信消息", new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println(sendResult);
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            e.printStackTrace();
                        }
                    }
            );
消费者端

通过 RocketMQListener 监听器来监听消息,@RocketMQMessageListener注解来指定消费者组以及topic和tags。

    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "message",
            selectorExpression="sms"	//tags
                    ,consumerGroup = "service-consumer"
            ,messageModel = MessageModel.CLUSTERING )
    public class MessageConsumer implements RocketMQListener<MessageExt> {
    
     
    
        @Override
        @Transactional
        public void onMessage(MessageExt message) {
            String msg = new String(message.getBody(), CharsetUtil.UTF_8);
            log.info("消费者 {} ",msg);
        }
    }

onMessage方法是自动ack消息,如果方法中出现异常,ack失败,消息将会重试消费。

事务消息

对事务的理解见上一篇《事务消息

事务监听器

通过实现 RocketMQLocalTransactionListener 编写本地事务监听器

    @Component
    //订单事务组
    @RocketMQTransactionListener(txProducerGroup = "TX_GROUP")
    @Slf4j
    public class MyTransactionListener implements RocketMQLocalTransactionListener {
    
        //执行本地事务,返回commit事务消息才会被消费者消费,我们可以在该方法中对数据库做写操作保存本地事务
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    
           log.info("执行本地事务,",msg,arg);
    
    		//本地事务执行成功,返回commit
           return RocketMQLocalTransactionState.COMMIT;
    
    		//本地事务执行失败,返回rollback ,事务消息不会被消费
            //return RocketMQLocalTransactionState.ROLLBACK;
        }
    
        //检查本地事务状态,MQ通过该方法回查本地事务状态
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            byte[] payload = (byte[]) msg.getPayload();
            //回查本地事务状态,如果成功,返回commit
            return RocketMQLocalTransactionState.COMMIT;
            //回查本地事务状态,如果失败,返回rollback,事务消息不会被消费
            //return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
事务消息发送方
    //构建消息
    Message message =  MessageBuilder.withPayload("事务消息").build();
    //发送下单的事务消息
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
          "TX-GROUP",	//事务组名字,需要和事务监听器的事务组名字一样。
          "txtopic:txtags",
          message, null);
    if(result.getSendStatus() == SendStatus.SEND_OK){
    	//消息发送成功
    }

这里的message会传递给事务监听器的executeLocalTransaction方法中

事务消息消费方

消费者就是普通的消费者即可

    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "txtopic",
            selectorExpression="txtags"        ,consumerGroup = "service-consumer"
            ,messageModel = MessageModel.CLUSTERING
        )
    public class MessageConsumer implements RocketMQListener<MessageExt> {
    
       
    
        @Override
        public void onMessage(MessageExt message) {
            String msg = new String(message.getBody(), CharsetUtil.UTF_8);
            //消费消息
        }
    }

文章结束希望对于你有所帮助,喜欢的话给个三连

阅读全文