欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

[Redis | 第 6 部分] 基于 Setnx | Redisson 实现分布式锁(深入了解 Redisson 的锁定、重入锁和看门狗机制)

最编程 2024-03-02 17:32:01
...

文章目录

  • 6.基于Setnx | Redisson实现分布式锁(深入理解Redisson)
    • 6.1分布式锁介绍
      • 6.1.1为什么要使用分布式锁
      • 6.1.2具备的条件
      • 6.1.3应用场景
      • 6.1.4三种实现方式
    • 6.2基于SETNX实现分布式锁
      • 6.2.1SETNX命令
        • (1)作用
        • (2)核心思路
      • 6.2.2实现过程
        • (1)出现的问题
        • (2)最终形态
      • 6.2.3核心代码
    • 6.3基于Redisson实现分布式锁
      • 6.3.1Redisson介绍
      • 6.3.2实现步骤
        • (1)添加依赖
        • (2)配置RedissonClient
          • ①集群模式—配置RedissonClient
          • ②单节点模式—配置RedissonClient
        • (3)使用Redisson获取锁
      • 6.3.3Redisson源码分析
        • (1)加锁源码分析
        • (2)Redisson实现的锁是可重入锁
        • (3)锁的自动续期
    • 6.4总结

6.基于Setnx | Redisson实现分布式锁(深入理解Redisson)

6.1分布式锁介绍

6.1.1为什么要使用分布式锁

  1. 数据一致性:在分布式系统中,多个节点同时访问共享资源可能导致数据不一致的问题,使用分布式锁可以确保对共享资源的访问是串行化的,避免数据竞争和脏数据的产生。
  2. 避免并发问题:在多线程或多进程环境下,并发访问共享资源可能导致诸如竞态条件、死锁等并发问题,分布式锁可以有效地避免这些问题的发生,确保系统能够正常运行。

img

6.1.2具备的条件

在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
2、高可用的获取锁与释放锁;
3、高性能的获取锁与释放锁;
4、具备可重入特性;
5、具备锁失效机制,防止死锁
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

6.1.3应用场景

  1. 互联网秒杀:在秒杀活动中,大量用户同时抢购同一商品可能导致高并发访问,使用分布式锁可以确保每个用户只能抢购一次,避免超卖或重复购买的问题,同时保证商品库存的正确性。
  2. 抢优惠券:类似于秒杀活动,抢优惠券也可能受到大量用户同时请求的影响,使用分布式锁可以有效地控制用户对优惠券的领取操作,避免超发或重复领取。
  3. 接口幂等性校验:在接口调用中,由于网络原因或其他异常情况可能导致接口请求重复发送,为了避免重复操作带来的风险,可以使用分布式锁来实现接口的幂等性校验,确保同一请求只会被处理一次。

6.1.4三种实现方式

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。

在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

  • 分布式锁的三种实现方式:
    • 基于数据库实现分布式锁;
    • 基于缓存(Redis等)实现分布式锁;
    • 基于Zookeeper实现分布式锁;

下面就一起探讨基于Redsi缓存实现分布式锁的方式,而基于Redis缓存实现分布式锁又分为了两种方法:SETNX命令、以及封装好的Redisson

6.2基于SETNX实现分布式锁

6.2.1SETNX命令

(1)作用
SETNX key value

作用:只有当指定的键名 key 不存在时,才成功将键值对存储到Redis数据库中。如果键名 key 已经存在,则不执行任何操作。

(2)核心思路

让当前线程尝试先通过SETNE创建一个键值对唯一数据),再执行业务逻辑代码,如果A不存在,就进行创建,并执行相关业务逻辑,业务逻辑执行完毕后释放A;

如果A存在,那么说明此时有其他的线程在执行业务逻辑代码,则拒绝当前线程执行业务逻辑(挂起线程)

img

6.2.2实现过程

(1)出现的问题

使用SETNX实现分布式锁过程中可能会出现以下问题

1.死锁问题

  • 问题:设置好了锁,玩意服务出现宕机,没有执行删除锁逻辑,这就造成了死锁
  • 解决:设置过期时间

2.删锁问题

  • 问题:业务还没执行完锁就过期了,别人拿到锁,自己执行完去删了别人的锁
  • 解决:删锁的时候明确是自己的锁,如uuid。或者逻辑业务还没执行完,锁续期(redisson有看门狗)

3.判断和删两个过程问题

  • 问题:判断uuid对了,但是将要删除的时候锁过期了,别人设置了新值,那删除了别人的锁
  • 解决:删除锁必须保证原子性(保证判断和删锁是原子的)。使用redis+Lua脚本完成,脚本是原子的
(2)最终形态

image-20240228191302778

6.2.3核心代码

  • 其中加锁和删除锁的操作一定保证原子性
    • 加锁:setIfAbsent 命令实现的,封装的是setnx的lua脚本,lua脚本具有原子性的操作
    • 删除锁:保证判断和删除两个动作具有原子性,生成lua脚本命令

下面贴出在我练手小项目中,基于SETNX命令实现了分布式锁的作用,解决了缓存失效的问题

image-20231207173404242

6.3基于Redisson实现分布式锁

6.3.1Redisson介绍

ReRedisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的 分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet , Set,Multimap ,SortedSet ,Map , List , Queue, BlockingQueue , Deque , BlockingDeque,Semaphore, Lock ,AtomicLong ,CountDownLatch , Publish / Subscribe,Bloom filter , Remote service , Spring cache , Executor service , Live Object service ,Scheduler service ) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

6.3.2实现步骤

(1)添加依赖
<dependencies>
  <!-- Spring Data Redis -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  
  <!-- Redisson -->
  <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.3</version>
  </dependency>
</dependencies>
(2)配置RedissonClient
①集群模式—配置RedissonClient
@Configuration
public class MyRedissonConfig {

    /**
     * 所有对Redisson的使用都是通过RedissonClient
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson() throws IOException {
        //1、创建配置
        Config config = new Config();
        config.useClusterServers()
                .addNodeAddress("127.0.8.1:7884","127.0.0.1:7e01");
        return Redisson.create(config);
    }

}
②单节点模式—配置RedissonClient
@Configuration
public class MyRedissonConfig {

    /**
     * 所有对Redisson的使用都是通过RedissonClient
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod="shutdown")
    public RedissonClient redisson() throws IOException {
        //1、创建配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.234.128:6124").setPassword("123456");
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }

}
(3)使用Redisson获取锁
@Autowired
private RedissonClient redissonClient;

public void doSomething() {
    
    //获取键名为lockName的锁
	RLock lock = redissonClient.getLock("lockName");
	try {
        //加锁,若加锁不了,则阻塞等待
    	lock.lock();
        
	    // 执行需要加锁的操作
        
    } finally {
        //解锁
    	lock.unlock();
  	}
}

6.3.3Redisson源码分析

(1)加锁源码分析

我们点击 lock() 方法

	public void lock() {
        try {
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }
  • 再进入lock方法:
	/**
    *	leaseTime:锁的持有时间(可指定,没指定则为-1)
    */
	private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(leaseTime, unit, threadId);
        if (ttl != null) {
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            if (interruptibly) {
                this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                this.commandExecutor.syncSubscription(future);
            }

            try {
                while(true) {
                    ttl = this.tryAcquire(leaseTime, unit, threadId);
                    if (ttl == null) {
                        return;
                    }

                    if (ttl >= 0L) {
                        try {
                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var13) {
                            if (interruptibly) {
                                throw var13;
                            }

                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                    } else {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                this.unsubscribe(future, threadId);
            }
        }
    }
  • 而上面关于这段代码呢,我们先忽略其他逻辑,重点看这一行:
Long ttl = this.tryAcquire(leaseTime, unit, threadId);

这一行第一次尝试加锁,接着往下看:

	private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
    }

	// 上面进入到下面这里
	private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        if (leaseTime != -1L) {
            // 如果自己设置了锁释放时间,则获取锁后直接返回,且不会设置定时刷新的逻辑(上层方法没有设置定时任务),则获取到锁后超过设定的时间后自动释放
            return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining == null) {
                        this.scheduleExpirationRenewal(threadId);
                    }

                }
            });
            return ttlRemainingFuture;
        }
    }

	//再进入
	<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

tryAcquireAsync 方法中,根据传入的 leaseTime 值来决定是使用指定的持有时间还是使用默认的锁监视超时时间

如果 leaseTime 不等于 -1,表示使用指定的持有时间来加锁;

如果 leaseTime 等于 -1,则会使用配置的锁监视超时时间作为持有时间,并在获取锁成功后启动定时任务进行锁的自动续约

真正的加锁是通过 tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) 这个方法来做的,

加锁最终执行的就是这段 lua 脚本语言:

if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hset', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end;

脚本的主要逻辑为:

  • exists 判断 key 是否存在
  • 当判断不存在则设置 key
  • 然后给设置的key追加过期时间

这样来看其实和我们前面案例中的实现方法好像没什么区别,但实际上并不是。

这段lua脚本命令在Redis中执行时,会被当成一条命令来执行,能够保证原子性,故要不都成功,要不都失败。

我们在源码中看到Redssion的许多方法实现中很多都用到了lua脚本,这样能够极大的保证命令执行的原子性。

回到上面,是否指定leaseTime的问题:

可以看出来对于 leaseTime != -1 的判断会走两种方式:

真正的加锁是通过 tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) 这个方法来做的,

  1. 当 leaseTime != -1 时(指定锁的持有时间,即过期时间),直接返回加锁结果了

  2. 当 leaseTime = -1 时(未指定锁的持有时间),在返回加锁结果之前,会监听加锁的结果:

    如果加锁成功了还会开启一个自动延期锁的定时任务。而这个 leaseTime 指的就是加锁成功后锁的默认持有时间。当我们不指定 leaseTime 时,默认的锁持有时间是 30 秒(这个时间叫作看门狗 - lockWatchdogTimeout),并且每 10 秒(30/3)去确认一下锁的状态:如果工作任务还没完成,则重新设置锁的过期时间为 30 秒(当然,持有锁的服务宕机后在 30 秒后锁会自动释放,这个我们后面再说)

  3. 而当我们指定 leaseTime 时,我们可以看出来前面的代码不会走到定时续期锁的逻辑,这时表示:成功获取到锁后,在 leaseTime 后,如果锁仍没有被服务主动释放,锁将自动过期,而不会管持有锁的线程有没有完成对应的操作,相当于==在持有所得服务执行了比较耗时的任务且未完成时,这时锁已经被释放,这时候自然也是不安全的==。

(2)Redisson实现的锁是可重入锁

可重入锁:某个线程已经获取了锁之后,可以再获取该锁

image-20240228203743749

从上面这张图我们可以看出来 Redisson 的分布式锁在 Redis 中的 hash 数据结构:{锁名}:{uuid:threadId}:{count}

另外对于已经存在的健值对初始化过期时间为 30 秒。

结合前面的加锁流程图,我们就可以看出来 Redisson 分布式锁是如何实现加锁的原子性,以下操作是一个原子操作:

  • 某一个节点下的线程加锁首先判断该线程对于的 hash 键是否存在

    • 若不存在(锁未被持有),则将锁的键设置为线程 id 对应的唯一标识,值为 1 (第一次加锁),返回空表示加锁成功
    • 锁存在且对应的是本线程,说明之前加锁的线程为同一个,则将值加1 (加锁次数,可重入),另外将该锁对应的存活时间重新设置,返回空表示加锁成功
    • 锁存在但键对应的不是当前线程,说明持有锁的是其他线程,返回锁剩余的过期时间表示加锁失败
  • 与Java的ReentrantLock可重入锁比较:

到这里,Redisson 的分布式锁加锁的流程以及锁在 Redis 中的数据结构已经清楚了,这时候我们可以对比一下 Java 自身实现的可重入锁 ReentrantLock。对于 ReentrantLock,甚至更多的线程安全组件如 Semaphore、CountDownLatch 等,==其底层的实现都依赖于 AQS(AbstractQueuedSynchronizer),==而 AQS 本身是一个队列,队列中的节点 Node 同样也是封装了线程的对象,只是 AQS 是本地单节点的

Redis 却是分布式的可以被任何 JVM 共享

另外 AQS 中还封装了一个 int 类型的状态变量 state:

/**
 * The synchronization state.
 */
private volatile int state;

当涉及到具体的实现时,state 有不同的含义,

  • 对 ReentrantLock 来说 state 就是可重入锁的加锁次数,
  • 对 Semaphore 来说 state 就是信号量,
  • 对 CountDownLatch 来说就是计数量。

可以看出来==, Java 的 AQS 一些抽象和 Redisson 实现的分布式锁是可以类比==的,比如 thread 标识对应的封装,加锁次数等。

只是 AQS 的实现原子操作一般是基于原子类的 CAS,而 Redisson 实现原子操作是基于 Redis 的 lua 脚本。

另外 AQS 实现队列节点状态同步是基于队列本身可以遍历的特性以及节点中的几种状态(这里不再赘述),而 Redisson 不同线程之间阻塞同步是基于发布订阅(后面会提到)。

可以得出:本地锁和分布式锁很多概念和思想是相似的,甚至其数据结构以及目标都是可类比的,只是分布式锁对本地锁的对象、范围、通信方式基于服务之间通信进行了实现。

(3)锁的自动续期

前面我们从 Redisson 加锁为入口,分析了加锁的整体流程并详细看了加锁时的细节以及数据结构,现在我们看一下 Redisson 分布式锁是如何自动续期的。前面我们已经提到了当第一次加锁成功时会开启自动续期的定时任务,对于的代码入口即为:

if (ttlRemaining == null) {
    this.scheduleExpirationRenewal(threadId)
}

//进入:
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 表示不是第一次加锁 则加锁次数加 1 不会再开启续期锁 因为第一次加锁时调用 scheduleExpirationRenewal(long threadId) 会进入
        // else 会开启 renewExpiration()
        oldEntry.addThreadId(threadId);
    } else {
        // 在加锁时第一次调用 开启自动续期(定时重设锁的过期时间)
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

ExpirationEntry 封装了定时任务对应的线程对象,结合注释这一段也不必展开,我们继续往下看真正开启续期的方法 renewExpiration():

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    // 锁已经不存在了直接返回
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            // 这里监听续期 成功后递归调用(十秒后再次重复)
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
        // 10 秒续期一次(如果还持有锁)  30000/3
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

可以看出来锁自动续期的流程为:

  1. 若锁已经不存在了(比如手动释放了锁),直接返回
  2. 若锁仍存在,调用 Redis ==异步设置锁的过期时间 renewExpirationAsync(threadId),==同时监听续期结果
  3. 若续期成功,则递归调用 renewExpiration(),否则异常返回
  4. 以上过程每 10 秒重复一次 (internalLockLeaseTime / 3)

然后我们看一下调用 Redis 对锁进行续期的过程

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    // 当前线程持有的锁还存在 重新设置锁的过期时间(默认 30 秒)
    // 否则失败
    return evalWriteAsync(getName(),