数据库连接池Demo(2)多线程初步

这是我参与11月更文挑战的第7天,活动详情查看:2021最后一次更文挑战

简介

本文在之前文章Druid使用流程文章基础下,来实现在多线程请求下的数据库连接池初步Demo,实现一个多线程下能跑的最基础版本

探索Alibaba Druid的物理连接生成

参考Alibaba Druid数据库的实现:Alibaba Druid 源码阅读(六)数据库连接使用流程初探

我们主要的实现思路如下:

  • 1.初始化配置的初始物理连接数
  • 2.获取连接时,从空闲线程池中阻塞获取
  • 3.获取连接时,发送生成物理连接指令去生成新的物理连接,但物理连接数不得大于配置的最大连接数
  • 4.连接关闭时,归还空闲线程池

在Alibaba Druid中是使用等待-通知机制来阻塞获取的,我们简单点,就用一个阻塞队列实现这个

1.初始化生成配置初始化数据的物理连接

先简单直接的在构造函数中生成,并将其放入空闲池中:

public class SelfDataSource implements DataSource {

    /**
     * 放置空闲可用的连接
     */
    private final LinkedBlockingDeque<SelfPoolConnection> idle = new LinkedBlockingDeque<>();
    /**
     * 放置正在使用的连接
     */
    private final LinkedBlockingDeque<SelfPoolConnection> active = new LinkedBlockingDeque<>();
    private final AtomicInteger connectCount = new AtomicInteger(0);

    private final String url;
    private final String username;
    private final String password;
    private int maxActive;
    private int timeout = 100;

    public SelfDataSource(final Properties properties) {
        this.url = properties.getProperty("url");
        this.username = properties.getProperty("username");
        this.password = properties.getProperty("password");
        this.maxActive = Integer.parseInt(properties.getProperty("maxActive"));
        this.timeout = Integer.parseInt(properties.getProperty("timeout", "100"));

        final int initCount = Integer.parseInt(properties.getProperty("initCount", "0"));
        if (initCount > maxActive) {
            throw new RuntimeException("initCount gt maxActive");
        }

        IntStream.range(0, initCount).forEach(i -> createPhysicsConnect());
    }

    private void createPhysicsConnect() {
        try {
            idle.put(new SelfPoolConnection(this, url, username, password));
            connectCount.addAndGet(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

2.从池中获取物理连接

获取连接时,从空闲池中进行获取,返回活跃使用池中:

public class SelfDataSource implements DataSource {

    /**
     * 放置空闲可用的连接
     */
    private final LinkedBlockingDeque<SelfPoolConnection> idle = new LinkedBlockingDeque<>();
    /**
     * 放置正在使用的连接
     */
    private final LinkedBlockingDeque<SelfPoolConnection> active = new LinkedBlockingDeque<>();

    /**
     * 自定义的获取数据库物理连接
     * 1.无空闲连接则生成新的物理连接,并且放入正在运行连接池中
     * 2.如果有空闲连接,则获取,并放入正在运行连接池中
     * @return 自定义的数据库物理连接(自定义以能够自定义Close方法)
     * @throws SQLException
     */
    @Override
    public Connection getConnection() throws SQLException {
        try {
            final SelfPoolConnection connection = idle.take();
            System.out.println("获取连接,结束\n");
            active.put(connection);
            return connection;
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}
复制代码

3.回收连接

将连接从使用活跃池中移除,放入空闲池中:

public class SelfDataSource implements DataSource {

    /**
     * 放置空闲可用的连接
     */
    private final LinkedBlockingDeque<SelfPoolConnection> idle = new LinkedBlockingDeque<>();
    /**
     * 放置正在使用的连接
     */
    private final LinkedBlockingDeque<SelfPoolConnection> active = new LinkedBlockingDeque<>();

    /**
     * 初步将Connection从运行池中异常,放入空闲池
     * 从正在使用连接池中移除,放入空闲连接池中
     * @param selfPoolConnection 自定义Connection
     */
    public void recycle(final SelfPoolConnection selfPoolConnection) {
        try {
            while (!active.remove(selfPoolConnection)) {}
            System.out.println("回收连接,结束\n");
            idle.put(selfPoolConnection);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码

4.测试运行

测试的代码如下:

public class MultiThreadSelfTest {

    public static final String DB_URL = "jdbc:h2:file:./demo-db";
    public static final String USER = "sa";
    public static final String PASS = "sa";
    public static final String QUERY = "SELECT id, name FROM user_example";


    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 第一次运行可以初始化数据库数据,后面可以取消
//        initData();

        final Properties properties = new Properties();
        properties.setProperty("url", "jdbc:h2:file:./demo-db");
        properties.setProperty("username", "sa");
        properties.setProperty("password", "sa");
        properties.setProperty("maxActive", "5");
        properties.setProperty("initCount", "5");

        SelfDataSource dataSource = new SelfDataSource(properties);

        FutureTask[] fs = new FutureTask[10];
        for (int i=0; i<10; i++) {
            fs[i] = new FutureTask(() -> druidQuery(dataSource));
            new Thread(fs[i]).start();
        }

        while (true) {
            for (int i = 0; i < 10; i++) {
                if (!fs[i].isDone()) {
                    continue;
                }
            }
            break;
        }
        
        Thread.sleep(3000);
        System.out.printf("当前数据库连接数:%d\n", dataSource.getConnectionCount());
    }

    /**
     * 生成数据
     */
    public static void initData() {
        final String drop = "drop table `user_example` if exists;";
        final String createTable = "CREATE TABLE IF NOT EXISTS `user_example` (" +
                "`id` bigint NOT NULL AUTO_INCREMENT, " +
                "`name` varchar(100) NOT NULL" +
                ");";
        final String addUser = "insert into user_example (name) values(%s)";
        try(Connection conn = DriverManager.getConnection(DB_URL, USER, PASS);
            Statement stmt = conn.createStatement()) {
            stmt.execute(drop);
            stmt.execute(createTable);
            for (int i=0; i<10; i++) {
                stmt.execute(String.format(addUser, i));
            }
            conn.commit();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    private static long druidQuery(SelfDataSource dataSource) {
        System.out.println("开始执行查询");
        final long cur = System.currentTimeMillis();
        try(Connection conn = dataSource.getConnection();
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(QUERY)) {
            while (rs.next()) {
            }
            Thread.sleep(1000);
        } catch (SQLException | InterruptedException e) {
            e.printStackTrace();
        }
        final long cost = System.currentTimeMillis() - cur;
        System.out.println("查询结束,耗时:" + cost);
        return cost;
    }
}
复制代码

结果及分析如下:

// 生成最初的五个物理连接
初始化物理连接
初始化物理连接
初始化物理连接
初始化物理连接
初始化物理连接

开始执行查询
开始执行查询
获取连接,结束 // 使用数 1 空闲数 4
开始执行查询
获取连接,结束 // 使用数 2 空闲数 3
开始执行查询
获取连接,结束 // 使用数 3 空闲数 2
开始执行查询
开始执行查询
获取连接,结束 // 使用数 4 空闲数 1
开始执行查询
获取连接,结束 // 使用数 5 空闲数 0
开始执行查询
开始执行查询
开始执行查询
回收连接,结束 // 使用数 4 空闲数 1
回收连接,结束 // 使用数 3 空闲数 2
回收连接,结束 // 使用数 2 空闲数 3

查询结束,耗时:1034
查询结束,耗时:1034
查询结束,耗时:1034

获取连接,结束 // 使用数 3 空闲数 2
回收连接,结束 // 使用数 2 空闲数 3
获取连接,结束 // 使用数 3 空闲数 2
回收连接,结束 // 使用数 2 空闲数 3

查询结束,耗时:1034

获取连接,结束 // 使用数 3 空闲数 2
获取连接,结束 // 使用数 4 空闲数 1
获取连接,结束 // 使用数 5 空闲数 0

查询结束,耗时:1035

回收连接,结束
回收连接,结束

查询结束,耗时:2041

回收连接,结束
回收连接,结束
回收连接,结束

查询结束,耗时:2042
查询结束,耗时:2041
查询结束,耗时:2041
查询结束,耗时:2042

当前数据库连接数:5

复制代码

总结

运行结果看起来大致符合我们的预期,但总感觉还有有一些不对劲的地方,现在暂时还看不出来

下面还有一些未完成的:

  • 最小空闲线程数的实现
  • 动态线程的生成

后面继续完善吧,还有就是和Alibaba Druid的运行对比,这些后面继续写和研究

代码参考地址,可查看Tag V0.0.1:github.com/lw124392545…

参考链接