盘点TCC-Transaction:Debug手册

这是我参与更文挑战的第1天,活动详情查看: 更文挑战

总文档 :文章目录
Github : github.com/black-ant

一 . 前言

文章目的 :

  • 基于 Feign 的 TCC-Transaction 案例
  • 梳理 TCC-Transaction 的 主流程及 Debug 方向

TCC-Transaction 背景简介 :

TCC 模型

TCC 模型是完全依赖于业务处理的分布式事务模型 , 他将一个 [大事务] 通过代码逻辑分解为 [多个小事务] , TCC 模型同样是 2PC (两阶段提交) 的实现之一.

TCC 的操作可以分为3个阶段 : Try / Confirm / Cancel

  • Try: 业务的主要处理 , 但仅进行初期操作 (例如订单生成)
    • 尝试执行业务
    • 所有子事务完成所有业务检查(一致性)
    • 锁定资源 , 预留必须业务资源(准隔离性)
  • Confirm: 对 Try 操作的一个补充,逐个执行Try操作指定的Confirm操作
    • 真正执行业务,不作任何业务检查
    • 只使用Try阶段预留的业务资源 , 扣除具体的资源
    • Confirm 操作满足幂等性
  • Cancel: 对Try操作的一个回撤
    • 取消执行业务
    • 释放Try阶段预留的业务资源, 业务上的回退
    • Cancel操作满足幂等性

TCC Module.jpg

TCC 的优缺点

优点:

  • 由业务方自行控制事务的范围
  • 自如的控制数据库粒度处理 , 降低锁冲突
  • 业务设计合理 , 可以大大提高吞吐量
  • 代码配置简单 , 无需太多配置 , 集成方式便利

缺点 :

  • 业务侵入大 , 耦合强 , 迁移及改造成本大
  • 设计难度大
  • 对于回滚的处理困难
  • 为了满足一致性的要求,confirm和cancel接口必须实现幂等

二 . TCC-Transaction 案例

官方提供过一个基于 Dubbo 的处理案例 , 本案例是基于 Feign 进行 RestAPI 调用的 , 其核心原理其实是一致的.

为了后文分析时更加清楚 , 首先看一下案例源码 :

业务模块 :

  • Order : 订单服务
  • Capital : 账户服务
  • RedPacket : 红包服务

订单支付后 , 扣除账户余额和红包 . 当红包余额不足时 , 发起回退

前期配置

TODO : TCC 的灵活配置

2.1 Order 服务

Order 主流程 : 发起整个事务及接口调用逻辑

  • makePayment : 生成 Order 订单 , 调用红包及账户服务扣除余额
  • confirmMakePayment
  • cancelMakePayment : 确定后修改状态
@Service
public class PaymentServiceImpl {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    TradeOrderServiceProxy tradeOrderServiceProxy;

    @Autowired
    OrderRepository orderRepository;


    /**
     * 此处考虑后应该是要去掉小事务管理的的 (@Transaction) <br>
     * 原因 : 如果此处存在事务 , 分布式应用上抛出异常 , 则回导致该事务回滚
     * <p>
     * 但是!!! 你可能这个时候会想 , 既然出现异常这个类会回滚 , 那不相当于分布式实现了吗 , 为什么还要加个框架处理
     * <p>
     * 原因 : 此处如果考虑红包的逻辑就对了 , 这个场景实际上为 : 余额足够 ,但是红包不够的情况!!!
     *
     * @param order
     * @param redPacketPayAmount
     * @param capitalPayAmount
     */
    @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
    public void makePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {


        logger.info("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
        logger.info("------> 开始事务管理 : 红包支付金额 [{}] / 账户支付余额 [{}] <-------", redPacketPayAmount, capitalPayAmount);
        //check if the order status is DRAFT, if no, means that another call makePayment for the same order happened, ignore this call makePayment.
        if (order.getStatus().equals("DRAFT")) {
            order.pay(redPacketPayAmount, capitalPayAmount);
            try {
                orderRepository.updateOrder(order);
            } catch (OptimisticLockingFailureException e) {
                logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            }
        }

        logger.info("------> 订单处理完成 :[{}] <-------", order.getId());
        String result = tradeOrderServiceProxy.record(null, buildCapitalTradeOrderDto(order));
        logger.info("------> 余额消费完成 :[{}] <-------", result);
        String result2 = tradeOrderServiceProxy.record(null, buildRedPacketTradeOrderDto(order));
        logger.info("------> 红包消费完成 :[{}] <-------", result2);
    }

    public void confirmMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {

        logger.warn("------> [进入 PayConfirm 流程] <-------");
        logger.warn("order confirm make payment called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        Order foundOrder = orderRepository.findByMerchantOrderNo(order.getMerchantOrderNo());

        //check order status, only if the status equals DRAFT, then confirm order
        if (foundOrder != null && foundOrder.getStatus().equals("PAYING")) {
            order.confirm();
            orderRepository.updateOrder(order);
        }
    }

    public void cancelMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {

        logger.error("------> [进入 cancel 流程] <-------");
        logger.error("order cancel make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        Order foundOrder = orderRepository.findByMerchantOrderNo(order.getMerchantOrderNo());
        logger.info("------> [cancel 流程处理 Order :{}] <-------", JSONObject.toJSONString(foundOrder));

        if (foundOrder != null && foundOrder.getStatus().equals("PAYING")) {
            order.cancelPayment();
            orderRepository.updateOrder(order);
        }
    }


    private CapitalTradeOrderDto buildCapitalTradeOrderDto(Order order) {

        CapitalTradeOrderDto tradeOrderDto = new CapitalTradeOrderDto();
        tradeOrderDto.setAmount(order.getCapitalPayAmount());
        tradeOrderDto.setMerchantOrderNo(order.getMerchantOrderNo());
        tradeOrderDto.setSelfUserId(order.getPayerUserId());
        tradeOrderDto.setOppositeUserId(order.getPayeeUserId());
        tradeOrderDto.setOrderTitle(String.format("order no:%s", order.getMerchantOrderNo()));

        return tradeOrderDto;
    }

    private RedPacketTradeOrderDto buildRedPacketTradeOrderDto(Order order) {
        RedPacketTradeOrderDto tradeOrderDto = new RedPacketTradeOrderDto();
        tradeOrderDto.setAmount(order.getRedPacketPayAmount());
        tradeOrderDto.setMerchantOrderNo(order.getMerchantOrderNo());
        tradeOrderDto.setSelfUserId(order.getPayerUserId());
        tradeOrderDto.setOppositeUserId(order.getPayeeUserId());
        tradeOrderDto.setOrderTitle(String.format("order no:%s", order.getMerchantOrderNo()));

        return tradeOrderDto;
    }
}


复制代码

Capital , Red 远程调用

@Component
public class TradeOrderServiceProxy {

    @Autowired
    CapitalTradeOrderService capitalTradeOrderService;

    @Autowired
    RedPacketTradeOrderService redPacketTradeOrderService;

    @Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
    public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
        return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
    }

    @Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
    public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
        return redPacketTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
    }
}

复制代码

2.2 Capital 账户处理余额

@Service
public class CapitalTradeOrderServiceImpl {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    CapitalAccountRepository capitalAccountRepository;

    @Autowired
    TradeOrderRepository tradeOrderRepository;

    /**
     * Step 1 : @Transactional 保证小事务的执行 , 避免余额反复添加
     *
     * @param transactionContext
     * @param tradeOrderDto
     * @return
     */
    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
    @Transactional
    public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {

        logger.info("capital try record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        TradeOrder foundTradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

        //check if trade order has been recorded, if yes, return success directly.
        if (foundTradeOrder == null) {

            TradeOrder tradeOrder = new TradeOrder(
                    tradeOrderDto.getSelfUserId(),
                    tradeOrderDto.getOppositeUserId(),
                    tradeOrderDto.getMerchantOrderNo(),
                    tradeOrderDto.getAmount()
            );

            try {
                tradeOrderRepository.insert(tradeOrder);

                CapitalAccount transferFromAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

                transferFromAccount.transferFrom(tradeOrderDto.getAmount());

                capitalAccountRepository.save(transferFromAccount);

                logger.info("------> 账户余额处理完成 , 现余额 [{}] <-------", JSONObject.toJSONString(transferFromAccount));

            } catch (DataIntegrityViolationException e) {
                logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            }
        }

        return "success";
    }

    @Transactional
    public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {

        logger.warn("capital confirm record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
        TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

        //check if the trade order status is DRAFT, if yes, return directly, ensure idempotency.
        if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
            tradeOrder.confirm();
            tradeOrderRepository.update(tradeOrder);

            CapitalAccount transferToAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());

            transferToAccount.transferTo(tradeOrderDto.getAmount());

            capitalAccountRepository.save(transferToAccount);
        }
    }

    @Transactional
    public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {

        logger.error("capital cancel record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
        TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

        //check if the trade order status is DRAFT, if yes, return directly, ensure idempotency.
        if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
            tradeOrder.cancel();
            tradeOrderRepository.update(tradeOrder);

            CapitalAccount capitalAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

            capitalAccount.cancelTransfer(tradeOrderDto.getAmount());

            capitalAccountRepository.save(capitalAccount);
        }
    }
}

复制代码

2.3 RedPacket 红包处理

  • record : 创建红包订单 , 扣除金额
  • confirmRecord : 更新订单状态 , 扣除账户
  • cancelRecord :
@Service
public class RedPacketTradeOrderServiceImpl {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    RedPacketAccountRepository redPacketAccountRepository;

    @Autowired
    TradeOrderRepository tradeOrderRepository;

    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
    @Transactional
    public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {

        logger.info("red packet try record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        TradeOrder foundTradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

        if (foundTradeOrder == null) {

            TradeOrder tradeOrder = new TradeOrder(
                    tradeOrderDto.getSelfUserId(),
                    tradeOrderDto.getOppositeUserId(),
                    tradeOrderDto.getMerchantOrderNo(),
                    tradeOrderDto.getAmount()
            );

            try {
                tradeOrderRepository.insert(tradeOrder);

                RedPacketAccount transferFromAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

                transferFromAccount.transferFrom(tradeOrderDto.getAmount());

                redPacketAccountRepository.save(transferFromAccount);

                logger.info("------> [] <-------");

            } catch (DataIntegrityViolationException e) {
                logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
            }
        }

        return "success";
    }

    @Transactional
    public void confirmRecord(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {

        logger.warn("red packet confirm record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

        if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
            tradeOrder.confirm();
            tradeOrderRepository.update(tradeOrder);

            RedPacketAccount transferToAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());

            transferToAccount.transferTo(tradeOrderDto.getAmount());

            redPacketAccountRepository.save(transferToAccount);
        }
    }

    @Transactional
    public void cancelRecord(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {

        logger.error("red packet cancel record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

        TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());

        if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
            tradeOrder.cancel();
            tradeOrderRepository.update(tradeOrder);

            RedPacketAccount capitalAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());

            capitalAccount.cancelTransfer(tradeOrderDto.getAmount());

            redPacketAccountRepository.save(capitalAccount);
        }
    }
}

复制代码

2.4 流程总结

流程源码可以看 @项目源码

三 . 源码分析

3.1 TCC 成员梳理

  • 事务 : ( Transaction )
  • 事务ID对象 : ( TransactionXid )
  • 事务状态对象 : ( TransactionStatus )
  • 事务类型对象 : ( TransactionType )

  • 参与者 : ( org.mengyun.tcctransaction.Participant )
  • 事务管理器 : TransactionManager
  • 事务恢复配置器 : RecoverConfig
  • 事务恢复处理器 : TransactionRecovery
  • 默认事务恢复配置实现 : DefaultRecoverConfig
  • 事务恢复定时任务 : RecoverScheduledJob
  • 事务恢复处理器 : TransactionRecovery
  • 事务恢复处理器 : TransactionRecovery

3.2 TCC 流程快查

事务的主要流程如下所示 :

  • 发起根事务 : MethodType.ROOT / begin / registerTransaction
  • 传播发起分支事务 : MethodType.PROVIDER / try / propagationNewBegin
  • 传播获取分支事务 : MethodType.PROVIDER / confirm / cancel / propagationExistBegin
  • 提交事务 : commit / confirm / cancel /
  • 回滚事务 : rollback / confirm / cancel /
  • 添加事务 : enlistParticipant / try
  • 事务拦截器 : @Compensable / @Aspect / @Transactional

3.3 流程一 : 切面的处理

TCC 的注解基于 @Aspect + @Compensable 实现切面的处理 , 核心的处理类为 CompensableTransactionAspect

@Aspect
public abstract class CompensableTransactionAspect {

    private CompensableTransactionInterceptor compensableTransactionInterceptor;

    public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
        this.compensableTransactionInterceptor = compensableTransactionInterceptor;
    }

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {

    }
    
    // 可以看到 , 这里使用的环绕切面
    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }

    public abstract int getOrder();
}

复制代码

处理拦截器

当切面拦截后 , 此处会使用拦截器进行真正的具体逻辑 :

  • org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor,可补偿事务拦截器。
  • org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor,资源协调者拦截器。
C05- CompensableTransactionInterceptor
	M05_01- interceptCompensableMethod
	M05_02- rootMethodProceed
	
/**
*
*
**/
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
		
        // 获取对应得 Method 对象
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);

        Compensable compensable = method.getAnnotation(Compensable.class);
        Propagation propagation = compensable.propagation();
        
        // 获取事务容器
        TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
        
        // 是否异步处理
        boolean asyncConfirm = compensable.asyncConfirm();
        boolean asyncCancel = compensable.asyncCancel();

        // 是否开启事务	
        boolean isTransactionActive = transactionManager.isTransactionActive();

        if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
            throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
        }
        
        // 
        MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);

        switch (methodType) {
            case ROOT:
                return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
            case PROVIDER:
                return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
            default:
                return pjp.proceed();
        }
}


// 此处会通过事务的类型选择不同的方法进行处理 :
  private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {

        // 属性准备
        Object returnValue = null;
        Transaction transaction = null;
      
        try {
            // 开启事务 , 获取事务对象 -> M05_02_01
            transaction = transactionManager.begin();
            try {
                // 执行 proceed 处理方法
                returnValue = pjp.proceed();
            } catch (Throwable tryingException) {

                if (!isDelayCancelException(tryingException)) {
                    transactionManager.rollback(asyncCancel);
                }
                throw tryingException;
            }
            // 事件管理器提交 commit 
            transactionManager.commit(asyncConfirm);
        } finally {
            transactionManager.cleanAfterCompletion(transaction);
        }
        return returnValue;
    }

复制代码

Pro 1 : Propagation 是什么 ?

Propagation 提供了多种传播方式 , 来定义具体的传播类型

public enum Propagation {
    REQUIRED(0),
    SUPPORTS(1),
    MANDATORY(2),
    REQUIRES_NEW(3);
	//.............
}


// @Transactional(propagation=Propagation.REQUIRED)
- 如果有事务, 那么加入事务, 没有的话新建一个(默认情况下)

// @Transactional(propagation=Propagation.REQUIRES_NEW) 
- 不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务

// @Transactional(propagation=Propagation.MANDATORY)  
- 必须在一个已有的事务中执行,否则抛出异常   

// @Transactional(propagation=Propagation.SUPPORTS)
- 如果其他bean调用这个方法,在其他bean中声明事务,那就用事务.如果其他bean没有声明事务,那就不用事务


复制代码

Pro 2 : MethodType 是什么

MethodType 表示方法对应的事务类型 :



public enum MethodType {
    ROOT,
    CONSUMER,
    PROVIDER,
    NORMAL;
}

- MethodType.ROOT : 根事务 , 也可以理解为事务发起者
- MethodType.CONSUMER : 消费参与者
- MethodType.PROVIDER : 生产参与者
- MethodType.NORMAL : 


复制代码

M05_02_01 Transaction 实体类解析

tcc-trans-transMangerBean.jpg

3.4 事务的执行和通知

3.4.1 事务的commit

之前 C05- CompensableTransactionInterceptor # M05_02- rootMethodProceed 中通过 TransactionManager 执行 commit 操作

先看一下 TransactionManager 的结构 :

C09- TransactionManager
	F09_01- ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
	M09_01- begin()
	M09_02- propagationNewBegin(TransactionContext transactionContext)
	M09_03- propagationExistBegin(TransactionContext transactionContext)
	M09_04- commit(boolean asyncCommit)
	M09_05- rollback(boolean asyncRollback)
	M09_06- commitTransaction(Transaction transaction)
	M09_07- rollbackTransaction(Transaction transaction)
复制代码

Step 1 : TransactionManager 进行管理

TransactionManager 进行 Transaction 的配合和通过线程发起事务


public void commit(boolean asyncCommit) {
    // 从 ThreadLocal 中获取 Transaction
    final Transaction transaction = getCurrentTransaction();
    // commit 后修改状态    
    transaction.changeStatus(TransactionStatus.CONFIRMING);
    // 更新 Transaction 状态
    transactionRepository.update(transaction);

    if (asyncCommit) {
        try {
            Long statTime = System.currentTimeMillis();
            
             // 此处主要为 ThreadPoolExecutor , 通过线程池提交 -> M09_06
            executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        commitTransaction(transaction);
                    }
                });
        } catch (Throwable commitException) {
            throw new ConfirmingException(commitException);
        }
    } else {
        commitTransaction(transaction);
    }
}
复制代码

Step 2 :提交事务

// M09_06 此处提交事务
private void commitTransaction(Transaction transaction) {
        try {
            // 提交多个事务
            transaction.commit();
            // 删除事务
            transactionRepository.delete(transaction);
        } catch (Throwable commitException) {
            logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
            throw new ConfirmingException(commitException);
        }
}

// C15- Transaction -> PS:C15_01
public void commit() {
    for (Participant participant : participants) {
        participant.commit();
    }
}

public void commit() {
    terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}


public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
    if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
            try {
                Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
                Method method = null;
                // 代理执行对象的 Method 
                method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
                FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
                return method.invoke(target, invocationContext.getArgs());
            } catch (Exception e) {
                throw new SystemException(e);
            }
    }
    return null;
}


// public final void com.tcc.demo.order.service.PaymentServiceImpl$$EnhancerBySpringCGLIB$$6a17cffa.confirmMakePayment(com.tcc.demo.order.model.Order,java.math.BigDecimal,java.math.BigDecimal)


   
// TransactionRepository 是用于事务管理的持久化操作
C10- TransactionRepository -> PS:C10_01

    
// ExecutorService 执行 Service
C12- ExecutorService -> PS:C11_01

复制代码

PS:C10_01 TransactionRepository 家族体系

public interface TransactionRepository {
    int create(Transaction transaction);
    int update(Transaction transaction);
    int delete(Transaction transaction);
    Transaction findByXid(TransactionXid xid);
    List<Transaction> findAllUnmodifiedSince(Date date);
}


// 这里看一下 Transaction 的数据库结构

CREATE TABLE `TCC_TRANSACTION` (
  `TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '事务 ID',
  `DOMAIN` varchar(100) DEFAULT NULL COMMENT '域名',
  `GLOBAL_TX_ID` varbinary(32) NOT NULL COMMENT '全局事务ID',
  `BRANCH_QUALIFIER` varbinary(32) NOT NULL,
  `CONTENT` varbinary(8000) DEFAULT NULL COMMENT '序列化 Transaction 事务内容',
  `STATUS` int(11) DEFAULT NULL COMMENT '事务状态',
  `TRANSACTION_TYPE` int(11) DEFAULT NULL COMMENT '事务类型',
  `RETRIED_COUNT` int(11) DEFAULT NULL COMMENT '重试次数',
  `CREATE_TIME` datetime DEFAULT NULL COMMENT '创建时间',
  `LAST_UPDATE_TIME` datetime DEFAULT NULL COMMENT '最后更新时间',
  `VERSION` int(11) DEFAULT NULL COMMENT '乐观锁版本',
  `IS_DELETE` int(12) DEFAULT NULL COMMENT '是否删除',
  PRIMARY KEY (`TRANSACTION_ID`),
  UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


// 每一个小事务都有个自己的 TCC_TRANSACTION 类 , 类名通常为 TCC_TRANSACTION_xxx


复制代码

TransactionRepository_system.png

PS:C15_01 参数

tcc-participants.jpg

Pro 1 : Xid 的对象

public interface Xid {
    int MAXGTRIDSIZE = 64;
    int MAXBQUALSIZE = 64;

    int getFormatId();

    byte[] getGlobalTransactionId();

    byte[] getBranchQualifier();
}

复制代码

3.4.2 事务的通知

事务的还有一个核心逻辑就是通知其他的应用执行相关的逻辑 , 那么事务是怎么相互告知的呢 ? 我们从实际应用出发 :

问题 :
疑点一 : 当 captial try 逻辑完成后 , 实际上已经返回了 , 并不会拿到对应的通知

现象 :
现象一 : 当 try 再次调用时 , 是通过 restAPI 接口进行网络调用 , 所以应该是外部调用实现的

// 找了相关的代码找到了这个类 : 

// C15- Transaction -> PS:C15_01
public void commit() {
    for (Participant participant : participants) {
        participant.commit();
    }
}

public class TradeOrderServiceProxy {

    @Autowired
    CapitalTradeOrderService capitalTradeOrderService;

    @Autowired
    RedPacketTradeOrderService redPacketTradeOrderService;

    @Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
    public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
        return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
    }

    //.............
}


复制代码

如果我们跟着相关的代码 , 会发现确实在出现rollbakc 时 , 会调用该参与者

所以我们能得出如下的结论 :

  1. 当出现异常后 , 会通过 Participant 调用相同的接口一次
  2. 调用对应接口后 , 会因为拦截器的原因 , 通过 Transaction 状态 , 调用对应的所属流程 (例如异常就是 rollback)

3.5 事务的回退

事务的回退时 , 会先调用起本身的 cancel 方法 ,其次会调用依赖微服务的原方法

PS : 确实是原方法 , 但是由于代理 , 会进入 CompensableTransactionAspect 切面

通过判断 TransactionContext 中的 status 决定执行什么方法

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {

        Transaction transaction = null;
        try {

            switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
                case TRYING:
                    transaction = transactionManager.propagationNewBegin(transactionContext);
                    return pjp.proceed();
                case CONFIRMING:
                    try {
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        transactionManager.commit(asyncConfirm);
                    } catch (NoExistedTransactionException excepton) {
                        //the transaction has been commit,ignore it.
                    }
                    break;
                case CANCELLING:
                    // 
                    try {
                        transaction = transactionManager.propagationExistBegin(transactionContext);
                        transactionManager.rollback(asyncCancel);
                    } catch (NoExistedTransactionException exception) {
                        //the transaction has been rollback,ignore it.
                    }
                    break;
            }

        } finally {
            transactionManager.cleanAfterCompletion(transaction);
        }

        Method method = ((MethodSignature) (pjp.getSignature())).getMethod();

        return ReflectionUtils.getNullValue(method.getReturnType());
    }

public void rollback() {
    for (Participant participant : participants) {
        participant.rollback();
    }
}


// 回退的发起
public void rollback() {
    terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}



// 注意 :
1- Participant -> Order Cancel
2- Participant -> Capital Rest API
3- Participant -> Capital Cancel

复制代码

3.6 事务的异步处理

TCC-Transaction 的处理默认是同步的 , 可以通过注解来配置异步处理

@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false, asyncCancel = false)

这里来看一下2者的区别 :

    public void rollback(boolean asyncRollback) {

        final Transaction transaction = getCurrentTransaction();
        transaction.changeStatus(TransactionStatus.CANCELLING);

        transactionRepository.update(transaction);

        if (asyncRollback) {

            try {
                // 最大的区别就是此处调用线程池构建了一个新线程
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        rollbackTransaction(transaction);
                    }
                });
            } catch (Throwable rollbackException) {
                throw new CancellingException(rollbackException);
            }
        } else {

            rollbackTransaction(transaction);
        }
    }

复制代码

3.7 事务的恢复

事务的恢复和事务的通知并不是一个概念 , 当事务的初期执行出现异常后 , 事务在后续会通过定时任务的方式 , 完成事务的继续执行操作

  • org.mengyun.tcctransaction.recover.RecoverConfig,事务恢复配置接口
  • org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig,默认事务恢复配置实现
  • org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事务恢复定时任务,基于 Quartz 实现调度,不断不断不断执行事务恢复

总结

正常处理流程 , 执行 Confirm

A- 事务发起者 (订单服务)
B- 事务消费对象B  (账户服务) 
C- 事务消费对象C (红包服务)

// Step 1 : 事务发起者处理
A1- 调用发起方法
A2- TCC CompensableTransactionAspect 切面拦截  
A3- TCC CompensableTransactionInterceptor # rootMethodProceed 准备事务容器 , 大流程处理
A4- CompensableTransactionInterceptor # rootMethodProceed 开启事务及事务管理 (begin , process , commit)
A5- 进入真正的方法执行业务逻辑
    
// Step 2 : 账户对象处理
B1- 调用发起方法
B2- TCC CompensableTransactionAspect 切面拦截  
B3- TCC CompensableTransactionInterceptor # rootMethodProceed 准备事务容器 , 大流程处理
B4- CompensableTransactionInterceptor # providerMethodProceed 执行消费处理
B5- 进入真正的方法执行业务逻辑
    
// PS : 主要对比 A4 - B4 的区别
    
// Step 3 : 账户处理完成后 , 订单服务继续处理
A6- transactionManager.commit(asyncConfirm) : 提交 commit
A7- Transaction.Participant # commit() 提交代理方法
A8- 执行 Order Confirm 方法
    
    
// Step 4 : 账户服务Confirm 确定
B6- TransactionAspectSupport # invokeWithinTransaction
B7- 执行 账户 Confirm 方法

复制代码

回退处理逻辑 , 执行 Cancel

// PS : 前几步都是一样的 , 基于代理的方式


// Step 1: A4 rootMethodProceed 处理过程中出现异常 , 由 catch 继续处理
A4- rootMethodProceed : 执行账户主方法
A5- 出现异常 , catch 处理 , 调用 rollback
A6- TransactionManager # rollback 执行 Rollback 主流程
    
// Step 2 : order 模块调用 cancel 方法
    
    
// Step 3 : 账户对象处理回退
B5- 再次原样调用主方法
B6- 事务容器状态为回退 , 执行回退逻辑


复制代码

@Compensable 的使用

// 整个流程中共有以下几个地方需要标注 @Compensable , 我们来单独看看其中的关联


// Step 1 : Order 中 try 方法标注
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)

// Step 2 : Order 中调用远程接口时标注
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
        return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}

@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
        return redPacketTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}




// Step 3 : capital 中远程接口的 try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)

// Step 4 : red 接口中 try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)


// PS : 官方原文中 , 此处是没有添加对应方法的 , 不清楚是否是因为 Dubbo 的 Feign 的机制问题

复制代码

感谢和参考