分布式锁:Redisson源码解析-Semaphore、Co

一、Semaphore

1. 文档及实现

Github

  • 与Java里面的Semaphore是类似的
  • 可以在使用前初始化,通过 trySetPermits(permits) 方法获得可用的许可数量。

官方文档中没有说的详细,但是可以理解的是核心的功能也就是和Java中的Semaphore是类似的

代码实现

// 获取到一个semaphore锁
RSemaphore semaphore = redissonClient.getSemaphore("anyLock");
// 允许同时三个客户端获取到semaphore锁
semaphore.trySetPermits(3);

for (int i = 0; i < 10; i++) {
    new Thread(() -> {
        try {
            System.out.println(new Date() + ":" + Thread.currentThread().getName() + " start");
            // 通过acquire来获取
            semaphore.acquire();
            System.out.println(new Date() + ":" + Thread.currentThread().getName() + " success");
            Thread.sleep(3000);
        } catch (InterruptedException e) {

        } finally {
            System.out.println(new Date() + ":" + Thread.currentThread().getName() + " release");
            // 通过release释放
            semaphore.release();
        }
    }).start();
}
复制代码

2. 源码剖析

初始化

RSemaphore semaphore = redissonClient.getSemaphore("anyLock");

这里的初始化,是获取了一个什么呢

  • RedissonSemaphore实际上也没啥核心就初始化了一个commandExecutor

设置信号量

semaphore.trySetPermits(3);

  • org.redisson.RedissonSemaphore#trySetPermitsAsync
// KEY[1] lockName
// KEY[2] redisson_sc:{lockName}
// ARGV[1] permits

// 从redis中获取这个key
local value = redis.call('get', KEYS[1]);
// key不存在
if (value == false or value == 0) then 
  // 设置key的值为permits
  redis.call('set', KEYS[1], ARGV[1]);
  redis.call('publish', KEYS[2], ARGV[1]);
  // 响应1
  return 1;
end;
return 0;
复制代码

所以,首先会在redis中设置一个key为lockName,值为信号量值的字符串

请求acquire

  • 默认如果不给参数的话,那么这个acquire会请求值为1的信号量
// KEY[1] lockName
// ARVG[1] permits

// 获取redis key
local value = redis.call('get', KEYS[1]);
// 存在key且值大于等于permits
if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then
  // 递减
  local val = redis.call('decrby', KEYS[1], ARGV[1]);
  return 1;
end; 
return 0;
复制代码

实际获取信号量,就是通过将lockName的值减小,如果value的值无法承担permits的值,就会加锁失败

  • 加锁失败以后,会进入循环,再次尝试获取,如果加锁失败就会阻塞

释放release

  • 同样的默认释放的release的数量也是1
// KEY[1] lockName
// KEY[2] redisson_sc:{lockName}
// ARVG[1] permits

// redis key递增
local value = redis.call('incrby', KEYS[1], ARGV[1]);
redis.call('publish', KEYS[2], value);
复制代码

释放,就是将redis key的值给递增下

3. 总结

实际semaphore的核心部分就是在redis中建立了一个key,并设置了对应的信号量,通过获取来递减key的值,当value的值无法满足permits的时候就会进入循环,再次获取锁并阻塞,释放锁是通过增加lockName的值的

二、CountDownLatch

1. 文档及实现

Github

  • 具有Java CountDownLatch的特性和数据结构
  • 使用前应通过 trySetCount(count) 方法用 count 初始化

代码实现

// 初始化一个countdownLatch对象
RCountDownLatch lockName = redissonClient.getCountDownLatch("lockName");

// 设置countDown线程数量
lockName.trySetCount(3);
System.out.println(new Date() + ":" + Thread.currentThread().getName() + " wait");
for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            Thread.sleep(3000);
            // 获取对应的countdownLatch对象
            RCountDownLatch latch = redissonClient.getCountDownLatch("lockName");
            // countDown
            latch.countDown();
            System.out.println(new Date() + ":" + Thread.currentThread().getName() + " countdown");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }).start();
}
// 阻塞、等待
lockName.await();
System.out.println(new Date() + ":" + Thread.currentThread().getName() + " go");
复制代码

2. 源码剖析

初始化

  • 实际初始化的是一个 RedissonCountDownLatch对象
  • 初始化了一个 connection uuid,一个CountDownLatchPubSub

初始化countDown数量

org.redisson.RedissonCountDownLatch#trySetCountAsync

// KEY[1] lockName
// KEY[2] redisson_countdownlatch__channel__{lockName}
// ARGV[1] CountDownLatchPubSub.NEW_COUNT_MESSAGE 1
// ARGV[2] count

// 判断redis key是不存在的
if redis.call('exists', KEYS[1]) == 0 then
  // 设置set lockName count
  redis.call('set', KEYS[1], ARGV[2]);
  // publish
  redis.call('publish', KEYS[2], ARGV[1]);
  return 1
else
  return 0
end
复制代码

同样,CountDownLatch也会在redis中创建一个key,并设置一个value代表countDown数量

wait

  • 循环中阻塞掉

countDown

  • org.redisson.RedissonCountDownLatch#countDownAsync
// 将redis中的lockName的值-1
local v = redis.call('decr', KEYS[1]);
// 如果小于零就删除掉这个key
if v <= 0 then redis.call('del', KEYS[1]) end;
// 等于零就会发一个publish,实际会notifyAll
if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;
复制代码

3. 总结

总的来说,countdownLatch和semaphore的实现还是比较简单的