为什么需要cp分布式锁
分布式锁的功能和诉求,我们已经在Redis分布式锁:基于AOP和Redis实现的简易版分布式锁简单的介绍过了。
目前自研的Redis分布式锁,已可满足大部分场景(非公平+可自动续期+可重入的分布式锁),可投入生产环境的单机环境中使用。但是因为是基于Redis单机的环境,只能用于并发量并不高的场景。随着接入的业务场景扩大,Redis单机已经变得不可靠了,那么接下来给我们的选择只有两种:
1、Redis单机改为集群。
2、改用其他基于一致性算法的实现方式。
方案1有先天性的缺陷,redis集群无法保证一致性问题,在master节点宕机的瞬间,master和slave节点之间的数据可能是不一致的。这将会导致服务a从master节点拿到了锁a,然后master节点宕机,在slave节点尚未完全同步完master的数据之前,服务b将从slave节点上成功拿到同样的锁a。
而在其他基于一致性算法的实现方式上,zk和ectd是不错的选择。然后考虑到zk已廉颇老矣,我们选择了ectd这个后起之秀。
由于在分布式锁的场景内,我们更关注的是锁的一致性,而非锁的可用性,所以cp锁比ap锁更可靠。
设计思路
etcd引入了租约的概念,我们首先需要授予一个租约,然后同时设置租约的有效时间。租约的有效时间我们可以用来作为锁的有效时间。
然后我们可以直接调用etcd的lock功能,在指定的租约上对指定的lockName进行加锁操作。如果当前没有其他线程持有该锁,则该线程能直接持有锁。否则需要等待。这里我们可以将timeout的时间设置为锁的等待时间来实现竞争锁失败等待获取的过程。当然由于网络波动等问题,我建议timeout的时间最少设置为500ms(或你们认为合理的数值)。
然后解锁的过程,我们放弃了etcd的unlock操作,而直接使用了etcd的revoke操作。之所以没采用unlock操作,一是因为unlock所需要的参数是上一步lock操作返回的lockKey,我们并不希望多维护一个字段,二是因为我们最终会执行revoke操作,而revoke操作会将该租约下的所有key都失效,因为我们目前目前设计的是一个租约对应一个锁,不存在会释放其它业务场景中的锁的情况。
此外,为了保证线程在等待获取锁的过程中租约不会过期,所以我们得为这个线程设置一个守护线程,在该线程授予租约后就开启守护线程,定期去判断是否需要续期。
和redis分布式锁不一样的是,redis分布式锁的有效时间是缓存的有效时间,所以可以在获取锁成功后再开启用于续期的守护线程,而etcd分布式锁的有效时间是租约的有效时间,在等待获取锁的过程中可能租约会过期,所以得在获取租约后就得开启守护线程。这样就增加了很多的复杂度。
##具体实现
原生的etcd是通过Go语言来写的,直接在java程序中应用会有一点困难,所以我们直接采用jetcd来作为etcd的客户端,这样在java程序中就可以使用代码方式和etcd服务端通讯。
jetcd提供了LeaseClient,我们可以直接使用grant功能完成授予租约的操作。
public LockLeaseData getLeaseData(String lockName, Long lockTime) {
try {
LockLeaseData lockLeaseData = new LockLeaseData();
CompletableFuture<LeaseGrantResponse> leaseGrantResponseCompletableFuture = client.getLeaseClient().grant(lockTime);
Long leaseId = leaseGrantResponseCompletableFuture.get(1, TimeUnit.SECONDS).getID();
lockLeaseData.setLeaseId(leaseId);
CpSurvivalClam cpSurvivalClam = new CpSurvivalClam(Thread.currentThread(), leaseId, lockName, lockTime, this);
Thread survivalThread = threadFactoryManager.getThreadFactory().newThread(cpSurvivalClam);
survivalThread.start();
lockLeaseData.setCpSurvivalClam(cpSurvivalClam);
lockLeaseData.setSurvivalThread(survivalThread);
return lockLeaseData;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return null;
}
}
复制代码
此外如上所述,我们在获取租约后,开启了CpSurvivalClam的守护线程来定期续期。CpSurvivalClam的实现和我们在redis分布式锁的时候实现大体一致,差别只是将其中的expandLockTime操作改为了etcd中的keepAliveOnce。expandLockTime方法具体如下所示:
/**
* 重置锁的有效时间
*
* @param leaseId 锁的租约id
* @return 是否成功重置
*/
public Boolean expandLockTime(Long leaseId) {
try {
CompletableFuture<LeaseKeepAliveResponse> leaseKeepAliveResponseCompletableFuture = client.getLeaseClient().keepAliveOnce(leaseId);
leaseKeepAliveResponseCompletableFuture.get();
return Boolean.TRUE;
} catch (InterruptedException | ExecutionException e) {
return Boolean.FALSE;
}
}
复制代码
然后jetcd提供了LockClient,我们直接可以用lock功能,将leaseId和lockName传入,我们会得到一个在该租约下的lockKey。此外为了保证加锁成功后,租约未过期。我们加了一步timeToLive的操作,用于判断租约在获取锁成功后的是否还存活。如果ttl未大于0,则判断为加锁失败。
/**
* 在指定的租约上加锁,如果租约过期,则算加锁失败。
*
* @param leaseId 锁的租约Id
* @param lockName 锁的名称
* @param waitTime 加锁过程中的的等待时间,单位ms
* @return 是否加锁成功
*/
public Boolean tryLock(Long leaseId, String lockName, Long waitTime) {
try {
CompletableFuture<LockResponse> lockResponseCompletableFuture = client.getLockClient().lock(ByteSequence.from(lockName, Charset.defaultCharset()), leaseId);
long timeout = Math.max(500, waitTime);
lockResponseCompletableFuture.get(timeout, TimeUnit.MILLISECONDS).getKey();
CompletableFuture<LeaseTimeToLiveResponse> leaseTimeToLiveResponseCompletableFuture = client.getLeaseClient().timeToLive(leaseId, LeaseOption.DEFAULT);
long ttl = leaseTimeToLiveResponseCompletableFuture.get(1, TimeUnit.SECONDS).getTTl();
if (ttl > 0) {
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} catch (TimeoutException | InterruptedException | ExecutionException e) {
return Boolean.FALSE;
}
}
复制代码
解锁过程,我们可以直接使用LeaseClient下的revoke操作,在撤销租约的同时将该租约下的lock释放。
/**
* 取消租约,并释放锁
*
* @param leaseId 租约id
* @return 是否成功释放
*/
public Boolean unLock(Long leaseId) {
try {
CompletableFuture<LeaseRevokeResponse> revokeResponseCompletableFuture = client.getLeaseClient().revoke(leaseId);
revokeResponseCompletableFuture.get(1, TimeUnit.SECONDS);
return Boolean.TRUE;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Boolean.FALSE;
}
}
复制代码
然后是统一的CpLock对象,封装了加解锁的过程,对外只暴露execute方法,避免使用者忘记解锁步骤。
public class CpLock {
private String lockName;
private LockEtcdClient lockEtcdClient;
/**
* 分布式锁的锁持有数
*/
private volatile int state;
private volatile transient Thread lockOwnerThread;
/**
* 当前线程拥有的lease对象
*/
private FastThreadLocal<LockLeaseData> lockLeaseDataFastThreadLocal = new FastThreadLocal<>();
/**
* 锁自动释放时间,单位s,默认为30
*/
private static Long LOCK_TIME = 30L;
/**
* 获取锁失败单次等待时间,单位ms,默认为300
*/
private static Integer SLEEP_TIME_ONCE = 300;
CpLock(String lockName, LockEtcdClient lockEtcdClient) {
this.lockName = lockName;
this.lockEtcdClient = lockEtcdClient;
}
private LockLeaseData getLockLeaseData(String lockName, long lockTime) {
if (lockLeaseDataFastThreadLocal.get() != null) {
return lockLeaseDataFastThreadLocal.get();
} else {
LockLeaseData lockLeaseData = lockEtcdClient.getLeaseData(lockName, lockTime);
lockLeaseDataFastThreadLocal.set(lockLeaseData);
return lockLeaseData;
}
}
final Boolean tryLock(long waitTime) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + waitTime * 1000;
final long lockTime = LOCK_TIME;
final Thread current = Thread.currentThread();
try {
do {
int c = this.getState();
if (c == 0) {
LockLeaseData lockLeaseData = this.getLockLeaseData(lockName, lockTime);
if (Objects.isNull(lockLeaseData)) {
return Boolean.FALSE;
}
Long leaseId = lockLeaseData.getLeaseId();
if (lockEtcdClient.tryLock(leaseId, lockName, endTime - System.currentTimeMillis())) {
log.info("线程获取重入锁成功,cp锁的名称为{}", lockName);
this.setLockOwnerThread(current);
this.setState(c + 1);
return Boolean.TRUE;
}
} else if (lockOwnerThread == Thread.currentThread()) {
if (c + 1 <= 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(c + 1);
log.info("线程重入锁成功,cp锁的名称为{},当前LockCount为{}", lockName, state);
return Boolean.TRUE;
}
int sleepTime = SLEEP_TIME_ONCE;
if (waitTime > 0) {
log.info("线程暂时无法获得cp锁,当前已等待{}ms,本次将再等待{}ms,cp锁的名称为{}", System.currentTimeMillis() - startTime, sleepTime, lockName);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.info("线程等待过程中被中断,cp锁的名称为{}", lockName, e);
}
}
} while (System.currentTimeMillis() <= endTime);
if (waitTime == 0) {
log.info("线程获得cp锁失败,将放弃获取,cp锁的名称为{}", lockName);
} else {
log.info("线程获得cp锁失败,之前共等待{}ms,将放弃等待获取,cp锁的名称为{}", System.currentTimeMillis() - startTime, lockName);
}
this.stopKeepAlive();
return Boolean.FALSE;
} catch (Exception e) {
log.error("execute error", e);
this.stopKeepAlive();
return Boolean.FALSE;
}
}
/**
* 停止续约,并将租约对象从线程中移除
*/
private void stopKeepAlive() {
LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
if (Objects.nonNull(lockLeaseData)) {
lockLeaseData.getCpSurvivalClam().stop();
lockLeaseData.setCpSurvivalClam(null);
lockLeaseData.getSurvivalThread().interrupt();
lockLeaseData.setSurvivalThread(null);
}
lockLeaseDataFastThreadLocal.remove();
}
final void unLock() {
if (lockOwnerThread == Thread.currentThread()) {
int c = this.getState() - 1;
if (c == 0) {
this.setLockOwnerThread(null);
this.setState(c);
LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
this.stopKeepAlive();
//unLock操作必须在最后执行,避免其他线程获取到锁时的state等数据不正确
lockEtcdClient.unLock(lockLeaseData.getLeaseId());
log.info("重入锁LockCount-1,线程已成功释放锁,cp锁的名称为{}", lockName);
} else {
this.setState(c);
log.info("重入锁LockCount-1,cp锁的名称为{},剩余LockCount为{}", lockName, c);
}
}
}
public <T> T execute(Supplier<T> supplier, int waitTime) {
Boolean holdLock = Boolean.FALSE;
Preconditions.checkArgument(waitTime >= 0, "waitTime必须为自然数");
try {
if (holdLock = this.tryLock(waitTime)) {
return supplier.get();
}
return null;
} catch (Exception e) {
log.error("cpLock execute error", e);
return null;
} finally {
if (holdLock) {
this.unLock();
}
}
}
public <T> T execute(Supplier<T> supplier) {
return this.execute(supplier, 0);
}
}
复制代码
CpLock和之前Redis分布式锁中的ApLock实现大体一致。区别主要有:
1、因为我们是在授予租约的操作中开启了守护线程,所以在竞争锁失败、出现异常和释放锁这些场景下,我们必须得停止守护线程续期。又因为是可重入的场景,我们又只希望在state为0的情况下再去生成租约去竞争锁。所以避免多种情况判断,我们引入了FastThreadLocal lockLeaseDataFastThreadLocal来保存当前线程的Lease对象。
2、redis分布式锁在任何场景下,等待获取锁都是通过休眠轮询的方式实现的,而在etcd场景下,我们在state为0时通过etcd自身的等待逻辑来完成等待,在state非0场景下,依然通过休眠轮询的方式来实现等待。因为可能会存在state从非0转为0的情况,所以我们的waitTime值是endTime - System.currentTimeMillis(),而非原本传入的waitTime。这样能够让等待时间更接近我们期望值。
更新说明
本次更新,我们实现了基于etcd的cp分布式锁,同时也修复了redis分布式锁中的一个隐藏问题。
之前的setState操作在unLock之后,这样在并发场景下会导致一个问题发生。线程a和线程b在竞争获取锁a,此时各自的局部变量c和state都为0,然后线程a在获取到了锁之后立刻释放了锁,此时先执行了unLock,state还是1,线程b成功获得锁,将state重置为c+1,依然是1,然后线程a执行setState,将stete改为0。此时线程b如果去释放锁,执行stete-1操作,变为了-1。这个问题主要是因为获取state值和state值修改操作是异步的,而在多线程场景下,分布式锁是通过lock控制的,我们只需要将unLock操作挪到所有赋值之后即可解决这个问题。
后续计划
目前实现的cp分布式锁的版本,已可满足分布式锁的绝大部分场景(非公平+可自动续期+可重入+强一致性的分布式锁),已可投入生产环境的集群中使用。后续的计划中,ap锁和cp锁将会分别更新,会优化一些使用场景。也会尝试去解决公平锁的问题,以及循环获取锁需要等待休眠的问题。
以上计划已完成,如何实现公平锁可详见Etcd分布式锁(二):支持公平锁,避免某些场景下线程长期无法获取锁
本次cp分布式锁需要考虑大量的使用场景,目前只进行了小规模的测试,如有考虑不周的地方,还望大家海涵。
推荐阅读
1、Redis分布式锁:基于AOP和Redis实现的简易版分布式锁
2、Redis分布式锁(二):支持锁的续期,避免锁超时后导致多个线程获得锁
3、Redis分布式锁(三):支持锁可重入,避免锁递归调用时死锁
好了,我们下一期再见,欢迎大家一起留言讨论。同时也欢迎点赞~
近期评论