使用atomikos如何实现JTA/XA全局事务写在前

写在前面的话

最近终于忙完了自考的事情,可以有更多的时间总结了。

久违的国家考试,听着广播里播放着考试相关的信息,以及感受着考场上紧张的氛围,突然还很怀念。

这样的机会真是越来越少呢。

什么是JTA

在介绍atomikos先介绍下JTA。

首先JTA是一个规范。

Java事务API(JTA:Java Transaction API)和它的同胞Java事务服务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)的能力。

某种程度上,可以认为JTA规范是XA规范的Java版,其把XA规范中规定的DTP模型交互接口抽象成Java接口中的方法,并规定每个方法要实现什么样的功能。

这就把XA和JTA联系起来了。

JTA接口规范

JTA事务模型规定了一个分布式事务中有哪些组件,而JTA接口规范则规定这些组件之间如何交互。

需要注意的是JTA规范定义的这些接口,并不需要应用程序的开发人员去实现,而是由各个厂商去实现,根据在DTP模型中扮演的不同角色,需要实现不同的接口。

作为开发人员的我们只需要学会如何使用即可。

JTA规范是Java扩展包,在应用中需要额外引入相应的jar包依赖

<dependency>  

<groupId>javax.transaction</groupId>  

<artifactId>jta</artifactId>  

<version>1.1</version>

</dependency>
复制代码

JTA规范1.1中的源码非常少,如下所示:

可以看到,这里除了红色框中包含的接口定义之外,其他全部是异常(XxxException),这里我们仅讨论JTA规范中定义的接口作用:

javax.transaction.Status:事务状态,这个接口主要是定义一些表示事务状态的常量,此接口无需实现

javax.transaction.Synchronization:同步

javax.transaction.Transaction:事务

javax.transaction.TransactionManager:事务管理器

javax.transaction.UserTransaction:用于声明一个分布式事务

javax.transaction.TransactionSynchronizationRegistry:事务同步注册

javax.transaction.xa.XAResource:定义RM提供给TM操作的接口

javax.transaction.xa.Xid:事务id

TM供应商

实现UserTransaction、TransactionManager、Transaction、TransactionSynchronizationRegistry、Synchronization、Xid接口,通过与XAResource接口交互来实现分布式事务。

此外,TM厂商如果要支持跨应用的分布式事务,那么还要实现JTS规范定义的接口。

常见的TM提供者包括我们前面提到的application server,包括:jboss、ejb server、weblogic等,以及一些以第三方类库形式提供事务管理器功能的jotm、Atomikos。

RM供应商

XAResource接口需要由资源管理器者来实现,XAResource接口中定义了一些方法,这些方法将会被TM进行调用,如:

start方法:开启事务分支

end方法:结束事务分支

prepare方法:准备提交

commit方法:提交

rollback方法:回滚

recover方法:列出所有处于PREPARED状态的事务分支

一些RM提供者,可能也会提供自己的Xid接口的实现。

特定接口

此外,不同的资源管理器有一些各自的特定接口要实现:

如JDBC2.0规范定义支持分布式事务的jdbc driver需要实现:javax.sql.XAConnection、javax.sql.XADataSource接口

JMS1.0规范规定支持分布式事务的JMS厂商,需要实现javax.jms.XAConnection、javax.jms.XASession接口

作为DTP模型中Application开发者的我们,并不需要去实现任何JTA规范中定义的接口,只需要使用TM提供的UserTransaction实现,来声明、提交、回滚一个分布式事务即可。

以下案例演示了UserTransaction接口的基本使用:构建一个分布式事务,来操作位于2个不同的数据库的数据,假设这两个库中都有一个user表。

UserTransaction userTransaction= new UserTransaction() {
        @Override
        public void begin() throws NotSupportedException, SystemException {
            
        }

        @Override
        public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException {

        }

        @Override
        public void rollback() throws IllegalStateException, SecurityException, SystemException {

        }

        @Override
        public void setRollbackOnly() throws IllegalStateException, SystemException {

        }

        @Override
        public int getStatus() throws SystemException {
            return 0;
        }

        @Override
        public void setTransactionTimeout(int i) throws SystemException {

        }
    };
            try{     
                //开启分布式事务            
        try {
            userTransaction.begin();
        } catch (NotSupportedException e1) {
            e1.printStackTrace();
        } catch (SystemException e1) {
            e1.printStackTrace();
        }
        //  执行事务分支1            
                 conn1 = db1.getConnection();         
                   ps1= conn1.prepareStatement("INSERT into user(name,age) VALUES ('tianshouzhi',23)");        
                    ps1.executeUpdate();           
             //    执行事务分支2           
                 conn2 = db2.getConnection();      
        
                 ps2 = conn2.prepareStatement("INSERT into user(name,age) VALUES ('tianshouzhi',23)");      
                      ps2.executeUpdate();            
               //  提交,两阶段提交发生在这个方法内部       
                     userTransaction.commit();    
            }catch (Exception e){        
                try {             
                    userTransaction.rollback();//回滚     
                } catch (SystemException ignore) {  
                    
                } 
         }
复制代码

需要注意的是,在分布式事务中,当我们需要提交或者回滚一个事务时,不应该再使用Connection接口提供的commit和rollback方法。

而是应该使用UserTransaction接口的commit接口和rollback接口替代。

另外,这个案例只是用于说明如何使用UserTransaction类,事实上,在实际开发中,并没有这么复杂。

一些开源的分布式事务解决方案,可以与spring声明式事务管理功能,因此我们可以通过一个简单@Transactional注解,即可实现分布式事务的功能。

例如,下面我们将要提到Atomikos,就支持与spring事务整合。

Atomikos介绍

Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。

Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器,以下是包括在这个开源版本中的一些功能:
1.全面崩溃 / 重启恢复

2.兼容标准的SUN公司JTA API

3.嵌套事务

4.为XA和非XA提供内置的JDBC适配器

Atomikos公司

Atomikos公司官方网址为 www.atomikos.com/。其旗下最著名的产品就…

TransactionEssentials:开源的免费产品

ExtremeTransactions:上商业版,需要收费。

开源版Atomikos-TransactionEssentials

TransactionEssentials:

1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:

UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.atomikos.icatch.jta;

import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.util.SerializableObjectFactory;
import java.io.Serializable;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

public class UserTransactionImp implements UserTransaction, Serializable, Referenceable {
    private static final long serialVersionUID = -865418426269785202L;
    private transient TransactionManager txmgr_;

    public UserTransactionImp() {
    }

    private void checkSetup() {
        Class var1 = TransactionManagerImp.class;
        synchronized(TransactionManagerImp.class) {
            this.txmgr_ = TransactionManagerImp.getTransactionManager();
            if (this.txmgr_ == null) {
                UserTransactionService uts = new UserTransactionServiceImp();
                uts.init();
                this.txmgr_ = TransactionManagerImp.getTransactionManager();
            }

        }
    }

    public void begin() throws NotSupportedException, SystemException {
        this.checkSetup();
        this.txmgr_.begin();
    }

    public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SystemException, IllegalStateException, SecurityException {
        this.checkSetup();
        this.txmgr_.commit();
    }

    public void rollback() throws IllegalStateException, SystemException, SecurityException {
        this.checkSetup();
        this.txmgr_.rollback();
    }

    public void setRollbackOnly() throws IllegalStateException, SystemException {
        this.checkSetup();
        this.txmgr_.setRollbackOnly();
    }

    public int getStatus() throws SystemException {
        this.checkSetup();
        return this.txmgr_.getStatus();
    }

    public void setTransactionTimeout(int seconds) throws SystemException {
        this.checkSetup();
        this.txmgr_.setTransactionTimeout(seconds);
    }

    public Reference getReference() throws NamingException {
        return SerializableObjectFactory.createReference(this);
    }
}

复制代码

TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.atomikos.icatch.jta;

import com.atomikos.icatch.config.Configuration;
import com.atomikos.util.SerializableObjectFactory;
import java.io.Serializable;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

public class UserTransactionManager implements TransactionManager, Serializable, Referenceable, UserTransaction {
    private static final long serialVersionUID = -655789038710288096L;
    private transient TransactionManagerImp tm;
    private boolean forceShutdown;
    private boolean startupTransactionService = true;
    private boolean closed = false;
    private boolean coreStartedHere;

    private void checkSetup() throws SystemException {
        if (!this.closed) {
            this.initializeTransactionManagerSingleton();
        }

    }

    private void initializeTransactionManagerSingleton() throws SystemException {
        this.tm = (TransactionManagerImp)TransactionManagerImp.getTransactionManager();
        if (this.tm == null) {
            if (!this.getStartupTransactionService()) {
                throw new SystemException("Transaction service not running");
            }

            this.startupTransactionService();
            this.tm = (TransactionManagerImp)TransactionManagerImp.getTransactionManager();
        }

    }

    private void startupTransactionService() {
        this.coreStartedHere = Configuration.init();
    }

    private void shutdownTransactionService() {
        if (this.coreStartedHere) {
            Configuration.shutdown(this.forceShutdown);
            this.coreStartedHere = false;
        }

    }

    public UserTransactionManager() {
    }

    public void setStartupTransactionService(boolean startup) {
        this.startupTransactionService = startup;
    }

    public boolean getStartupTransactionService() {
        return this.startupTransactionService;
    }

    public void init() throws SystemException {
        this.closed = false;
        this.checkSetup();
    }

    public void begin() throws NotSupportedException, SystemException {
        if (this.closed) {
            throw new SystemException("This UserTransactionManager instance was closed already. Call init() to reuse if desired.");
        } else {
            this.checkSetup();
            this.tm.begin();
        }
    }

    public boolean getForceShutdown() {
        return this.forceShutdown;
    }

    public void setForceShutdown(boolean value) {
        this.forceShutdown = value;
    }

    public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException {
        if (this.closed) {
            throw new SystemException("This UserTransactionManager instance was closed already - commit no longer allowed or possible.");
        } else {
            this.checkSetup();
            this.tm.commit();
        }
    }

    public int getStatus() throws SystemException {
        this.checkSetup();
        return this.tm.getStatus();
    }

    public Transaction getTransaction() throws SystemException {
        this.checkSetup();
        return this.tm.getTransaction();
    }

    public void resume(Transaction tx) throws InvalidTransactionException, IllegalStateException, SystemException {
        this.checkSetup();
        this.tm.resume(tx);
    }

    public void rollback() throws IllegalStateException, SecurityException, SystemException {
        this.tm.rollback();
    }

    public void setRollbackOnly() throws IllegalStateException, SystemException {
        this.tm.setRollbackOnly();
    }

    public void setTransactionTimeout(int secs) throws SystemException {
        this.checkSetup();
        this.tm.setTransactionTimeout(secs);
    }

    public Transaction suspend() throws SystemException {
        this.checkSetup();
        return this.tm.suspend();
    }

    public Reference getReference() throws NamingException {
        return SerializableObjectFactory.createReference(this);
    }

    public void close() {
        this.shutdownTransactionService();
        this.closed = true;
    }
}

复制代码

Transaction实现是com.atomikos.icatch.jta.TransactionImpl

2、针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。

在JTA规范时,XADataSource、XAConnection等接口应该由资源管理器RM来实现,

而Atomikos的作用是一个事务管理器(TM),并不需要提供对应的实现。

Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。

封装XADataSource的实现类为AtomikosDataSourceBean。典型的XADataSource实现包括:

1、mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

2、阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource

3、tomcat-jdbc连接池提供的org.apache.tomcat.jdbc.pool.XADataSource

 而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常: 
复制代码
com.atomikos.jdbc.AtomikosSQLException: The class 'com.mchange.v2.c3p0.ComboPooledDataSource'
 specified by property 'xaDataSourceClassName' does not implement the required interface  
 javax.jdbc.XADataSource.  
 Please make sure the spelling is correct, and check your JDBC driver vendor's documentation.
复制代码

ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能:

支持TCC:这是一种柔性事务

支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。

本文主要针对Atomikos开源版本的事务管理器实现TransactionEssentials进行讲解,包括:

1、直接使用TransactionEssentials的API

2、TransactionEssentials与spring、mybatis整合

3、Atomikos配置详解

直接使用TransactionEssentials的API

在maven项目的pom文件中引入以下依赖:

<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>4.0.6</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
复制代码

新建mysql数据库表

需要注意的是,在mysql中,innodb引擎才支持XA事务以这里显式的指定了数据库引擎为innodb。

-- 新建数据库db_user;
create database db_user;
-- 在db_user库中新建user表
create table db_user.user(id int AUTO_INCREMENT PRIMARY KEY,name varchar(50)) engine=innodb;
-- 新建数据库db_account;
create database db_account;
-- 在db_account库中新建account表
create table db_account.account(user_id int,money double) engine=innodb;
另外,在本案例中,db_user库和db_account库是位于同一个mysql实例中的。 
复制代码

1.案例代码

在使用了事务管理器之后,我们通过atomikos提供的UserTransaction接口的实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交和回滚事务。

而不再是使用java.sql.Connection中的setAutoCommit(false)的方式来开启事务。其他JTA规范中定义的接口,开发人员并不需要直接使用。

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;
 
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
 
public class AtomikosExample {
 
   private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
      // 连接池基本属性
      Properties p = new Properties();
      p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
      p.setProperty("user", "root");
      p.setProperty("password", "your password");
 
      // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
      AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
      //atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同
      ds.setUniqueResourceName(dbName);
      ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
      ds.setXaProperties(p);
      return ds;
   }
 
   public static void main(String[] args) {
 
      AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
      AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
 
      Connection conn1 = null;
      Connection conn2 = null;
      PreparedStatement ps1 = null;
      PreparedStatement ps2 = null;
 
      UserTransaction userTransaction = new UserTransactionImp();
      try {
         // 开启事务
         userTransaction.begin();
 
         // 执行db1上的sql
         conn1 = ds1.getConnection();
         ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
         ps1.setString(1, "tianshouzhi");
         ps1.executeUpdate();
         ResultSet generatedKeys = ps1.getGeneratedKeys();
         int userId = -1;
         while (generatedKeys.next()) {
            userId = generatedKeys.getInt(1);// 获得自动生成的userId
         }
 
         // 模拟异常 ,直接进入catch代码块,2个都不会提交
//        int i=1/0;
 
         // 执行db2上的sql
         conn2 = ds2.getConnection();
         ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
         ps2.setInt(1, userId);
         ps2.setDouble(2, 10000000);
         ps2.executeUpdate();
 
         // 两阶段提交
         userTransaction.commit();
      } catch (Exception e) {
         try {
            e.printStackTrace();
            userTransaction.rollback();
         } catch (SystemException e1) {
            e1.printStackTrace();
         }
      } finally {
         try {
            ps1.close();
            ps2.close();
            conn1.close();
            conn2.close();
            ds1.close();
            ds2.close();
         } catch (Exception ignore) {
         }
      }
   }
}
复制代码

2.TransactionEssentials与spring、mybatis整合

1.pom中添加以下依赖

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>4.3.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>3.4.1</version>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis-spring</artifactId>
    <version>1.3.1</version>
</dependency>
复制代码

2.新建User实体

package com.tianshouzhi.atomikos;
public class User {
   private int id;
   private String name;
   // setters and getters
}
复制代码

3.新建Account实例

package com.tianshouzhi.atomikos;
public class Account {
   private int userId;
   private double money;
   // setters and getters
}
复制代码

4.新建UserMapper接口

为了方便,这里使用了mybatis 注解方式,没有编写映射文件,作用是一样的

package com.tianshouzhi.atomikos.mappers.db_user;
import org.apache.ibatis.annotations.Insert;
import com.tianshouzhi.atomikos.User;
import org.apache.ibatis.annotations.Options;
public interface UserMapper {
   @Insert("INSERT INTO user(id,name) VALUES(#{id},#{name})")
   @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")
   public void insert(User user);
}
复制代码

5.新建AccountMapper接口

package com.tianshouzhi.atomikos.mappers.ds_account;
import com.tianshouzhi.atomikos.Account;
import org.apache.ibatis.annotations.Insert;
public interface AccountMapper {
    @Insert("INSERT INTO account(user_id,money) VALUES(#{userId},#{money})")
    public void insert(Account account);
}
复制代码

6.新建使用JTA事务的bean

注意在使用jta事务的时候,依然可以使用spring的声明式事务管理

package com.tianshouzhi.atomikos;
import com.tianshouzhi.atomikos.mappers.db_user.UserMapper;
import com.tianshouzhi.atomikos.mappers.ds_account.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
public class JTAService {
   @Autowired
   private UserMapper userMapper;//操作db_user库
   @Autowired
   private AccountMapper accountMapper;//操作db_account库
   @Transactional
   public void insert() {
      User user = new User();
      user.setName("wangxiaoxiao");
      userMapper.insert(user);
       
      //    int i = 1 / 0;//模拟异常,spring回滚后,db_user库中user表中也不会插入记录
      Account account = new Account();
      account.setUserId(user.getId());
      account.setMoney(123456789);
      accountMapper.insert(account);
   }
}
复制代码

7.编写配置文件spring-atomikos.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
 
 
    <!--==========针对两个库,各配置一个AtomikosDataSourceBean,底层都使用MysqlXADataSource=====================-->
    <!--配置数据源db_user-->
    <bean id="db_user" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds1" />
        <property name="xaDataSourceClassName"
                  value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="url">jdbc:mysql://localhost:3306/db_user</prop>
                <prop key="user">root</prop>
                <prop key="password">shxx12151022</prop>
            </props>
        </property>
    </bean>
 
    <!--配置数据源db_account-->
    <bean id="db_account" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds2" />
        <property name="xaDataSourceClassName"
                  value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="url">jdbc:mysql://localhost:3306/db_account</prop>
                <prop key="user">root</prop>
                <prop key="password">shxx12151022</prop>
            </props>
        </property>
    </bean>
 
    <!--=============针对两个数据源,各配置一个SqlSessionFactoryBean============ -->
    <bean id="ssf_user" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="db_user" />
    </bean>
 
    <bean id="ssf_account" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="db_account" />
    </bean>
 
    <!--=============针对两个SqlSessionFactoryBean,各配置一个MapperScannerConfigurer============ -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="sqlSessionFactoryBeanName" value="ssf_user"/>
        <!--指定com.tianshouzhi.atomikos.mappers.db_user包下的UserMapper接口使用ssf_user获取底层数据库连接-->
        <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.db_user"/>
    </bean>
 
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="sqlSessionFactoryBeanName" value="ssf_account"/>
        <!--指定com.tianshouzhi.atomikos.mappers.ds_account包下的AccountMapper接口使用ssf_account获取底层数据库连接-->
        <property name="basePackage" value="com.tianshouzhi.atomikos.mappers.ds_account"/>
    </bean>
 
    <!--================配置atomikos事务管理器========================-->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
          destroy-method="close">
        <property name="forceShutdown" value="false"/>
    </bean>
 
    <!--============配置spring的JtaTransactionManager,底层委派给atomikos进行处理===============-->
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
    </bean>
 
    <!--配置spring声明式事务管理器-->
    <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
 
    <bean id="jtaService" class="com.tianshouzhi.atomikos.JTAService"/>
</beans>
复制代码

8.测试代码

package com.tianshouzhi.atomikos;
 
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class AtomikosSpringMybatisExample {
   public static void main(String[] args) {
      ApplicationContext context = new ClassPathXmlApplicationContext("spring-atomikos.xml");
      JTAService jtaService = context.getBean("jtaService", JTAService.class);
      jtaService.insert();
   }
}
复制代码

建议读者先直接按照上述代码运行,以确定代码执行后,db_user库的user表和db_account的account表中的确各插入了一条记录,以证明我们的代码的确是操作了2个库,属于分布式事务。

然后将JTAService中的异常模拟的注释打开,会发现出现异常后,两个库中都没有新插入的数据库,说明我们使用的JTA事务管理器的确保证数据的一致性了。

3.Atomikos配置

在掌握了Atomikos基本使用之后,我们对Atomikos的配置进行一下简单的介绍。

Atomikos在启动后,默认会从以下几个位置读取配置文件,这里直接贴出atomikos源码进行说明:

com.atomikos.icatch.provider.imp.AssemblerImp#initializeProperties方法中定义了配置加载顺序逻辑:

@Override
    public ConfigProperties initializeProperties() {
        //读取classpath下的默认配置transactions-defaults.properties
        Properties defaults = new Properties();
        loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);
        //读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值
        Properties transactionsProperties = new Properties(defaults);
        loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);
        //读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值
        Properties jtaProperties = new Properties(transactionsProperties);
        loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);
        
        //读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置
        Properties customProperties = new Properties(jtaProperties);
        loadPropertiesFromCustomFilePath(customProperties);
        //最终构造一个ConfigProperties对象,来表示实际要使用的配置
        Properties finalProperties = new Properties(customProperties);
        return new ConfigProperties(finalProperties);
    }
复制代码

配置文件优先级:transactions-defaults.properties<transactions.properties<jta.properties<自定义配置文件路径,后面的配置会覆盖之前同名key的配置。

其中transactions-defaults.properties是atomikos自带的默认配置,位于transactions-xxx.jar中.

注意不同版本的默认配置可能不同。特别是3.x版本和4.x版本的差异比较明显。

以下是4.0.6中 transactions-default.properties中配置内容,这些配置归类如下:

===============================================================
============          事务管理器(TM)配置参数       ==============
===============================================================
#指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
com.atomikos.icatch.enable_logging=true
#JTA/XA资源是否应该自动注册
com.atomikos.icatch.automatic_resource_registration=true
#JTA事务的默认超时时间,默认为10000ms
com.atomikos.icatch.default_jta_timeout=10000
#事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
com.atomikos.icatch.max_timeout=300000
#指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
com.atomikos.icatch.threaded_2pc=false
#指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制
com.atomikos.icatch.max_actives=50
#是否支持subtransaction,默认为true
com.atomikos.icatch.allow_subtransactions=true
#指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
com.atomikos.icatch.serial_jta_transactions=true
#指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
com.atomikos.icatch.force_shutdown_on_vm_exit=false
#在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807
 
===============================================================
=========        事务日志(Transaction logs)记录配置       =======
===============================================================
#事务日志目录,默认为./。
com.atomikos.icatch.log_base_dir=./
#事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
com.atomikos.icatch.log_base_name=tmlog
#指定两次checkpoint的时间间隔,默认为500
com.atomikos.icatch.checkpoint_interval=500
 
===============================================================
=========          事务日志恢复(Recovery)配置       =============
===============================================================
#指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
#指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同
com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
#提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
com.atomikos.icatch.oltp_max_retries=5
#提交失败时,每次重试的时间间隔,默认10000 s
com.atomikos.icatch.oltp_retry_interval=10000
 
===============================================================
=========          其他       =============================== ==
===============================================================
java.naming.factory.initial=com.sun.jndi.rmi.registry.RegistryContextFactory
com.atomikos.icatch.client_demarcation=false
java.naming.provider.url=rmi://localhost:1099
com.atomikos.icatch.rmi_export_class=none
com.atomikos.icatch.trust_client_tm=false
复制代码

当我们想对默认的配置进行修改时,可以在classpath下新建一个jta.properties,覆盖同名的配置项即可。

关于不同版本配置的差异,请参考官方文档:www.atomikos.com/Documentati…

4.打印日志

4.x版本之后,优先尝试使用slf4j,如果没有则尝试使用log4j,如果二者都没有,则使用JUL。

参见:www.atomikos.com/Documentati…

注意这里是说的是如何配置打印工作日志(work log),而前面说的是事务日志(transactions log),二者不是不同的。

写在后面的话

山河远阔,人间烟火。

无一是你,无一不是你。

参考

田守枝JAVA技术博客