欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

剖析Redisson分布式锁(第一部分):深入理解RedissonLock.tryLock的实现代码

最编程 2024-07-28 19:21:34
...

工作中经常用到Redisson分布式锁的功能,刚好最近有时间就详细看了下这块的源码,此篇文章主要用于记录并且分享下自己对这部分的理解
好了,话不多说直接从RedissonLock.tryLock()入口开始

public class RedissonLock extends RedissonBaseLock {
    // 在waitTime时间范围内尝试获取锁,如果获取到锁,则设置锁过期时间leaseTime
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 第一步:尝试获取锁(下文会详细解析此方法)
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // ttl为空说明获取到了锁
        if (ttl == null) {
            return true;
        }
        
        // 判断尝试获取锁是否超过waitTime
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        // 第二步:订阅解锁消息通知
        current = System.currentTimeMillis();
        // 订阅锁释放
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 等待锁释放消息,等待时间超过waitTime,获取锁失败
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            // 如果订阅解锁Future在执行中,等任务执行完后取消订阅锁释放
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        // 取消订阅解锁通知
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            // 判断尝试获取锁是否超过waitTime
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            // 第三步:自旋尝试获取锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 1、尝试获取锁(下文会详细解析此方法)
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // ttl为空说明获取到了锁
                if (ttl == null) {
                    return true;
                }

                // 判断尝试获取锁是否超过waitTime
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // 等待锁释放(信号量控制)
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    // 尝试获取信号量
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                // 判断尝试获取锁是否超过waitTime
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            // 第四步:取消解锁订阅
            unsubscribe(subscribeFuture, threadId);
        }
    }
}

tryLock方法主要可以分为四步:
1、tryAcquire尝试获取锁,如果获取到返回true
2、获取不到锁说明锁被占用了,订阅解锁消息通知
3、收到解锁消息通知,再次尝试获取锁,如果获取不到重复步骤三,直到超过waitTime获取锁失败
4、不论是否获取锁成功,取消解锁消息订阅

通过源码可以看到整个方法内跟获取锁有关的地方只有tryAcquire()这个方法了,下面我们跟进到这个方法中,看看这里面到底是如何去获取锁的!

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 这里需要注意的是leaseTime==-1,会触发redisson看门狗机制
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // 获取锁成功
        if (ttlRemaining == null) {
            if (leaseTime != -1) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 锁自动续时(看门狗机制)触发条件leaseTime == -1
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

这里有两个方法值得注意
1、tryLockInnerAsync()这里面是尝试获取分布式锁redis lua脚本,下面会对脚本进行分析
2、scheduleExpirationRenewal()锁自动续时,也就是常说的redisson看门狗机制,后文会对此方法进行详细解析

首先我们看下tryLockInnerAsync()这个方法,可以看到这里面是一个lua脚本,这个就是实现redis分布式锁的核心了,首先解释下脚本变量

KEYS[1] = "锁key"
ARGV[1] = "锁过期时间"
ARGV[2] = "当前线程id"

理解了脚本变量意义,我们在看下此方法

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            // 如果key一开始就不存在,则直接创建一个key
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    // 这里是重入锁的实现,同一个线程多次获取锁只需要在value加1即可,value相当于一个加锁计数器
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    // 有其他线程持有锁,加锁失败,返回锁过期时间
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

脚本比较好理解,简单总结一下
1、加锁的key不存在就创建一个redis hash key,field=当前线程id,value=加锁次数
2、有线程持有锁并且未解锁,其他线程是无法获取到锁的
3、加锁成功返回null,加锁失败返回锁过期时间

看完了tryLockInnerAsync(),现在转过头来看下scheduleExpirationRenewal()是如何实现锁自动续时的吧,话不多说上代码

protected void scheduleExpirationRenewal(long threadId) {
    // 保存当前加锁key有那些线程自动续时,取消自动续时后会清除此对象内部数据
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            // 更新锁过期时间
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    // 定时任务(不多讲搜io.netty.util.HashedWheelTimer)
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            // 更新锁过期时间(lua脚本)
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                // 更新锁过期时间成功
                if (res) {
                    // 递归调用 如果10秒后依然没有解锁,继续更新锁过期时间
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
        // internalLockLeaseTime在不设置lockWatchdogTimeout情况下默认30s,这里会延迟10s触发此任务
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 当前线程已持有锁,更新锁过期时间
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

下面对锁过期时间自动续时进行总结:
1、锁过期自动续时的触发条件是tryLock设置的锁到期时间leaseTime == -1(前文tryAcquire()解析中提到)
2、自动续时原理就是创建一个定时任务,每internalLockLeaseTime / 3时间触发一次,如果发现持有锁未释放,就把锁过期时间更新为internalLockLeaseTime(internalLockLeaseTime的值取得是lockWatchdogTimeout默认是30s)
3、锁过期时间更新成功后,再次递归调用renewExpiration(),创建下一次定时任务
这种机制主要是为了避免在没来得及解锁的情况下系统就挂了,导致该锁在redis中一直都被占用,其他线程永远都无法获取锁,也就是死锁的情况
至此RedissonLock.tryLock基本就算是告一段落了

在文章最后有一点加以补充,前面提到tryLock方法主要可以分为四步,第二步是订阅解锁消息通知,可能有人不知道这个消息是从哪里来的,这里我觉得有必要提下是调用unlock()发起的通知

public void unlock() {
        try {
            // 解锁
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }

这个unlockAsync方法内部会调用lua解锁脚本,在其中最后倒数第5行,会调用publish推送解锁消息,关于订阅这块有兴趣可以自行了解下

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    // 推送解锁通知
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

好啦,以上都是RedissonLock.tryLock我认为比较核心的源码解析,如果大家觉得写的有不对的地方,欢迎讨论指正。

推荐阅读