2023-03-11
原文作者:柒's Blog 原文地址:https://blog.52itstyle.vip/archives/1304/

202303111616093911.png

前情提要

上一篇文章,我们为了解决实际场景中遇到的问题,使得其更像一个安全高效的邮件服务平台,我们引入了 LinkedBlockingQueue 队列对邮件发送进行流量削锋、间隔发送以及重复内容检测。

当然,文章末尾也就此方案提出了几点疑问,就比如邮件服务挂了,队列还没消费完怎么办?

怎么办?怎么办?还能怎么办,等着被老板扣工资吧!!!

202303111616099902.png

有没有一种想屎的感觉的?

解决方案

由于LinkedBlockingQueue是进程内的队列,如果容器无情的挂掉以后,随之队列中的内容也便荡然无存。

其实你也可以不用去屎,山人自有妙计。

这里给大家介绍一款进程外的队列实现,redis,对没错就是有些人熟悉 有些人陌生的 NoSql数据库。

代码案例

pom.xml 引入以下依赖:

    <!-- spring-boot-starter-redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-redis</artifactId>
        <version>1.4.7.RELEASE</version>
    </dependency>

定义接口(部分代码)

         public void sendRedisQueue(Email mail) throws Exception;

定义实现(部分代码)

    @Override
        public void sendRedisQueue(Email mail) throws Exception {
            redisTemplate.convertAndSend("mail",mail);
        }

重写CachingConfigurerSupport

    import java.lang.reflect.Method;
    
    import org.springframework.cache.CacheManager;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.cache.interceptor.KeyGenerator;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.cache.RedisCacheManager;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    @Configuration
    @EnableCaching
    public class RedisConfig extends CachingConfigurerSupport {
        /**
         * 自定义key(消息队列 暂时用不到 自行忽略)
         * 此方法将会根据类名+方法名+所有参数的值生成唯一的一个key,即使@Cacheable中的value属性一样,key也会不一样。
         * @Author  科帮网
         * @return 
         * @Date    2017年8月13日
         * 更新日志
         * 2017年8月13日  科帮网 首次创建
         *
         */
        @Bean
        public KeyGenerator keyGenerator() {
            return new KeyGenerator() {
                @Override
                public Object generate(Object target, Method method,
                        Object... params) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(target.getClass().getName());
                    sb.append(method.getName());
                    for (Object obj : params) {
                        sb.append(obj.toString());
                    }
                    return sb.toString();
                }
            };
        }
        /**
         * 缓存管理器
         * @Author  科帮网
         * @param redisTemplate
         * @return  CacheManager
         * @Date    2017年8月13日
         * 更新日志
         * 2017年8月13日  科帮网 首次创建
         */
        @SuppressWarnings("rawtypes")
        @Bean
        public CacheManager cacheManager(RedisTemplate redisTemplate) {
            return new RedisCacheManager(redisTemplate);
        }
        /**
         * 序列化Java对象
         * @Author  科帮网
         * @param factory
         * @return  RedisTemplate<String,String>
         * @Date    2017年8月13日
         * 更新日志
         * 2017年8月13日  科帮网 首次创建
         *
         */
        @Bean
        public RedisTemplate<String, String> redisTemplate(
                RedisConnectionFactory factory) {
            StringRedisTemplate template = new StringRedisTemplate(factory);
            setSerializer(template); //使用Jackson序列化
            template.afterPropertiesSet();
            return template;
        }
        @SuppressWarnings({ "rawtypes", "unchecked" })
        private void setSerializer(StringRedisTemplate template) {
            Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(
                    Object.class);
            ObjectMapper om = new ObjectMapper();
            //om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            //om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setValueSerializer(jackson2JsonRedisSerializer);
        }
    }

配置RedisListener监听

    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    /**
     * 注意 扫描监听 否则无法接收消息
     * 创建者 科帮网
     * 创建时间    2017年8月13日
     *
     */
    @Component
    public class RedisListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
        @Bean
        RedisMessageListenerContainer container(
                RedisConnectionFactory connectionFactory,
                MessageListenerAdapter listenerAdapter) {
            LOGGER.info("启动监听"); 
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.addMessageListener(listenerAdapter, new PatternTopic("mail"));
            return container;
        }
    
        @Bean
        MessageListenerAdapter listenerAdapter(Receiver receiver) {
            return new MessageListenerAdapter(receiver, "receiveMessage");
        }
    
        @Bean
        Receiver receiver(CountDownLatch latch) {
            return new Receiver(latch);
        }
        
        @Bean
        CountDownLatch latch() {
            return new CountDownLatch(1);
        }
    
        @Bean
        StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
            return new StringRedisTemplate(connectionFactory);
        }
    }

定义Receiver接收着

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.fasterxml.jackson.core.JsonParseException;
    import com.fasterxml.jackson.databind.JsonMappingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.itstyle.mail.model.Email;
    import com.itstyle.mail.service.IMailService;
    
    public class Receiver {
        private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
        @Autowired
        private IMailService mailService;
        private CountDownLatch latch;
    
        @Autowired
        public Receiver(CountDownLatch latch) {
            this.latch = latch;
        }
    
        public void receiveMessage(String message) {
            LOGGER.info("接收email消息 <{}>",message);
            if(message == null){
                LOGGER.info("接收email消息 <" + null + ">");
            }else {
                ObjectMapper mapper = new ObjectMapper();  
                try {
                    Email email = mapper.readValue(message, Email.class);
                    mailService.send(email);
                    LOGGER.info("接收email消息内容 <{}>",email.getContent());
                } catch (JsonParseException e) {
                    e.printStackTrace();
                } catch (JsonMappingException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            latch.countDown();
        }
    }

SpringbootMailApplication测试

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.ImportResource;
    
    import com.itstyle.mail.model.Email;
    import com.itstyle.mail.service.IMailService;
    
    @SpringBootApplication
    @ComponentScan(basePackages={"com.itstyle.mail"})
    @ImportResource({"classpath:spring-context-dubbo.xml","classpath:spring-context-task.xml"})
    public class SpringbootMailApplication implements CommandLineRunner {
        @Autowired
        private IMailService mailService;
        public static void main(String[] args) {
            SpringApplication.run(SpringbootMailApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
            try {
                Email mail = new Email();
                mail.setEmail(new String[]{"345849402@qq.com"});
                mail.setSubject("你个小逗比");
                mail.setContent("科帮网欢迎您");
                mail.setTemplate("welcome");
                for(int i=0;i<1000;i++){
                    mailService.sendRedisQueue(mail);
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

详细代码案例:码云

阅读全文