欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

面试问题:Redis (viii)

最编程 2024-10-19 07:10:53
...

1. 面试题

2. 锁的特性

单机版同一个jvm虚拟机内,synchronized或者Lock接口

分布式多个不同jvm虚拟机,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享

一个靠谱分布式锁所需的条件 

3. 手写分布式锁

3.1 独占性(线程安全)

执行业务代码前进行加锁操作,保证同一时刻仅有一个线程能进行业务操作 

/**
     * V1.0 原始版本
     * 不足:可能出现超卖现象,锁失效(线程安全问题)
     * 在单机环境下,可以使用synchronized或Lock来实现。
     * 但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),
     * 所以需要一个让所有进程都能访问到的锁来实现(比如redis或者zookeeper来构建)
     * 不同进程jvm层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
     * @return
     */
    public String sale() {
        String message = "";
        String key = "lgyLock";
        lock.lock();
        try {
            String result = stringRedisTemplate.opsForValue().get("inventory");
            Integer inventory = result == null ? 0 : Integer.valueOf(result);
            if (inventory > 0) {
                System.out.println("库存数量:" + inventory);
                int num = inventory - 1;
                stringRedisTemplate.opsForValue().set("inventory", String.valueOf(num));
                message = "销售成功,剩余" + num + "件" + ",服务端口号:" + port;
            } else {
                message = "库存不足";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return message;
    }

3.2 高可用(分布式锁)

因为有多个订单模块(nginx轮询调用不同的订单模块进行业务操作),而v1.0的版本只是本地jvm锁,无法解决分布式场景,此时可引入第三方组件进行加锁释放锁判断。从而可用解决跨进程+跨服务问题、超卖问题、缓存击穿等等

/**
     * V2.0 改进版本:解决不同线程中锁失效问题
     * 不足:在上锁和解锁的过程中会运行其他业务代码,若该订单模块出现故障(宕机)则会发生死锁现象,该key值一直存在,其他线程无法获取到该锁
     * @return
     */
    public String sale() {
        String message = "";
        String key = "lgyLock";
        String uuidValue = key + UUID.randomUUID().toString().replace("-", "") + Thread.currentThread().getId();

        // 分布式锁:
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {
            try {
                // 暂停20毫秒,该锁被去哦他线程获取时会将该线程休眠20毫秒后在尝试获取
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            String result = stringRedisTemplate.opsForValue().get("inventory");
            Integer inventory = result == null ? 0 : Integer.valueOf(result);
            if (inventory > 0) {
                System.out.println("库存数量:" + inventory);
                int num = inventory - 1;
                stringRedisTemplate.opsForValue().set("inventory", String.valueOf(num));
                message = "销售成功,剩余" + num + "件" + ",服务端口号:" + port;
            } else {
                message = "库存不足";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            stringRedisTemplate.delete(key);
        }
        return message;
    }

3.3 防死锁(宕机)

如部署的微服务Java程序挂了(宕机)从而导致锁一直没有释放,其他正常运行的微服务一直无法获得锁从而引起死锁问题

 /**
     * V3.0 改进版本:预防可能发生死锁现象
     * 不足: 若业务未完成锁到期了会提前释放锁,此时其他线程会进行加锁,在加锁后当前线程完成业务进行释放锁,导致误删其他线程的锁
     * @return
     */
    public String sale() {
        String message = "";
        String key = "lgyLock";
        String uuidValue = key + UUID.randomUUID().toString().replace("-", "") + Thread.currentThread().getId();

        // 分布式锁:锁设置自动过期时间
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
            try {
                // 暂停20毫秒,锁被其他线程获取后会将当前线程休眠20毫秒后在尝试获取
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            String result = stringRedisTemplate.opsForValue().get("inventory");
            Integer inventory = result == null ? 0 : Integer.valueOf(result);
            if (inventory > 0) {
                System.out.println("库存数量:" + inventory);
                int num = inventory - 1;
                stringRedisTemplate.opsForValue().set("inventory", String.valueOf(num));
                message = "销售成功,剩余" + num + "件" + ",服务端口号:" + port;
            } else {
                message = "库存不足";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            stringRedisTemplate.delete(key);
        }
        return message;
    }

3.4 不乱抢(误删锁)

如线程1业务还未完成(正在进行),提前释放了锁 (锁过期),线程2进入执行业务代码(还在执行),线程1此时完成并进行释放锁,但线程1加的锁已经释放,此时线程1去释放线程2的锁,出现乱抢问题)

/**
     * V4.0 改进版本:解决其他线程误删锁问题
     * @return
     */
    public String sale() {
        String message = "";
        String key = "lgyLock";
        String uuidValue = key + UUID.randomUUID().toString().replace("-", "") + Thread.currentThread().getId();

        // 分布式锁:锁设置自动过期时间
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
            try {
                // 暂停20毫秒,锁被其他线程获取后会将当前线程休眠20毫秒后在尝试获取
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            String result = stringRedisTemplate.opsForValue().get("inventory");
            Integer inventory = result == null ? 0 : Integer.valueOf(result);
            if (inventory > 0) {
                System.out.println("库存数量:" + inventory);
                int num = inventory - 1;
                stringRedisTemplate.opsForValue().set("inventory", String.valueOf(num));
                message = "销售成功,剩余" + num + "件" + ",服务端口号:" + port;
            } else {
                message = "库存不足";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁:比较要释放的锁和当前线程的锁是否一致,若一致则释放
            if (stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue)) {
                stringRedisTemplate.delete(key);
            }
        }
        return message;
    }

3.5 原子性(Lua保证原子性)

finall的判断操作和del操作不是原子性的

 

/**
     * V5.0 改进版本:保证执行命令的原子性,使用lua脚本实现
     *  不足:不能兼顾可重复性
     * @return
     */
    public String sale() {
        String message = "";
        String key = "lgyLock";
        String uuidValue = key + UUID.randomUUID().toString().replace("-", "") + Thread.currentThread().getId();

        // 分布式锁:锁设置自动过期时间
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue, 30L, TimeUnit.SECONDS)) {
            try {
                // 暂停20毫秒,锁被其他线程获取后会将当前线程休眠20毫秒后在尝试获取
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            String result = stringRedisTemplate.opsForValue().get("inventory");
            Integer inventory = result == null ? 0 : Integer.valueOf(result);
            if (inventory > 0) {
                System.out.println("库存数量:" + inventory);
                int num = inventory - 1;
                stringRedisTemplate.opsForValue().set("inventory", String.valueOf(num));
                message = "销售成功,剩余" + num + "件" + ",服务端口号:" + port;
            } else {
                message = "库存不足";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁:比较要释放的锁和当前线程的锁是否一致,若一致则释放
            String luaScript =
                    "if (redis.call('get',KEYS[1]) == ARGV[1]) then " +
                            "return redis.call('del',KEYS[1]) " +
                            "else " +
                            "return 0 " +
                            "end";
            stringRedisTemplate.execute(new DefaultRedisScript<>(luaScript, Boolean.class), Arrays.asList(key), uuidValue);
        }
        return message;

3.6 重入性(可重入锁)

可重入锁又名递归锁

 

是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。

 

如果是1个有 synchronized 修饰的递归调用方法,程序第2次进入被自己阻塞了那......

所以Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

 3.6.1 隐式锁

synchronized关键字使用的锁默认是可重入锁

隐式锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁,这样的锁就叫做可重入锁。

简单的来说就是:在一个synchronized修饰的方法或代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的

 

 

与可重入锁相反,不可重入锁不可递归调用,递归调用就发生死锁。

在一个Synchronized修饰的方法或代码块的内部调用本类的其他Synchronized修饰的方法或代码块时,是永远可以得到锁的

public class ReEntryLockDemo
{
    public synchronized void m1()
    {
        System.out.println("-----m1");
        m2();
    }
    public synchronized void m2()
    {
        System.out.println("-----m2");
        m3();
    }
    public synchronized void m3()
    {
        System.out.println("-----m3");
    }

    public static void main(String[] args)
    {
        ReEntryLockDemo reEntryLockDemo = new ReEntryLockDemo();

        reEntryLockDemo.m1();
    }
}

3.6.2 重入实现机制

 每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。

 

当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。

在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么 Java 虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。

 

当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已被释放。

3.6.3 显示锁

lock使用的是显示锁,加锁几次必须释放锁几次 

public class ReEntryLockDemo
{
    static Lock lock = new ReentrantLock();

    public static void main(String[] args)
    {
        new Thread(() -> {
            lock.lock();
            try
            {
                System.out.println("----外层调用lock");
                lock.lock();
                try
                {
                    System.out.println("----内层调用lock");
                }finally {
                    // 这里故意注释,实现加锁次数和释放次数不一样
                    // 由于加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直在等待。
                    lock.unlock(); // 正常情况,加锁几次就要解锁几次
                }
            }finally {
                lock.unlock();
            }
        },"a").start();

        new Thread(() -> {
            lock.lock();
            try
            {
                System.out.println("b thread----外层调用lock");
            }finally {
                lock.unlock();
            }
        },"b").start();

    }
}

3.7 自动续期

自动续期功能可确保锁的过期时间大于业务执行时间,避免业务还未执行完成锁过期的问题

4. RedLock

4.1 问题引入 

手写分布式锁的不足

 

4.2 解决方案

Redis也提供了Redlock算法,用来实现基于多个实例的分布式锁。

锁变量由多个实例维护,即使有实例发生了故障,锁变量仍然是存在的,客户端还是可以完成锁操作。

该方案也是基于(set 加锁、Lua 脚本解锁)进行改良的,所以redis之父antirez 只描述了差异的地方,大致方案如下。

假设我们有N个Redis主节点,例如 N = 5这些节点是完全独立的,我们不使用复制或任何其他隐式协调系统,

为了取到锁客户端执行以下操作:

1
获取当前时间,以毫秒为单位;
2
依次尝试从5个实例,使用相同的 key 和随机值(例如 UUID)获取锁。当向Redis 请求获取锁时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以防止客户端在试图与一个宕机的 Redis 节点对话时长时间处于阻塞状态。如果一个实例不可用,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁;
3
客户端通过当前时间减去步骤 1 记录的时间来计算获取锁使用的时间。当且仅当从大多数(N/2+1,这里是 3 个节点)的 Redis 节点都取到锁,并且获取锁使用的时间小于锁失效时间时,锁才算获取成功;
4
如果取到了锁,其真正有效时间等于初始有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
5
如果由于某些原因未能获得锁(无法在至少 N/2 + 1 个 Redis 实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

该方案为了解决数据不一致的问题,直接舍弃了异步复制只使用 master 节点,同时由于舍弃了 slave,为了保证可用性,引入了 N 个节点,官方建议是 5。阳哥本次教学演示用3台实例来做说明。

客户端只有在满足下面的这两个条件时,才能认为是加锁成功。

条件1:客户端从超过半数(大于等于N/2+1)的Redis实例上成功获取到了锁;

条件2:客户端获取锁的总耗时没有超过锁的有效时间。

 

这个锁的算法实现了多redis实例的情况,相对于单redis节点来说,优点在于 防止了 单节点故障造成整个服务停止运行的情况且在多节点中锁的设计,及多节点同时崩溃等各种意外情况有自己独特的设计方法。

Redisson 分布式锁支持 MultiLock 机制可以将多个锁合并为一个大锁,对一个大锁进行统一的申请加锁以及释放锁。

 

最低保证分布式锁的有效性及安全性的要求如下:

1.互斥;任何时刻只能有一个client获取锁

2.释放死锁;即使锁定资源的服务崩溃或者分区,仍然能释放锁

3.容错性;只要多数redis节点(一半以上)在使用,client就可以获取和释放锁

 

网上讲的基于故障转移实现的redis主从无法真正实现Redlock:

因为redis在进行主从复制时是异步完成的,比如在clientA获取锁后,主redis复制数据到从redis过程中崩溃了,导致没有复制到从redis中,然后从redis选举出一个升级为主redis,造成新的主redis没有clientA 设置的锁,这是clientB尝试获取锁,并且能够成功获取锁,导致互斥失效;