剖析Redisson分布式锁(第一部分):深入理解RedissonLock.tryLock的实现代码
工作中经常用到Redisson分布式锁的功能,刚好最近有时间就详细看了下这块的源码,此篇文章主要用于记录并且分享下自己对这部分的理解
好了,话不多说直接从RedissonLock.tryLock()入口开始
public class RedissonLock extends RedissonBaseLock {
// 在waitTime时间范围内尝试获取锁,如果获取到锁,则设置锁过期时间leaseTime
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 第一步:尝试获取锁(下文会详细解析此方法)
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl为空说明获取到了锁
if (ttl == null) {
return true;
}
// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 第二步:订阅解锁消息通知
current = System.currentTimeMillis();
// 订阅锁释放
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 等待锁释放消息,等待时间超过waitTime,获取锁失败
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// 如果订阅解锁Future在执行中,等任务执行完后取消订阅锁释放
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// 取消订阅解锁通知
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 第三步:自旋尝试获取锁
while (true) {
long currentTime = System.currentTimeMillis();
// 1、尝试获取锁(下文会详细解析此方法)
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// ttl为空说明获取到了锁
if (ttl == null) {
return true;
}
// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 等待锁释放(信号量控制)
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 尝试获取信号量
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 判断尝试获取锁是否超过waitTime
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 第四步:取消解锁订阅
unsubscribe(subscribeFuture, threadId);
}
}
}
tryLock方法主要可以分为四步:
1、tryAcquire尝试获取锁,如果获取到返回true
2、获取不到锁说明锁被占用了,订阅解锁消息通知
3、收到解锁消息通知,再次尝试获取锁,如果获取不到重复步骤三,直到超过waitTime获取锁失败
4、不论是否获取锁成功,取消解锁消息订阅
通过源码可以看到整个方法内跟获取锁有关的地方只有tryAcquire()这个方法了,下面我们跟进到这个方法中,看看这里面到底是如何去获取锁的!
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 这里需要注意的是leaseTime==-1,会触发redisson看门狗机制
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// 获取锁成功
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 锁自动续时(看门狗机制)触发条件leaseTime == -1
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
这里有两个方法值得注意
1、tryLockInnerAsync()这里面是尝试获取分布式锁redis lua脚本,下面会对脚本进行分析
2、scheduleExpirationRenewal()锁自动续时,也就是常说的redisson看门狗机制,后文会对此方法进行详细解析
首先我们看下tryLockInnerAsync()这个方法,可以看到这里面是一个lua脚本,这个就是实现redis分布式锁的核心了,首先解释下脚本变量
KEYS[1] = "锁key"
ARGV[1] = "锁过期时间"
ARGV[2] = "当前线程id"
理解了脚本变量意义,我们在看下此方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 如果key一开始就不存在,则直接创建一个key
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 这里是重入锁的实现,同一个线程多次获取锁只需要在value加1即可,value相当于一个加锁计数器
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 有其他线程持有锁,加锁失败,返回锁过期时间
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
脚本比较好理解,简单总结一下
1、加锁的key不存在就创建一个redis hash key,field=当前线程id,value=加锁次数
2、有线程持有锁并且未解锁,其他线程是无法获取到锁的
3、加锁成功返回null,加锁失败返回锁过期时间
看完了tryLockInnerAsync(),现在转过头来看下scheduleExpirationRenewal()是如何实现锁自动续时的吧,话不多说上代码
protected void scheduleExpirationRenewal(long threadId) {
// 保存当前加锁key有那些线程自动续时,取消自动续时后会清除此对象内部数据
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
// 更新锁过期时间
renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 定时任务(不多讲搜io.netty.util.HashedWheelTimer)
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 更新锁过期时间(lua脚本)
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 更新锁过期时间成功
if (res) {
// 递归调用 如果10秒后依然没有解锁,继续更新锁过期时间
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
// internalLockLeaseTime在不设置lockWatchdogTimeout情况下默认30s,这里会延迟10s触发此任务
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 当前线程已持有锁,更新锁过期时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
下面对锁过期时间自动续时进行总结:
1、锁过期自动续时的触发条件是tryLock设置的锁到期时间leaseTime == -1(前文tryAcquire()解析中提到)
2、自动续时原理就是创建一个定时任务,每internalLockLeaseTime / 3时间触发一次,如果发现持有锁未释放,就把锁过期时间更新为internalLockLeaseTime(internalLockLeaseTime的值取得是lockWatchdogTimeout默认是30s)
3、锁过期时间更新成功后,再次递归调用renewExpiration(),创建下一次定时任务
这种机制主要是为了避免在没来得及解锁的情况下系统就挂了,导致该锁在redis中一直都被占用,其他线程永远都无法获取锁,也就是死锁的情况
至此RedissonLock.tryLock基本就算是告一段落了
在文章最后有一点加以补充,前面提到tryLock方法主要可以分为四步,第二步是订阅解锁消息通知,可能有人不知道这个消息是从哪里来的,这里我觉得有必要提下是调用unlock()发起的通知
public void unlock() {
try {
// 解锁
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
这个unlockAsync方法内部会调用lua解锁脚本,在其中最后倒数第5行,会调用publish推送解锁消息,关于订阅这块有兴趣可以自行了解下
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
// 推送解锁通知
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
好啦,以上都是RedissonLock.tryLock我认为比较核心的源码解析,如果大家觉得写的有不对的地方,欢迎讨论指正。
推荐阅读
-
剖析Redisson分布式锁(第一部分):深入理解RedissonLock.tryLock的实现代码
-
深入剖析Redisson Lock的分布式锁实现机制与源码解读
-
【2022新手指南】Java编程进阶之路 - 六、技术架构篇 ### MySQL索引底层解析与优化实战 - 你会讲解MySQL索引的数据结构吗?性能调优技巧知多少? - Redis深度揭秘:你知道多少?从基础到哨兵、主从复制全梳理 - Redis持久化及哨兵模式详解,还有集群搭建和Leader选举黑箱打开 - Zookeeper是个啥?特性和应用场景大公开 - ZooKeeper集群搭建攻略及 Leader选举、读写一致性、共享锁实现细节 - 探究ZooKeeper中的Leader选举机制及其在分布式环境中的作用 - Zab协议深入剖析:原理、功能与在Zookeeper中的核心地位 - RabbitMQ全方位解读:工作模式、消费限流、可靠投递与配置策略 - 设计者视角:RabbitMQ过期时间、死信队列与延时队列实践指南 - RocketMQ特性和应用场景揭示:理解其精髓与差异化优势 - Kafka详细介绍:特性及广泛应用于实时数据处理的场景解析 - ElasticSearch实力揭秘:特性概述与作为搜索引擎的广泛应用 - MongoDB认知升级:非关系型数据库的优势阐述,安装与使用实战教学 - BIO/NIO/AIO网络模型对比:掌握它们的区别与在网络编程中的实际应用 - Netty带你飞:理解其超快速度背后的秘密,包括线程模型分析 - 网络通信黑科技:Netty编解码原理与常用编解码器的应用,Protostuff实战演示 - 解密Netty粘包与拆包现象,怎样有效应对这一常见问题 - 自定义Netty心跳检测机制,轻松调整检测间隔时间的艺术 - Dubbo轻骑兵介绍:核心特性概览,服务降级实战与其实现益处 - Dubbo三大神器解读:本地存根与本地伪装的实战运用与优势呈现 ----------------------- 七、结语与回顾
-
[Redis | 第 6 部分] 基于 Setnx | Redisson 实现分布式锁(深入了解 Redisson 的锁定、重入锁和看门狗机制)
-
深入理解Redisson分布式锁的实现原理(源码解析)