参考二阶段事务(2PC)思想实现的多线程事务参考

参考 二阶段事务 (2PC)思想 实现的 多线程事务

前言

先提出需求,不贴合业务的技术实现都是耍流氓。
我这里有1个50w~100w 的大数据量 excel导入 。 当月导入上个月的数据。实时性要求不高。导入的是子单明细,业务要全导入或者出错全部不导入。
旧代码实现是逐行读取,然后达到执行行数,for 批量插入数据库,执行时间在10分钟左右。优化后可达1分钟。
我提议空间换时间,做一次优化,因为这个功能只有几个业务人员在用。一般都是先导入数据后再使用,所以放心大胆的导入就可以。
 

思路分析

贴出来 简单的思路代码,有了思路其他就是照着葫芦画瓢了。

public class TestDemo {
    // 保证变量的弱同步 标志最后线程是否提交事务
    public volatile static boolean flag = true;

    public static void main(String[] args) throws InterruptedException {


        //  开启线程数
        int threadNum = 3;
        CountDownLatch main = new CountDownLatch(1);
        CountDownLatch child = new CountDownLatch(threadNum);
        ExecutorService executorPool = Executors.newFixedThreadPool(threadNum);

        for (int i = 1; i <= threadNum; i++) {
            final int n = i;
            executorPool.execute(() -> {
                try {

                    // 开启事务
                    System.out.println(Thread.currentThread().getName() + "执行" + "开启事务");
                    // 模拟数据库操作 插入数据
                    HashMap hashMap = new HashMap<String, String>();
                    hashMap.put("a3", "aa");

                    /*// 随机制造异常
                    if (n == 2) {
                        int i1 = 1 / 0;
                    }
*/
                    System.out.println(Thread.currentThread().getName() + "执行" + "等待提交事务");
                    // 逻辑执行完毕,等待主线程响应
                    child.countDown();
                    // 等待主线程判断完毕
                    main.await();
                    //回滚操作
                    if (!flag) {
                        System.out.println(Thread.currentThread().getName() + "执行" + "dao回滚");
                    } else {
                        //操作完提交事务
                        System.out.println(Thread.currentThread().getName() + "执行" + "dao提交事务");
                    }
                } catch (Exception e) {
                    //回滚操作
                    System.out.println(Thread.currentThread().getName() + "发现异常开始执行" + "dao回滚");
                    flag = false;
                    child.countDown();
                }
            });
        }


        child.await();
        if (flag) {
            System.out.println("逻辑完成");
        } else {
            System.out.println("逻辑失败");
        }
        main.countDown();
        executorPool.shutdown();
        System.out.println("ok");
    }
}
复制代码

 新建 固定 3个线程,来分批执行我们的导入逻辑,每个线程分个10w+,然后使用编程式事务 管理 事务行为,在操作完之后等待主线程响应,没问题之后按照 flag 标志变量进行 事务的提交和 回滚。
主要 使用了juc 的工具类 CountDownLatch来实现。

模拟一个异常执行

我们执行来看看结果 。3个线程都按照逻辑正常回滚了。

 

模拟正常执行的结果

去掉异常代码。事务都正常提交。

 

缺点

上述实现如果在执行中,有2个线程提交事务,突然项目挂掉了,就会丢失部分数据。虽然性能和灵活性大大提升,但是相比单线程的安全性降低了。另外创建的线程数量不宜太大,会占用资源。
 

总结

此例子也可以帮助理解 分布式事务的思想,分布式事务是 进程之间,本例子是线程之间。

参考:juejin.cn/post/689298…