这是我参与更文挑战的第1天,活动详情查看: 更文挑战
总文档 :文章目录
Github : github.com/black-ant
一 . 前言
文章目的 :
- 基于 Feign 的 TCC-Transaction 案例
- 梳理 TCC-Transaction 的 主流程及 Debug 方向
TCC-Transaction 背景简介 :
TCC 模型
TCC 模型是完全依赖于业务处理的分布式事务模型 , 他将一个 [大事务] 通过代码逻辑分解为 [多个小事务] , TCC 模型同样是 2PC (两阶段提交) 的实现之一.
TCC 的操作可以分为3个阶段 : Try / Confirm / Cancel
- Try: 业务的主要处理 , 但仅进行初期操作 (例如订单生成)
- 尝试执行业务
- 所有子事务完成所有业务检查(一致性)
- 锁定资源 , 预留必须业务资源(准隔离性)
- Confirm: 对 Try 操作的一个补充,逐个执行Try操作指定的Confirm操作
- 真正执行业务,不作任何业务检查
- 只使用Try阶段预留的业务资源 , 扣除具体的资源
- Confirm 操作满足幂等性
- Cancel: 对Try操作的一个回撤
- 取消执行业务
- 释放Try阶段预留的业务资源, 业务上的回退
- Cancel操作满足幂等性
TCC 的优缺点
优点:
- 由业务方自行控制事务的范围
- 自如的控制数据库粒度处理 , 降低锁冲突
- 业务设计合理 , 可以大大提高吞吐量
- 代码配置简单 , 无需太多配置 , 集成方式便利
缺点 :
- 业务侵入大 , 耦合强 , 迁移及改造成本大
- 设计难度大
- 对于回滚的处理困难
- 为了满足一致性的要求,confirm和cancel接口必须实现幂等
二 . TCC-Transaction 案例
官方提供过一个基于 Dubbo 的处理案例 , 本案例是基于 Feign 进行 RestAPI 调用的 , 其核心原理其实是一致的.
为了后文分析时更加清楚 , 首先看一下案例源码 :
业务模块 :
- Order : 订单服务
- Capital : 账户服务
- RedPacket : 红包服务
订单支付后 , 扣除账户余额和红包 . 当红包余额不足时 , 发起回退
前期配置
TODO : TCC 的灵活配置
2.1 Order 服务
Order 主流程 : 发起整个事务及接口调用逻辑
- makePayment : 生成 Order 订单 , 调用红包及账户服务扣除余额
- confirmMakePayment
- cancelMakePayment : 确定后修改状态
@Service
public class PaymentServiceImpl {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
TradeOrderServiceProxy tradeOrderServiceProxy;
@Autowired
OrderRepository orderRepository;
/**
* 此处考虑后应该是要去掉小事务管理的的 (@Transaction) <br>
* 原因 : 如果此处存在事务 , 分布式应用上抛出异常 , 则回导致该事务回滚
* <p>
* 但是!!! 你可能这个时候会想 , 既然出现异常这个类会回滚 , 那不相当于分布式实现了吗 , 为什么还要加个框架处理
* <p>
* 原因 : 此处如果考虑红包的逻辑就对了 , 这个场景实际上为 : 余额足够 ,但是红包不够的情况!!!
*
* @param order
* @param redPacketPayAmount
* @param capitalPayAmount
*/
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
public void makePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
logger.info("order try make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
logger.info("------> 开始事务管理 : 红包支付金额 [{}] / 账户支付余额 [{}] <-------", redPacketPayAmount, capitalPayAmount);
//check if the order status is DRAFT, if no, means that another call makePayment for the same order happened, ignore this call makePayment.
if (order.getStatus().equals("DRAFT")) {
order.pay(redPacketPayAmount, capitalPayAmount);
try {
orderRepository.updateOrder(order);
} catch (OptimisticLockingFailureException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
}
logger.info("------> 订单处理完成 :[{}] <-------", order.getId());
String result = tradeOrderServiceProxy.record(null, buildCapitalTradeOrderDto(order));
logger.info("------> 余额消费完成 :[{}] <-------", result);
String result2 = tradeOrderServiceProxy.record(null, buildRedPacketTradeOrderDto(order));
logger.info("------> 红包消费完成 :[{}] <-------", result2);
}
public void confirmMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
logger.warn("------> [进入 PayConfirm 流程] <-------");
logger.warn("order confirm make payment called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
Order foundOrder = orderRepository.findByMerchantOrderNo(order.getMerchantOrderNo());
//check order status, only if the status equals DRAFT, then confirm order
if (foundOrder != null && foundOrder.getStatus().equals("PAYING")) {
order.confirm();
orderRepository.updateOrder(order);
}
}
public void cancelMakePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
logger.error("------> [进入 cancel 流程] <-------");
logger.error("order cancel make payment called.time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
Order foundOrder = orderRepository.findByMerchantOrderNo(order.getMerchantOrderNo());
logger.info("------> [cancel 流程处理 Order :{}] <-------", JSONObject.toJSONString(foundOrder));
if (foundOrder != null && foundOrder.getStatus().equals("PAYING")) {
order.cancelPayment();
orderRepository.updateOrder(order);
}
}
private CapitalTradeOrderDto buildCapitalTradeOrderDto(Order order) {
CapitalTradeOrderDto tradeOrderDto = new CapitalTradeOrderDto();
tradeOrderDto.setAmount(order.getCapitalPayAmount());
tradeOrderDto.setMerchantOrderNo(order.getMerchantOrderNo());
tradeOrderDto.setSelfUserId(order.getPayerUserId());
tradeOrderDto.setOppositeUserId(order.getPayeeUserId());
tradeOrderDto.setOrderTitle(String.format("order no:%s", order.getMerchantOrderNo()));
return tradeOrderDto;
}
private RedPacketTradeOrderDto buildRedPacketTradeOrderDto(Order order) {
RedPacketTradeOrderDto tradeOrderDto = new RedPacketTradeOrderDto();
tradeOrderDto.setAmount(order.getRedPacketPayAmount());
tradeOrderDto.setMerchantOrderNo(order.getMerchantOrderNo());
tradeOrderDto.setSelfUserId(order.getPayerUserId());
tradeOrderDto.setOppositeUserId(order.getPayeeUserId());
tradeOrderDto.setOrderTitle(String.format("order no:%s", order.getMerchantOrderNo()));
return tradeOrderDto;
}
}
复制代码
Capital , Red 远程调用
@Component
public class TradeOrderServiceProxy {
@Autowired
CapitalTradeOrderService capitalTradeOrderService;
@Autowired
RedPacketTradeOrderService redPacketTradeOrderService;
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
return redPacketTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}
}
复制代码
2.2 Capital 账户处理余额
@Service
public class CapitalTradeOrderServiceImpl {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
CapitalAccountRepository capitalAccountRepository;
@Autowired
TradeOrderRepository tradeOrderRepository;
/**
* Step 1 : @Transactional 保证小事务的执行 , 避免余额反复添加
*
* @param transactionContext
* @param tradeOrderDto
* @return
*/
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
@Transactional
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
logger.info("capital try record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder foundTradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());
//check if trade order has been recorded, if yes, return success directly.
if (foundTradeOrder == null) {
TradeOrder tradeOrder = new TradeOrder(
tradeOrderDto.getSelfUserId(),
tradeOrderDto.getOppositeUserId(),
tradeOrderDto.getMerchantOrderNo(),
tradeOrderDto.getAmount()
);
try {
tradeOrderRepository.insert(tradeOrder);
CapitalAccount transferFromAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
transferFromAccount.transferFrom(tradeOrderDto.getAmount());
capitalAccountRepository.save(transferFromAccount);
logger.info("------> 账户余额处理完成 , 现余额 [{}] <-------", JSONObject.toJSONString(transferFromAccount));
} catch (DataIntegrityViolationException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
}
return "success";
}
@Transactional
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
logger.warn("capital confirm record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());
//check if the trade order status is DRAFT, if yes, return directly, ensure idempotency.
if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.confirm();
tradeOrderRepository.update(tradeOrder);
CapitalAccount transferToAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());
transferToAccount.transferTo(tradeOrderDto.getAmount());
capitalAccountRepository.save(transferToAccount);
}
}
@Transactional
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
logger.error("capital cancel record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());
//check if the trade order status is DRAFT, if yes, return directly, ensure idempotency.
if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.cancel();
tradeOrderRepository.update(tradeOrder);
CapitalAccount capitalAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
capitalAccount.cancelTransfer(tradeOrderDto.getAmount());
capitalAccountRepository.save(capitalAccount);
}
}
}
复制代码
2.3 RedPacket 红包处理
- record : 创建红包订单 , 扣除金额
- confirmRecord : 更新订单状态 , 扣除账户
- cancelRecord :
@Service
public class RedPacketTradeOrderServiceImpl {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
RedPacketAccountRepository redPacketAccountRepository;
@Autowired
TradeOrderRepository tradeOrderRepository;
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
@Transactional
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
logger.info("red packet try record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder foundTradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());
if (foundTradeOrder == null) {
TradeOrder tradeOrder = new TradeOrder(
tradeOrderDto.getSelfUserId(),
tradeOrderDto.getOppositeUserId(),
tradeOrderDto.getMerchantOrderNo(),
tradeOrderDto.getAmount()
);
try {
tradeOrderRepository.insert(tradeOrder);
RedPacketAccount transferFromAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
transferFromAccount.transferFrom(tradeOrderDto.getAmount());
redPacketAccountRepository.save(transferFromAccount);
logger.info("------> [] <-------");
} catch (DataIntegrityViolationException e) {
logger.error("E----> error :{} -- content :{}", e.getClass(), e.getMessage());
}
}
return "success";
}
@Transactional
public void confirmRecord(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
logger.warn("red packet confirm record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());
if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.confirm();
tradeOrderRepository.update(tradeOrder);
RedPacketAccount transferToAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());
transferToAccount.transferTo(tradeOrderDto.getAmount());
redPacketAccountRepository.save(transferToAccount);
}
}
@Transactional
public void cancelRecord(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
logger.error("red packet cancel record called. time seq:" + DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));
TradeOrder tradeOrder = tradeOrderRepository.findByMerchantOrderNo(tradeOrderDto.getMerchantOrderNo());
if (null != tradeOrder && "DRAFT".equals(tradeOrder.getStatus())) {
tradeOrder.cancel();
tradeOrderRepository.update(tradeOrder);
RedPacketAccount capitalAccount = redPacketAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
capitalAccount.cancelTransfer(tradeOrderDto.getAmount());
redPacketAccountRepository.save(capitalAccount);
}
}
}
复制代码
2.4 流程总结
流程源码可以看 @项目源码
三 . 源码分析
3.1 TCC 成员梳理
- 事务 : ( Transaction )
- 事务ID对象 : ( TransactionXid )
- 事务状态对象 : ( TransactionStatus )
- 事务类型对象 : ( TransactionType )
- 参与者 : ( org.mengyun.tcctransaction.Participant )
- 事务管理器 : TransactionManager
- 事务恢复配置器 : RecoverConfig
- 事务恢复处理器 : TransactionRecovery
- 默认事务恢复配置实现 : DefaultRecoverConfig
- 事务恢复定时任务 : RecoverScheduledJob
- 事务恢复处理器 : TransactionRecovery
- 事务恢复处理器 : TransactionRecovery
3.2 TCC 流程快查
事务的主要流程如下所示 :
- 发起根事务 : MethodType.ROOT / begin / registerTransaction
- 传播发起分支事务 : MethodType.PROVIDER / try / propagationNewBegin
- 传播获取分支事务 : MethodType.PROVIDER / confirm / cancel / propagationExistBegin
- 提交事务 : commit / confirm / cancel /
- 回滚事务 : rollback / confirm / cancel /
- 添加事务 : enlistParticipant / try
- 事务拦截器 : @Compensable / @Aspect / @Transactional
3.3 流程一 : 切面的处理
TCC 的注解基于 @Aspect + @Compensable 实现切面的处理 , 核心的处理类为 CompensableTransactionAspect
@Aspect
public abstract class CompensableTransactionAspect {
private CompensableTransactionInterceptor compensableTransactionInterceptor;
public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}
// 可以看到 , 这里使用的环绕切面
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
public abstract int getOrder();
}
复制代码
处理拦截器
当切面拦截后 , 此处会使用拦截器进行真正的具体逻辑 :
org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor
,可补偿事务拦截器。org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor
,资源协调者拦截器。
C05- CompensableTransactionInterceptor
M05_01- interceptCompensableMethod
M05_02- rootMethodProceed
/**
*
*
**/
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 获取对应得 Method 对象
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();
// 获取事务容器
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
// 是否异步处理
boolean asyncConfirm = compensable.asyncConfirm();
boolean asyncCancel = compensable.asyncCancel();
// 是否开启事务
boolean isTransactionActive = transactionManager.isTransactionActive();
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}
//
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
switch (methodType) {
case ROOT:
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
return pjp.proceed();
}
}
// 此处会通过事务的类型选择不同的方法进行处理 :
private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
// 属性准备
Object returnValue = null;
Transaction transaction = null;
try {
// 开启事务 , 获取事务对象 -> M05_02_01
transaction = transactionManager.begin();
try {
// 执行 proceed 处理方法
returnValue = pjp.proceed();
} catch (Throwable tryingException) {
if (!isDelayCancelException(tryingException)) {
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
// 事件管理器提交 commit
transactionManager.commit(asyncConfirm);
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}
复制代码
Pro 1 : Propagation 是什么 ?
Propagation 提供了多种传播方式 , 来定义具体的传播类型
public enum Propagation {
REQUIRED(0),
SUPPORTS(1),
MANDATORY(2),
REQUIRES_NEW(3);
//.............
}
// @Transactional(propagation=Propagation.REQUIRED)
- 如果有事务, 那么加入事务, 没有的话新建一个(默认情况下)
// @Transactional(propagation=Propagation.REQUIRES_NEW)
- 不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务
// @Transactional(propagation=Propagation.MANDATORY)
- 必须在一个已有的事务中执行,否则抛出异常
// @Transactional(propagation=Propagation.SUPPORTS)
- 如果其他bean调用这个方法,在其他bean中声明事务,那就用事务.如果其他bean没有声明事务,那就不用事务
复制代码
Pro 2 : MethodType 是什么
MethodType 表示方法对应的事务类型 :
public enum MethodType {
ROOT,
CONSUMER,
PROVIDER,
NORMAL;
}
- MethodType.ROOT : 根事务 , 也可以理解为事务发起者
- MethodType.CONSUMER : 消费参与者
- MethodType.PROVIDER : 生产参与者
- MethodType.NORMAL :
复制代码
M05_02_01 Transaction 实体类解析
3.4 事务的执行和通知
3.4.1 事务的commit
之前 C05- CompensableTransactionInterceptor # M05_02- rootMethodProceed 中通过 TransactionManager 执行 commit 操作
先看一下 TransactionManager 的结构 :
C09- TransactionManager
F09_01- ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
M09_01- begin()
M09_02- propagationNewBegin(TransactionContext transactionContext)
M09_03- propagationExistBegin(TransactionContext transactionContext)
M09_04- commit(boolean asyncCommit)
M09_05- rollback(boolean asyncRollback)
M09_06- commitTransaction(Transaction transaction)
M09_07- rollbackTransaction(Transaction transaction)
复制代码
Step 1 : TransactionManager 进行管理
TransactionManager 进行 Transaction 的配合和通过线程发起事务
public void commit(boolean asyncCommit) {
// 从 ThreadLocal 中获取 Transaction
final Transaction transaction = getCurrentTransaction();
// commit 后修改状态
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 更新 Transaction 状态
transactionRepository.update(transaction);
if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();
// 此处主要为 ThreadPoolExecutor , 通过线程池提交 -> M09_06
executorService.submit(new Runnable() {
@Override
public void run() {
commitTransaction(transaction);
}
});
} catch (Throwable commitException) {
throw new ConfirmingException(commitException);
}
} else {
commitTransaction(transaction);
}
}
复制代码
Step 2 :提交事务
// M09_06 此处提交事务
private void commitTransaction(Transaction transaction) {
try {
// 提交多个事务
transaction.commit();
// 删除事务
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
}
// C15- Transaction -> PS:C15_01
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}
public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
try {
Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
Method method = null;
// 代理执行对象的 Method
method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
return method.invoke(target, invocationContext.getArgs());
} catch (Exception e) {
throw new SystemException(e);
}
}
return null;
}
// public final void com.tcc.demo.order.service.PaymentServiceImpl$$EnhancerBySpringCGLIB$$6a17cffa.confirmMakePayment(com.tcc.demo.order.model.Order,java.math.BigDecimal,java.math.BigDecimal)
// TransactionRepository 是用于事务管理的持久化操作
C10- TransactionRepository -> PS:C10_01
// ExecutorService 执行 Service
C12- ExecutorService -> PS:C11_01
复制代码
PS:C10_01 TransactionRepository 家族体系
public interface TransactionRepository {
int create(Transaction transaction);
int update(Transaction transaction);
int delete(Transaction transaction);
Transaction findByXid(TransactionXid xid);
List<Transaction> findAllUnmodifiedSince(Date date);
}
// 这里看一下 Transaction 的数据库结构
CREATE TABLE `TCC_TRANSACTION` (
`TRANSACTION_ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '事务 ID',
`DOMAIN` varchar(100) DEFAULT NULL COMMENT '域名',
`GLOBAL_TX_ID` varbinary(32) NOT NULL COMMENT '全局事务ID',
`BRANCH_QUALIFIER` varbinary(32) NOT NULL,
`CONTENT` varbinary(8000) DEFAULT NULL COMMENT '序列化 Transaction 事务内容',
`STATUS` int(11) DEFAULT NULL COMMENT '事务状态',
`TRANSACTION_TYPE` int(11) DEFAULT NULL COMMENT '事务类型',
`RETRIED_COUNT` int(11) DEFAULT NULL COMMENT '重试次数',
`CREATE_TIME` datetime DEFAULT NULL COMMENT '创建时间',
`LAST_UPDATE_TIME` datetime DEFAULT NULL COMMENT '最后更新时间',
`VERSION` int(11) DEFAULT NULL COMMENT '乐观锁版本',
`IS_DELETE` int(12) DEFAULT NULL COMMENT '是否删除',
PRIMARY KEY (`TRANSACTION_ID`),
UNIQUE KEY `UX_TX_BQ` (`GLOBAL_TX_ID`,`BRANCH_QUALIFIER`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
// 每一个小事务都有个自己的 TCC_TRANSACTION 类 , 类名通常为 TCC_TRANSACTION_xxx
复制代码
PS:C15_01 参数
Pro 1 : Xid 的对象
public interface Xid {
int MAXGTRIDSIZE = 64;
int MAXBQUALSIZE = 64;
int getFormatId();
byte[] getGlobalTransactionId();
byte[] getBranchQualifier();
}
复制代码
3.4.2 事务的通知
事务的还有一个核心逻辑就是通知其他的应用执行相关的逻辑 , 那么事务是怎么相互告知的呢 ? 我们从实际应用出发 :
问题 :
疑点一 : 当 captial try 逻辑完成后 , 实际上已经返回了 , 并不会拿到对应的通知
现象 :
现象一 : 当 try 再次调用时 , 是通过 restAPI 接口进行网络调用 , 所以应该是外部调用实现的
// 找了相关的代码找到了这个类 :
// C15- Transaction -> PS:C15_01
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}
public class TradeOrderServiceProxy {
@Autowired
CapitalTradeOrderService capitalTradeOrderService;
@Autowired
RedPacketTradeOrderService redPacketTradeOrderService;
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}
//.............
}
复制代码
如果我们跟着相关的代码 , 会发现确实在出现rollbakc 时 , 会调用该参与者
所以我们能得出如下的结论 :
- 当出现异常后 , 会通过 Participant 调用相同的接口一次
- 调用对应接口后 , 会因为拦截器的原因 , 通过 Transaction 状态 , 调用对应的所属流程 (例如异常就是 rollback)
3.5 事务的回退
事务的回退时 , 会先调用起本身的 cancel 方法 ,其次会调用依赖微服务的原方法
PS : 确实是原方法 , 但是由于代理 , 会进入 CompensableTransactionAspect 切面
通过判断 TransactionContext 中的 status 决定执行什么方法
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Transaction transaction = null;
try {
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
case CONFIRMING:
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
//
try {
transaction = transactionManager.propagationExistBegin(transactionContext);
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
return ReflectionUtils.getNullValue(method.getReturnType());
}
public void rollback() {
for (Participant participant : participants) {
participant.rollback();
}
}
// 回退的发起
public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}
// 注意 :
1- Participant -> Order Cancel
2- Participant -> Capital Rest API
3- Participant -> Capital Cancel
复制代码
3.6 事务的异步处理
TCC-Transaction 的处理默认是同步的 , 可以通过注解来配置异步处理
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = false, asyncCancel = false)
这里来看一下2者的区别 :
public void rollback(boolean asyncRollback) {
final Transaction transaction = getCurrentTransaction();
transaction.changeStatus(TransactionStatus.CANCELLING);
transactionRepository.update(transaction);
if (asyncRollback) {
try {
// 最大的区别就是此处调用线程池构建了一个新线程
executorService.submit(new Runnable() {
@Override
public void run() {
rollbackTransaction(transaction);
}
});
} catch (Throwable rollbackException) {
throw new CancellingException(rollbackException);
}
} else {
rollbackTransaction(transaction);
}
}
复制代码
3.7 事务的恢复
事务的恢复和事务的通知并不是一个概念 , 当事务的初期执行出现异常后 , 事务在后续会通过定时任务的方式 , 完成事务的继续执行操作
- org.mengyun.tcctransaction.recover.RecoverConfig,事务恢复配置接口
- org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig,默认事务恢复配置实现
- org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事务恢复定时任务,基于 Quartz 实现调度,不断不断不断执行事务恢复
总结
正常处理流程 , 执行 Confirm
A- 事务发起者 (订单服务)
B- 事务消费对象B (账户服务)
C- 事务消费对象C (红包服务)
// Step 1 : 事务发起者处理
A1- 调用发起方法
A2- TCC CompensableTransactionAspect 切面拦截
A3- TCC CompensableTransactionInterceptor # rootMethodProceed 准备事务容器 , 大流程处理
A4- CompensableTransactionInterceptor # rootMethodProceed 开启事务及事务管理 (begin , process , commit)
A5- 进入真正的方法执行业务逻辑
// Step 2 : 账户对象处理
B1- 调用发起方法
B2- TCC CompensableTransactionAspect 切面拦截
B3- TCC CompensableTransactionInterceptor # rootMethodProceed 准备事务容器 , 大流程处理
B4- CompensableTransactionInterceptor # providerMethodProceed 执行消费处理
B5- 进入真正的方法执行业务逻辑
// PS : 主要对比 A4 - B4 的区别
// Step 3 : 账户处理完成后 , 订单服务继续处理
A6- transactionManager.commit(asyncConfirm) : 提交 commit
A7- Transaction.Participant # commit() 提交代理方法
A8- 执行 Order Confirm 方法
// Step 4 : 账户服务Confirm 确定
B6- TransactionAspectSupport # invokeWithinTransaction
B7- 执行 账户 Confirm 方法
复制代码
回退处理逻辑 , 执行 Cancel
// PS : 前几步都是一样的 , 基于代理的方式
// Step 1: A4 rootMethodProceed 处理过程中出现异常 , 由 catch 继续处理
A4- rootMethodProceed : 执行账户主方法
A5- 出现异常 , catch 处理 , 调用 rollback
A6- TransactionManager # rollback 执行 Rollback 主流程
// Step 2 : order 模块调用 cancel 方法
// Step 3 : 账户对象处理回退
B5- 再次原样调用主方法
B6- 事务容器状态为回退 , 执行回退逻辑
复制代码
@Compensable 的使用
// 整个流程中共有以下几个地方需要标注 @Compensable , 我们来单独看看其中的关联
// Step 1 : Order 中 try 方法标注
@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)
// Step 2 : Order 中调用远程接口时标注
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
return capitalTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}
@Compensable(propagation = Propagation.SUPPORTS, confirmMethod = "record", cancelMethod = "record", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(TransactionContext transactionContext, RedPacketTradeOrderDto tradeOrderDto) {
return redPacketTradeOrderService.record(new TransactionEntity<>(transactionContext, tradeOrderDto));
}
// Step 3 : capital 中远程接口的 try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
// Step 4 : red 接口中 try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
// PS : 官方原文中 , 此处是没有添加对应方法的 , 不清楚是否是因为 Dubbo 的 Feign 的机制问题
复制代码
近期评论