欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

深入剖析Redisson Lock的分布式锁实现机制与源码解读

最编程 2024-07-28 18:33:02
...

笔者最近在面试过程中,发现面试官喜欢面试关于redis的分布式锁的实现。为了更加清晰地了解加锁工程,然后看了下redisson下封装的锁的操作过程。

redis锁的实现是一个学习redis的难点,那么了解其原理可以让我们更好的使用好lock。
首先聊聊单体redis下如何实现锁的。

单体redis模式下的锁实现

接下来先看下加锁的实现。

加锁

// 构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.201.1.180:5379").setPassword("a123456").setDatabase(0);
// 构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 设置锁定资源名称
RLock disLock = redissonClient.getLock("DISLOCK1");
boolean isLock;
try {
    //尝试获取分布式锁
    isLock = disLock.tryLock(500, 5000, TimeUnit.MILLISECONDS);
    if (isLock) {
        //TODO if get lock success, do something;
        Thread.sleep(5000);
    }
} catch (Exception e) {
} finally {
    // 无论如何, 最后都要解锁
    disLock.unlock();
}

首先看到trylock的源码:

    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return this.tryLock(waitTime, -1L, unit);
    }

具体进入到trylock中。

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        //要注意到从上面得到的leaseTime值为-1L。 
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
      //如果ttl为空,说明获取到锁,当前没有线程加锁。
        if (ttl == null) {
            return true;
        } else {
            time -= System.currentTimeMillis() - current;
            //规定时间内没有获取到锁,然后加锁失败。
            if (time <= 0L) {
                this.acquireFailed(threadId);
                return false;
            } else {
              //
                current = System.currentTimeMillis();
                //对当前线程进行订阅。
                final RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                if (!this.await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                            public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                              //对该位置进行监听。
                                if (subscribeFuture.isSuccess()) {
                                    RedissonLock.this.unsubscribe(subscribeFuture, threadId);
                                }

                            }
                        });
                    }

                    this.acquireFailed(threadId);
                    return false;
                } else {
                    try {
                //经过上面的处理,看下现在时间是否过期
                        time -= System.currentTimeMillis() - current;
                        if (time <= 0L) {
                    //过期了就获取失败
                            this.acquireFailed(threadId);
                            boolean var20 = false;
                            return var20;
                        } else {
                            boolean var16;
                            do {
                     //然后使用tryacquire去获取
                                long currentTime = System.currentTimeMillis();
                        //下面对tryAcquire进行获取。
                                ttl = this.tryAcquire(leaseTime, unit, threadId);
                            //ttl是空,说明获取到锁
                                  if (ttl == null) {
                                    var16 = true;
                                    return var16;
                                }

                                time -= System.currentTimeMillis() - currentTime;
                      //看时间是否到了,到了就过期了。
                                if (time <= 0L) {
                                    this.acquireFailed(threadId);
                                    var16 = false;
                                    return var16;
                                }

                                currentTime = System.currentTimeMillis();
                        //ttl 大于0,并且ttl小于过期时间,那么尝试去获取锁。
                                if (ttl >= 0L && ttl < time) {
                                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                    this.getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }

                                time -= System.currentTimeMillis() - currentTime;
                            } while(time > 0L);

                            this.acquireFailed(threadId);
                            var16 = false;
                            return var16;
                        }
                    } finally {
                     //最终释放掉订阅。
                        this.unsubscribe(subscribeFuture, threadId);
                    }
                }
            }
        }
    }

从 Long ttl = this.tryAcquire(leaseTime, unit, threadId)这句进去到tryAcuire中

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        // 从上面得到leastTime的值为-1L。那么会跳过上面的判断,走下面的路径。
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
          //先按照30秒的过期时间来执行获取锁的方法
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
          //如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
         // 异步获取结果,如果获取锁成功,则启动定时线程进行锁续约
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                public void operationComplete(Future<Long> future) throws Exception {
     
                    if (future.isSuccess()) {
                        Long ttlRemaining = (Long)future.getNow();
                       //如何续约,看下scheduleExpirationRenewal实现方法。
                       //拿到了锁
                        if (ttlRemaining == null) {
                            RedissonLock.this.scheduleExpirationRenewal(threadId);
                        }

                    }
                }
            });
            return ttlRemainingFuture;
        }
    }

先看tryLockInnerAsync尝试去获取到锁,这里里面的实现主要是通过lua脚本来实现的,保证了原子性。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,     
                            long threadId, RedisStrictCommand<T> command) {

        //过期时间
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //如果锁不存在,则通过hset设置它的值,并设置过期时间
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
                  // 续约internalLockLeaseTime(30s)            
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //如果锁已存在,但并非本线程,则返回过期时间ttl
                  "return redis.call('pttl', KEYS[1]);",
        Collections.<Object>singletonList(getName()), 
                internalLockLeaseTime, getLockName(threadId));
    }

然后使用定时任务去给这个锁续约(重点)。
那接下来我们就看到scheduleExpirationRenewal方法的源码实现。

    private void scheduleExpirationRenewal(final long threadId) {
        if (!expirationRenewalMap.containsKey(this.getEntryName())) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
//future尝试获取到锁。
                    RFuture<Boolean> future = RedissonLock.this.commandExecutor.evalWriteAsync(RedissonLock.this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果获取到改锁,然后给改锁进行续约。
 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; 
end; return 0;",
 Collections.singletonList(RedissonLock.this.getName()), new Object[]{RedissonLock.this.internalLockLeaseTime, RedissonLock.this.getLockName(threadId)});
                    future.addListener(new FutureListener<Boolean>() {
                        public void operationComplete(Future<Boolean> future) throws Exception {
                            RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                           //  如果future为非succes,那么就没法去更新lock的时间了。
                            if (!future.isSuccess()) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                            } else {
                //如果获取到了锁,然后就循环给锁去做续约,做上面的操作。
                                if ((Boolean)future.getNow()) {
                                    RedissonLock.this.scheduleExpirationRenewal(threadId);
                                }

                            }
                        }
                    });
                }
      //每次间隔租期的1/3时间执行这个续约。

            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            if (expirationRenewalMap.putIfAbsent(this.getEntryName(), task) != null) {
                task.cancel();
            }

        }
    }

上面就是trylock获取锁并进行续约的流程。
然后看一下tryAcquire的实现。有意思的是这里是用了信号量Semaphore的tryAcqurie方法实现。

    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

另外强调一点的是:如果tryLock的时候没有设置leasetime为-1L,那么直接去使用
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)。
不会有续约的情况了。同时我们也看见里面也实现了可重入锁。

接下来看下解锁的实现。

解锁

    public void unlock() {
      //这里主要是看this.unlockInnerAsync方法的实现。
        Boolean opStatus = (Boolean)this.get(this.unlockInnerAsync(Thread.currentThread().getId()));
//按照提示,得到如果为null,那么没有得到当前的锁。
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + Thread.currentThread().getId());
        } else {
            if (opStatus) {
                this.cancelExpirationRenewal();
            }

        }
    }

然后我们进入到unlockInnerAsync中去看下实现原理。


protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    // 释放锁时需要在redis实例上执行的lua命令
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁KEY不存在,那么向channel发布一条消息
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            // 这5个参数分别对应KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

同时我们看下unlockAsync的方法。


public RFuture<Void> unlockAsync(final long threadId) {
    final RPromise<Void> result = new RedissonPromise<Void>();
    
    //解锁方法
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.addListener(new FutureListener<Boolean>() {
        @Override
        public void operationComplete(Future<Boolean> future) throws Exception {
            if (!future.isSuccess()) {
                cancelExpirationRenewal(threadId);
                result.tryFailure(future.cause());
                return;
            }
            //获取返回值
            Boolean opStatus = future.getNow();
            //如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
            if (opStatus == null) {
                IllegalMonitorStateException cause = 
                    new IllegalMonitorStateException("
                        attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            //解锁成功,取消刷新过期时间的那个定时任务
            if (opStatus) {
                cancelExpirationRenewal(null);
            }
            result.trySuccess(null);
        }
    });

    return result;
}

同时小伙伴可以注意下forceUnlock()方法加锁的方式:

在源码中并没有找到forceUnlock()被调用的痕迹(也有可能是我没有找对),但是forceUnlockAsync()方法被调用的地方很多,大多都是在清理资源时删除锁。此部分比较简单粗暴,删除锁成功则并发布锁被删除的消息,返回1结束,否则返回0结束。


   public void forceUnlock() {
        this.get(this.forceUnlockAsync());
    }

    public RFuture<Boolean> forceUnlockAsync() {
        this.cancelExpirationRenewal();
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
 "if (redis.call('del', KEYS[1]) == 1) t
hen redis.call('publish', KEYS[2], ARGV[1]);
 return 1 else return 0 end",
 Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage});
    }

上文就是redisson的分布式锁实现。下次讲讲redlock的实现原理。

参考资料:

redissonLock源码
https://www.jianshu.com/p/47fd7f86c848
https://www.jianshu.com/p/7e47a4503b87
https://my.oschina.net/u/2369201/blog/1573730

推荐阅读