Spring事务同步机制扩展解决加锁和MQ发送数据同步问题

 2023-02-10
原文作者:wf2311 原文地址:https://juejin.cn/post/7004780487264698375

问题场景

在Spring事务中发送MQ或进行加锁操作,如果在事务还未结束时,MQ就已经发送出去或锁已经被释放掉,导致数据不一致性问题

场景一

        @Transactional(propagation = Propagation.REQUIRES_NEW)
        public Order save(Order order) {
            orderRepository.save(order);
            
            //发送mq
            orderSender.send(order);
            
            //do something
            
            return order;
        }

上述代码中,在保存订单后立即发送MQ,但此时事务还没提交,如果do something的后续逻辑耗时较久,就会有几率,MQ消费者在接收到消息后,生产者的事务还未提交,假设消费者此时要到数据库反查order,就会查不到;

场景二

        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Override
        public void seckill() {
            lock.lock();
            try {
                Integer exists = orderRepository.countAllBy();
                log.info("当前库存:" + (limit - exists));
                if (exists >= limit) {
                    log.warn("下单失败了");
                    FAIL.increment();
                    return;
                }
    
                Order order = new Order();
                order.setPrice(123L);
                order.setCreatedOn(LocalDateTime.now());
                orderRepository.save(order);
                log.info(" 下单成功了");
                SUCCESS.increment();
            } finally {
                lock.unlock();
            }
        }

上述代码中,虽然解锁操作lock.unlock()是放在在finally里面执行的,但实际上在使用事务时,经过AOP代理后,lock.unlock()会早于事务提交执行,因此还是存在并发问题。

解决方案

要解决上述的问题,本质上就是要让事务或解锁操作,在事务提交后再执行,可以使用手动提交事务方式,直接后置这些操作,但此种实现起来对业务代码侵入性较大,不太优雅。

这里我将介绍另一种方式——通过扩展Spring事务的同步机制来解决此问题:

Spring事务的同步机制

Spring事务的同步机制的具体原理这里就不做详细介绍了,具体可以参考blog.csdn.net/f641385712/…这篇文章

简单来说,我们可以通过自定义TransactionSynchronization,然后通过在事务方法中使用TransactionSynchronizationManager.registerSynchronization(synchronization)方法来注册事务同步器,在事务完成前后的进行相关逻辑操作;

接口TransactionSynchronization的定义如下:

    public interface TransactionSynchronization extends Ordered, Flushable {
    
       /** Completion status in case of proper commit. */
       int STATUS_COMMITTED = 0;
    
       /** Completion status in case of proper rollback. */
       int STATUS_ROLLED_BACK = 1;
    
       /** Completion status in case of heuristic mixed completion or system errors. */
       int STATUS_UNKNOWN = 2;
    
       
       @Override
       default int getOrder() {
          return Ordered.LOWEST_PRECEDENCE;
       }
    
    
       default void suspend() {
       }
       
       default void resume() {
       }
       
       @Override
       default void flush() {
       }
       
       default void beforeCommit(boolean readOnly) {
       }
       
       default void beforeCompletion() {
       }
       
       default void afterCommit() {
       }
    
       default void afterCompletion(int status) {
       }
    }

例如,对之前场景一的代码进行改写:

    
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        public Order save(Order order) {
            orderRepository.save(order);
    
            //发送mq
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    orderSender.send(order);
                }
            });
    
            //do something
    
            return order;
        }

我们注册了一个自定义的事务同步器,其中:

    public void afterCommit() {
        orderSender.send(order);
    }

即表示在事务完成提交后进行MQ消息发送

进一步封装

了解了Spring的事务同步机制,我们就可以对此进行一下封装

  • 函数式接口:要执行方法
    @FunctionalInterface
    public interface Action {
    
        /**
         * 执行方法
         */
        void execute();
    
    }
  • 枚举类:定义需要在事务处于何种状态时进行操作
    @Getter
    @AllArgsConstructor
    public enum ActionExecuteState {
        /**
         * 事务提交时执行Action
         */
        WHEN_COMMITTED(TransactionSynchronization.STATUS_COMMITTED),
        /**
         * 事务回滚时执行Action
         */
        WHEN_ROLL_BACK(TransactionSynchronization.STATUS_ROLLED_BACK),
        /**
         * 事务状态未知时执行Action
         */
        WHEN_UNKNOWN(TransactionSynchronization.STATUS_UNKNOWN),
        /**
         * 不管事务状态为何值下都会执行Action
         */
        WHEN_ALL(null);
        private Integer state;
    
        public static ActionExecuteState getByState(Integer state) {
            if (state == null) {
                return WHEN_ALL;
            }
            for (ActionExecuteState e : ActionExecuteState.values()) {
                if (Objects.equals(e.state, state)) {
                    return e;
                }
            }
            return null;
        }
    }
  • 事务状态操作封装类
    public class ActionWrapper {
        /**
         * Map:<事务状态,操作> 
         */
        private Map<ActionExecuteState, List<Action>> actionMap;
    
        public ActionWrapper(@NotNull Map<ActionExecuteState, List<Action>> actionMap) {
            assert actionMap != null;
            this.actionMap = actionMap;
        }
    
        public Map<ActionExecuteState, List<Action>> getActionMap() {
            return actionMap;
        }
    }
  • 后置的数据库事务同步处理器
    @Slf4j
    public class PostActionTransactionSynchronizationHandler {
        /**
         * Supplier:用于判断当前是否出去活动事务中,默认为 {@code TransactionSynchronizationManager::isActualTransactionActive}
         */
        private final Supplier<Boolean> transactionActiveSupplier;
    
        /**
         * 通过ThreadLocal缓存写操作方法
         */
        private final ThreadLocal<ActionWrapper> cache = ThreadLocal.withInitial(() -> null);
        /**
         * 缓存当前线程中
         */
        private final ThreadLocal<Boolean> initSign = ThreadLocal.withInitial(() -> null);
    
        private PostActionTransactionSynchronization synchronization;
    
        public PostActionTransactionSynchronizationHandler(Supplier<Boolean> transactionActiveSupplier) {
            this.transactionActiveSupplier = transactionActiveSupplier;
        }
    
        public PostActionTransactionSynchronizationHandler() {
            this(TransactionSynchronizationManager::isActualTransactionActive);
        }
    
        public void setSynchronization(PostActionTransactionSynchronization synchronization) {
            this.synchronization = synchronization;
        }
    
        /**
         * 判断当前是否出去活动事务中
         *
         * @return
         */
        public boolean isActualTransactionActive() {
            return transactionActiveSupplier.get();
        }
    
        protected List<Action> getActions(ActionExecuteState state) {
            ActionWrapper wrapper = cache.get();
            if (wrapper == null || state == null) {
                return Collections.emptyList();
            }
            Map<ActionExecuteState, List<Action>> map = wrapper.getActionMap();
            if (map == null) {
                return Collections.emptyList();
            }
            return map.get(state);
        }
    
    
        /**
         * 添加一个事务后操作方法
         *
         * @param action 操作方法
         * @param state  action要运行在的事务状态
         */
        public synchronized void addAction(Action action, ActionExecuteState state) {
            registerSynchronizationIfNeed();
            ActionWrapper wrapper = cache.get();
            if (wrapper == null) {
                wrapper = new ActionWrapper(new HashMap<>(4));
                cache.set(wrapper);
            }
            Map<ActionExecuteState, List<Action>> map = wrapper.getActionMap();
            List<Action> actions = map.computeIfAbsent(state, s -> new ArrayList<>());
            actions.add(action);
        }
    
        private void registerSynchronizationIfNeed() {
            Boolean flag = initSign.get();
            if (flag == null || !flag) {
                TransactionSynchronizationManager.registerSynchronization(synchronization);
                initSign.set(true);
            }
        }
    
        /**
         * 清理缓存
         */
        public void clear() {
            cache.remove();
        }
    
        public void doAfterAfterCompletion(int stateCode) {
            try {
                ActionExecuteState state = ActionExecuteState.getByState(stateCode);
                if (state != null) {
                    executeActionByState(state);
                }
                executeActionByState(ActionExecuteState.WHEN_ALL);
            } finally {
                this.clear();
            }
        }
    
        private void executeActionByState(ActionExecuteState state) {
            if (state == null) {
                return;
            }
            try {
                List<Action> actions = getActions(state);
                if (CollectionUtils.isEmpty(actions)) {
                    return;
                }
                if (log.isInfoEnabled()) {
                    log.info("begin execute action with state={}", state);
                }
                for (Action action : actions) {
                    if (log.isDebugEnabled()) {
                        log.debug("execute {}", action.toString());
                    }
                    action.execute();
                }
            } catch (Exception e) {
                log.error("execute actions after transaction completion fail,state=" + state, e);
            }
        }
    }
  • 后置的数据库事务同步器
    @Slf4j
    public class PostActionTransactionSynchronization implements TransactionSynchronization {
        private PostActionTransactionSynchronizationHandler handler;
    
        public PostActionTransactionSynchronization(PostActionTransactionSynchronizationHandler handler) {
            this.handler = handler;
        }
    
        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }
    
        @Override
        public void afterCompletion(int status) {
            handler.doAfterAfterCompletion(status);
        }
    }

使用方式:

  • 定义bean
    @Configuration
    public class TransactionConfiguration {
    
        @Bean
        public PostActionTransactionSynchronizationHandler postDataSourceTransactionHandler() {
            return new PostActionTransactionSynchronizationHandler();
        }
    
        @Bean
        public PostActionTransactionSynchronization postActionTransactionSynchronization(PostActionTransactionSynchronizationHandler handler) {
            PostActionTransactionSynchronization synchronization = new PostActionTransactionSynchronization(handler);
            handler.setSynchronization(synchronization);
            return synchronization;
        }
    
    }

代码改写

  • 场景一
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Order save(Order order) {
        orderRepository.save(order);
        
        //发送mq
        //注释orderSender.send(order); 
        //ActionExecuteState.WHEN_COMMITTED 表示只有当事务成功提交时才执行lock.unlock()
        postActionTransactionSynchronizationHandler.addAction(lock::unlock, ActionExecuteState.WHEN_COMMITTED)
        
        //do something
        
        return order;
    }
  • 场景二
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @Override
    public void seckill() {
        lock.lock();
        try {
            Integer exists = orderRepository.countAllBy();
            log.info("当前库存:" + (limit - exists));
            if (exists >= limit) {
                log.warn("下单失败了");
                FAIL.increment();
                return;
            }
    
            Order order = new Order();
            order.setPrice(123L);
            order.setCreatedOn(LocalDateTime.now());
            orderRepository.save(order);
            log.info(" 下单成功了");
            SUCCESS.increment();
        } finally {
            //注释lock.unlock() 
            //ActionExecuteState.WHEN_ALL 表示只要事务结束不管是回滚还是提交都会执行lock.unlock()
            postActionTransactionSynchronizationHandler.addAction(lock::unlock, ActionExecuteState.WHEN_ALL)
        }
    }

示例项目地址

github.com/wf2311/spri…

可以参考本项目的 src/test/java目录查看具体测试用例和用法

参考

  1. 当 Transactional 碰到锁,有个大坑!
  2. 使用Spring的事务同步机制解决:数据库刚插入的记录却查询不到的问题