2024-04-01
原文作者:立小言先森 原文地址: https://mingyang.blog.csdn.net/article/details/103741951
1.引入依赖
            <!-- Spring Boot AMQP starter: the dependency required by the RabbitMQ examples below. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

配置application.properties文件:

    # Connection settings. The single host/port pair is commented out in favour of the
    # comma-separated "addresses" list, which points at three ports on the same host.
    spring.rabbitmq.virtual-host=/
    #spring.rabbitmq.host=localhost
    spring.rabbitmq.addresses=172.30.67.122:5672,172.30.67.122:5673,172.30.67.122:5674
    #spring.rabbitmq.port=5672
    # NOTE(review): credentials are hard-coded for the demo; externalize them outside of sample code.
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    
    # Producer settings: publisher confirms, publisher returns and the mandatory flag.
    # NOTE(review): newer Spring Boot releases appear to replace "publisher-confirms" with
    # "publisher-confirm-type" - confirm against the Spring Boot version in use.
    #producer
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    
    #consumer
    ## The consumer acknowledges consumed messages manually.
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    ## QoS: the number of not-yet-acknowledged messages the RabbitMQ server pushes to a consumer at a time.
    spring.rabbitmq.listener.simple.prefetch=1
2.定义队列、交换器、路由之间的绑定关系
    package com.yaomy.control.rabbitmq.amqp.config;
    
    import com.google.common.collect.Maps;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.Map;
    
    /**
     * Declares the RabbitMQ topology used by the producer: one queue, one topic exchange
     * and the binding that links the two.
     */
    @SuppressWarnings("all")
    @Configuration
    public class RabbitConfig {
        /**
         * Declares the durable queue {@code test_queue2} with a maximum message priority.
         *
         * @return the queue definition
         */
        @Bean
        public Queue topicQueue(){
            Map<String, Object> queueArguments = Maps.newHashMap();
            // A queue-wide message TTL in milliseconds could also be set here through the
            // "x-message-ttl" argument (e.g. 60000); it is intentionally left disabled.

            // Priority queue: the maximum message priority is 15 (range 0-15, a larger number
            // means a higher priority).
            queueArguments.put("x-max-priority", 15);

            // durable(...) makes the queue persistent.
            QueueBuilder durableQueue = QueueBuilder.durable("test_queue2");
            return durableQueue.withArguments(queueArguments).build();
        }

        /**
         * Declares the topic exchange {@code test_exchange2}.
         *
         * @return the exchange definition
         */
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange("test_exchange2");
        }

        /**
         * Binds the queue to the topic exchange with the binding key {@code *.topic.*}.
         *
         * @return the binding definition
         */
        @Bean
        public Binding bindingTopicExchangeQueue(){
            Queue queue = topicQueue();
            TopicExchange exchange = topicExchange();
            return BindingBuilder.bind(queue).to(exchange).with("*.topic.*");
        }
    }
3.定义生产者
    package com.yaomy.control.rabbitmq.amqp;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    /**
     * RabbitMQ producer. Publishes messages through the injected {@link RabbitTemplate} and
     * registers a publisher-confirm callback and a returned-message callback on it.
     */
    @Component
    public class RabbitSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Callback reporting whether a published message was confirmed (ack) or rejected (nack).
         */
        private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData data attached to the message when it was sent; may be {@code null}
             * @param ack {@code true} for ack, {@code false} for nack
             * @param cause optional reason for a nack, if available; otherwise {@code null}
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // The callback can be invoked without correlation data (for example when a message
                // was published through the same template without one), so never dereference blindly.
                String id = correlationData == null ? null : correlationData.getId();
                if (!ack) {
                    // Place for logging, error handling, compensation, etc.
                    System.err.println("异常ack-"+ack+",id-"+id+",cause:"+cause);
                    return;
                }
                // Place for updating the database as part of a reliable-delivery scheme.
                System.out.println("正常ack-"+ack+",id-"+id);
                Message attached = correlationData == null ? null : correlationData.getReturnedMessage();
                // Print the attached message body only when there is one, instead of relying on a
                // swallowed exception.
                if (attached != null && attached.getBody() != null) {
                    System.out.println(new String(attached.getBody()));
                }
            }
        };

        /**
         * Callback invoked when a published message is returned, i.e. no queue accepted it.
         */
        private final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            /**
             * @param message the returned message
             * @param replyCode error code
             * @param replyText error description
             * @param exchange exchange the message was published to
             * @param routingKey routing key the message was published with
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.err.println("spring_returned_message_correlation:"+message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY)
                                    +"return exchange: " + exchange
                                    + ", routingKey: "+ routingKey
                                    + ", replyCode: " + replyCode
                                    + ", replyText: " + replyText
                                    + ",message:" + message);
                byte[] body = message.getBody();
                if (body != null) {
                    System.out.println(new String(body));
                }
            }
        };

        /**
         * Publishes a text message as a persistent message with priority 9.
         *
         * <p>A random UUID is written into {@code properties} as the message id (replacing any id
         * already present) and is reused as the id of the {@link CorrelationData}. The priority and
         * delivery mode are set on every message just before it is sent. Any exception raised while
         * building or sending the message is caught and its stack trace printed; nothing is
         * propagated to the caller.
         *
         * @param exchange exchange to publish to
         * @param routingKey routing key
         * @param message message text; must not be {@code null}
         * @param properties message properties to use; a new instance is created when {@code null}
         */
        public void sendMsg(String exchange, String routingKey, String message, MessageProperties properties){
            try {
                if (message == null) {
                    // Reported through the catch block below, like every other failure of this method.
                    throw new IllegalArgumentException("message must not be null");
                }
                if (properties == null) {
                    properties = new MessageProperties();
                }
                // Unique identifier of the message.
                properties.setMessageId(UUID.randomUUID().toString());

                // NOTE(review): getBytes() and new String(byte[]) use the platform default charset;
                // consider an explicit charset agreed between producer and consumer.
                Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(properties).build();

                // Register the publisher-confirm and returned-message callbacks on the template.
                this.rabbitTemplate.setConfirmCallback(confirmCallback);
                this.rabbitTemplate.setReturnCallback(returnCallback);

                // Copy of the payload attached to the correlation data so that the confirm callback
                // can print it.
                Message returnedMessage = MessageBuilder.withBody(message.getBytes()).build();
                CorrelationData correlationData = new CorrelationData();
                // The correlation id is the message id generated above.
                correlationData.setId(msg.getMessageProperties().getMessageId());
                correlationData.setReturnedMessage(returnedMessage);

                // The post-processor runs on the outgoing Message and may modify its properties
                // and headers before it is sent.
                this.rabbitTemplate.convertAndSend(exchange, routingKey, msg, outgoing -> {
                    MessageProperties msgProperties = outgoing.getMessageProperties();
                    // A per-message TTL in milliseconds could be set here with
                    // msgProperties.setExpiration("10000"); a queue-wide TTL can alternatively be
                    // declared through the "x-message-ttl" queue argument. According to the original
                    // author, when both are present the smaller value applies.
                    msgProperties.setPriority(9);
                    // Persistent delivery mode, as opposed to non-persistent.
                    msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return outgoing;
                }, correlationData);
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }
4.消费者

@RabbitListener 注解可以标注在方法上,使该方法监听指定的队列;也可以标注在类上,与 @RabbitHandler 结合使用。监听方法可以使用多种参数来接收消息,从注解源码的注释中可以看到,共支持以下七种参数:

    * Annotated methods are allowed to have flexible signatures similar to what
     * {@link MessageMapping} provides, that is
     * <ul>
     * <li>{@link com.rabbitmq.client.Channel} to get access to the Channel</li>
     * <li>{@link org.springframework.amqp.core.Message} or one if subclass to get access to
     * the raw AMQP message</li>
     * <li>{@link org.springframework.messaging.Message} to use the messaging abstraction
     * counterpart</li>
     * <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
     * method arguments including the support of validation</li>
     * <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
     * method arguments to extract a specific header value, including standard AMQP headers
     * defined by {@link org.springframework.amqp.support.AmqpHeaders AmqpHeaders}</li>
     * <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
     * argument that must also be assignable to {@link java.util.Map} for getting access to
     * all headers.</li>
     * <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
     * getting access to all headers.</li>

示例代码:

    package com.yaomy.control.rabbitmq.amqp;
    
    import com.rabbitmq.client.Channel;
    import org.apache.commons.lang3.math.NumberUtils;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * RabbitMQ message consumer demonstrating four listener-method signatures. Each listener
     * prints the message it receives and then acknowledges it manually.
     *
     * <p>NOTE(review): all four listeners consume from the same queue, so presumably each
     * message is handled by only one of them - confirm this is intended outside of a demo.
     */
    @Component
    public class RabbitReceiver {
        /**
         * Receives the message through the Spring messaging abstraction.
         *
         * @param channel channel the message was received on; used for the manual ack
         * @param message received message
         * @throws Exception if the delivery tag is missing or the ack fails
         */
        @RabbitListener(queues = "test_queue2")
        public void onMessage(Channel channel, Message<?> message) throws Exception {
            MessageHeaders headers = message.getHeaders();
            System.out.println("--------------------------------------");
            // Print the actual message id next to its label; the complete header map follows.
            System.out.println("消费端Payload: " + message.getPayload()+"-ID:"+headers.getId()
                    +"-messageId:"+headers.get(AmqpHeaders.MESSAGE_ID)+"-headers:"+headers);
            // Manual ack of this single delivery.
            channel.basicAck(requireDeliveryTag(headers), false);
        }

        /**
         * Receives the raw AMQP message.
         *
         * @param channel channel the message was received on; used for the manual ack
         * @param message received message
         * @throws Exception if the ack fails
         */
        @RabbitListener(queues = "test_queue2")
        public void onMessage(Channel channel, org.springframework.amqp.core.Message message) throws Exception {
            System.out.println("--------------------------------------");
            System.out.println("消费端Payload: " + new String(message.getBody())+"-messageId:"+message.getMessageProperties().getMessageId());
            message.getMessageProperties().getHeaders()
                    .forEach((key, value) -> System.out.println("header=>>"+key+"="+value));
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            // Manual ack of this single delivery.
            channel.basicAck(deliveryTag, false);
        }

        /**
         * Receives the payload, one specific header and the complete header map.
         *
         * @param channel channel the message was received on; used for the manual ack
         * @param body message payload
         * @param amqp_messageId unique message identifier, read from the {@code amqp_messageId} header
         * @param headers all message headers
         * @throws Exception if the delivery tag is missing or the ack fails
         */
        @RabbitListener(queues = "test_queue2")
        //@RabbitHandler
        public void handleMessage(Channel channel, @Payload byte[] body,
                                  @Header(AmqpHeaders.MESSAGE_ID) String amqp_messageId,
                                  @Headers Map<String, Object> headers) throws Exception{
            // The header name is given explicitly so that binding does not depend on parameter
            // names being available in the compiled class.
            System.out.println("====消费消息===amqp_messageId:"+amqp_messageId);
            headers.forEach((key, value) -> System.out.println("header=>>"+key+"="+value));
            System.out.println(new String(body));
            // Manual ack of this single delivery.
            channel.basicAck(requireDeliveryTag(headers), false);
        }

        /**
         * Receives the payload and the headers as a {@link MessageHeaders} object.
         *
         * @param channel channel the message was received on; used for the manual ack
         * @param body message payload
         * @param headers all message headers
         * @throws Exception if the delivery tag is missing or the ack fails
         */
        @RabbitListener(queues = "test_queue2")
        //@RabbitHandler
        public void handleMessage(Channel channel, @Payload byte[] body, MessageHeaders headers) throws Exception{
            // Print the message id itself; every header is listed right below.
            System.out.println("====消费消息===amqp_messageId:"+headers.get(AmqpHeaders.MESSAGE_ID));
            headers.forEach((key, value) -> System.out.println("header=>>"+key+"="+value));
            System.out.println(new String(body));
            // Manual ack of this single delivery.
            channel.basicAck(requireDeliveryTag(headers), false);
        }

        /**
         * Reads the delivery tag from the message headers.
         *
         * <p>Fails with a descriptive error rather than falling back to a default value, because
         * acknowledging with a made-up tag would not acknowledge the message that was received.
         *
         * @param headers message headers
         * @return the delivery tag stored under the {@code amqp_deliveryTag} header
         * @throws IllegalStateException if the header is absent or not numeric
         */
        private static long requireDeliveryTag(Map<String, Object> headers) {
            Object tag = headers.get(AmqpHeaders.DELIVERY_TAG);
            if (!(tag instanceof Number)) {
                throw new IllegalStateException(
                        "Missing or non-numeric " + AmqpHeaders.DELIVERY_TAG + " header: " + tag);
            }
            return ((Number) tag).longValue();
        }
    }

使用MessageHeaders参数类型接收header示例如下:

    header=>>amqp_receivedDeliveryMode=PERSISTENT
    header=>>amqp_receivedExchange=test_exchange2
    header=>>amqp_deliveryTag=1
    header=>>amqp_consumerQueue=test_queue2
    header=>>amqp_redelivered=false
    header=>>priority=9
    header=>>amqp_receivedRoutingKey=test.topic.key
    header=>>number=12345
    header=>>spring_listener_return_correlation=34b20ba3-9fb0-49e5-b57e-5484e773e9b3
    header=>>send_time=2019-12-27 16:08:55
    header=>>spring_returned_message_correlation=cba66847-5c31-4ecb-bf97-07ba65238b9a
    header=>>amqp_messageId=cba66847-5c31-4ecb-bf97-07ba65238b9a
    header=>>id=b58e6adb-e5b2-9513-45c5-8376311cf82f
    header=>>amqp_consumerTag=amq.ctag-hPKHjdTdj0NY4awa4bFkAA
    header=>>contentType=application/octet-stream
    header=>>timestamp=1577434135312

上面的 headers 中有一个 id 属性,它是消费端在接收消息、创建 MessageHeaders 对象时随机生成的 UUID。因此它不能作为整个系统中消息的唯一标识,只能在消费端内部标识一条消息;需要全局唯一标识时,应使用生产者设置的 amqp_messageId。

GitHub地址:https://github.com/mingyang66/spring-parent

阅读全文