Redis分布式锁—Redisson+RLock可重入锁实现篇
前言
平时的工作中,由于生产环境中的项目是需要部署在多台服务器中的,所以经常会面临解决分布式场景下数据一致性的问题,那么就需要引入分布式锁来解决这一问题。
针对分布式锁的实现,目前比较常用的就如下几种方案:
- 基于数据库实现分布式锁
- 基于 Redis 实现分布式锁 【本文】
- 基于 Zookeeper 实现分布式锁
接下来这个系列文章会跟大家一块探讨这三种方案,本篇为 Redis 实现分布式锁篇。
Redis分布式环境搭建推荐:基于Docker的Redis集群搭建
Redis分布式锁一览
说到 Redis 锁,能搜到的,或者说常用的无非就下面这两个:
- setNX + Lua脚本
- Redisson + RLock可重入锁 【本文】
接下来我们一一探索这两个的实现,本文为 Redisson + RLock可重入锁 实现篇。
1、setNX+Lua实现方式
跳转链接:https://www.cnblogs.com/niceyoo/p/13711149.html
2、Redisson介绍
Redisson 是 java 的 Redis 客户端之一,是 Redis 官网推荐的 java 语言实现分布式锁的项目。
Redisson 提供了一些 api 方便操作 Redis。因为本文主要以锁为主,所以接下来我们主要关注锁相关的类,以下是 Redisson 中提供的多样化的锁:
- 可重入锁(Reentrant Lock)
- 公平锁(Fair Lock)
- 联锁(MultiLock)
- 红锁(RedLock)
- 读写锁(ReadWriteLock)
- 信号量(Semaphore) 等等
总之,管你了解不了解,反正 Redisson 就是提供了一堆锁... 也是目前大部分公司使用 Redis 分布式锁最常用的一种方式。
本文中 Redisson 分布式锁的实现是基于 RLock 接口,而 RLock 锁接口实现源码主要是 RedissonLock 这个类,而源码中加锁、释放锁等操作都是使用 Lua 脚本来完成的,并且封装的非常完善,开箱即用。
接下来主要以 Redisson 实现 RLock 可重入锁为主。
代码中实现过程
一起来看看在代码中 Redisson 怎么实现分布式锁的,然后再对具体的方法进行解释。
源码地址:https://github.com/niceyoo/redis-redlock
篇幅限制,文中代码不全,请以上方源码链接为主。
代码大致逻辑:首先会涉及数据库 2 个表,order2(订单表)、stock(库存表),controller层会提供一个创建订单的接口,创建订单之前,先获取 RedLock 分布式锁,获取锁成功后,在一个事务下减库存,创建订单;最后通过创建大于库存的并发数模拟是否出现超卖的情况。
代码环境:SpringBoot2.2.2.RELEASE
+ Spring Data JPA
+ Redisson
1)Maven 依赖 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>redis-redlock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-redlock</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.1</version>
</dependency>
<!-- Gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<!-- JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Mysql Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.48</version>
</dependency>
<!-- 数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.20</version>
</dependency>
<!-- Hutool工具包 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.6.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
redisson、MySQL 等相关依赖。
2)application.yml 配置文件
server:
port: 6666
servlet:
context-path: /
spring:
# 数据源
datasource:
url: jdbc:mysql://127.0.0.1:3306/redis_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
driverClassName: com.mysql.jdbc.Driver
logSlowSql: true
jpa:
# 显示sql
show-sql: false
# 自动生成表结构
generate-ddl: true
hibernate:
ddl-auto: update
redis:
redis:
cluster:
nodes: 10.211.55.4:6379, 10.211.55.4:6380, 10.211.55.4:6381
lettuce:
pool:
min-idle: 0
max-idle: 8
max-active: 20
# 日志
logging:
# 输出级别
level:
root: info
file:
# 指定路径
path: redis-logs
# 最大保存天数
max-history: 7
# 每个文件最大大小
max-size: 5MB
配置redis,指定数据库地址。
3)Redisson配置类 RedissonConfig.java
/**
* redisson配置类
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000)
.addNodeAddress("redis://10.211.55.4:6379", "redis://redis://10.211.55.4:6380")
.addNodeAddress("redis://redis://10.211.55.4:6381");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
4)StockServerImpl 库存实现类,其他参考源码
import com.example.redisredlock.bean.Stock;
import com.example.redisredlock.dao.StockDao;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional
public class StockServerImpl implements StockService {
@Autowired
private StockDao stockDao;
@Override
public StockDao getRepository() {
return stockDao;
}
/**
* 减库存
*
* @param productId
* @return
*/
@Override
public boolean decrease(String productId) {
Stock one = stockDao.getOne(productId);
int stockNum = one.getStockNum() - 1;
one.setStockNum(stockNum);
stockDao.saveAndFlush(one);
return true;
}
}
库存实现类,就一个接口,完成对库存的-1操作。
5)OrderServerImpl 订单实现类(核心代码)
package com.example.redisredlock.server.impl;
import com.example.redisredlock.bean.Order;
import com.example.redisredlock.dao.OrderDao;
import com.example.redisredlock.server.OrderServer;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@Transactional
public class OrderServerImpl implements OrderServer {
/**
* 库存service
*/
@Resource
private StockService stockService;
/**
* 订单order dao
*/
@Resource
private OrderDao orderDao;
@Override
public OrderDao getRepository() {
return orderDao;
}
@Resource
private RedissonClient redissonClient;
@Override
@Transactional(rollbackFor = Exception.class)
public boolean createOrder(String userId, String productId) {
/** 如果不加锁,必然超卖 **/
RLock lock = redissonClient.getLock("stock:" + productId);
try {
lock.lock(10, TimeUnit.SECONDS);
int stock = stockService.get(productId).getStockNum();
log.info("剩余库存:{}", stock);
if (stock <= 0) {
return false;
}
String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase();
/** 减库存操作 **/
if (stockService.decrease(productId)) {
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setOrderNo(orderNo);
Date now = new Date();
order.setCreateTime(now);
order.setUpdateTime(now);
orderDao.save(order);
return true;
}
} catch (Exception ex) {
log.error("下单失败", ex);
} finally {
lock.unlock();
}
return false;
}
}
6)Order 订单实体类
@Data
@Entity
@Table(name = "order2")
public class Order extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* 订单编号
*/
private String orderNo;
/**
* 下单用户id
*/
private String userId;
/**
* 产品id
*/
private String productId;
}
7)Stock 库存实体类
@Data
@Entity
@Table(name = "stock")
public class Stock extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* 用产品id,设置为库存id
*/
/**
* 库存数量
*/
private Integer stockNum;
}
8)OrderController 订单接口
package com.example.redisredlock.controller;
import com.example.redisredlock.bean.Order;
import com.example.redisredlock.server.OrderServer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author niceyoo
*/
@RestController
@RequestMapping("/order")
public class OrderController {
@Resource
private OrderServer orderServer;
@PostMapping("/createOrder")
public boolean createOrder(Order order) {
return orderServer.createOrder(order.getUserId(), order.getProductId());
}
}
表结构说明及接口测试部分
因为项目中使用 Spring Data JPA,所以会自动创建数据库表结构,大致为:
stock(库存表)
id(商品id) | stock_num(库存数量) | create_time(创建时间) | update_time(更新时间) |
---|---|---|---|
1234 | 100 | xxxx | xxxx |
order2(订单表)
id(订单id) | order_no(订单号) | user_id(用户id) | product_id(商品id) |
---|---|---|---|
xxxx | xxxx | xxxx | 1234 |
如下是详细表结构+数据:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for order2
-- ----------------------------
DROP TABLE IF EXISTS `order2`;
CREATE TABLE `order2` (
`id` varchar(64) NOT NULL,
`create_time` datetime(6) DEFAULT NULL,
`update_time` datetime(6) DEFAULT NULL,
`order_no` varchar(255) DEFAULT NULL,
`user_id` varchar(64) DEFAULT NULL,
`product_id` varchar(64) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
`id` varchar(255) NOT NULL,
`create_time` datetime(6) DEFAULT NULL,
`update_time` datetime(6) DEFAULT NULL,
`stock_num` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of stock
-- ----------------------------
BEGIN;
INSERT INTO `stock` VALUES ('1234', '2020-09-21 21:38:09.000000', '2020-09-22 08:32:17.883000', 0);
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
创建订单的过程就是消耗库存表 stock_num 的过程,如果没有分布式锁的情况下,在高并发下很容易出现商品超卖的情况,所以引入了分布式锁的概念,如下是在库存100,并发1000的情况下,测试超卖情况:
JMeter 模拟进程截图
JMeter 调用接口截图
stock 库存表截图
订单表截图
加了锁之后并没有出现超卖情况。
核心代码说明
整个 demo 核心代码在创建订单 createOrder() 加锁的过程,如下:
@Override
@Transactional(rollbackFor = Exception.class)
public boolean createOrder(String userId, String productId) {
// 如果不加锁,必然超卖
RLock lock = redissonClient.getLock("stock:" + productId);
try {
lock.lock(10, TimeUnit.SECONDS);
int stock = stockService.get(productId).getStockNum();
log.info("剩余库存:{}", stock);
if (stock <= 0) {
return false;
}
String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase();
if (stockService.decrease(productId)) {
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setOrderNo(orderNo);
Date now = new Date();
order.setCreateTime(now);
order.setUpdateTime(now);
orderDao.save(order);
return true;
}
} catch (Exception ex) {
log.error("下单失败", ex);
} finally {
lock.unlock();
}
return false;
}
去除业务逻辑,加锁框架结构为:
RLock lock = redissonClient.getLock("xxx");
lock.lock();
try {
...
} finally {
lock.unlock();
}
关于 RedLock 中的方法
因为 RLock 本身继承自 Lock 接口,如下分为两部分展示:
public interface RLock extends Lock, RLockAsync {
//----------------------Lock接口方法-----------------------
/**
* 加锁 锁的有效期默认30秒
*/
void lock();
/**
* tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .
*/
boolean tryLock();
/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,
* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
*
* @param time 等待时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 解锁
*/
void unlock();
/**
* 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过
* Thread.currentThread().interrupt(); 方法真正中断该线程
*/
void lockInterruptibly();
//----------------------RLock接口方法-----------------------
/**
* 加锁 上面是默认30秒这里可以手动设置锁的有效时间
*
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);
/**
* 这里比上面多一个参数,多添加一个锁的有效时间
*
* @param waitTime 等待时间
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 检验该锁是否被线程使用,如果被使用返回True
*/
boolean isLocked();
/**
* 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)
* 这个比上面那个实用
*/
boolean isHeldByCurrentThread();
/**
* 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间
* @param leaseTime 锁有效时间
* @param unit 时间单位 小时、分、秒、毫秒等
*/
void lockInterruptibly(long leaseTime, TimeUnit unit);
}
1、加锁
首先重点在 getLock() 方法,到底是怎么拿到分布式锁的,我们点进该方法:
public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
调用 getLock() 方法后实际返回一个 RedissonLock 对象,此时就有点呼应了,文章前面提到的 Redisson 普通的锁实现源码主要是 RedissonLock 这个类,而源码中加锁、释放锁等操作都是使用 Lua 脚本来完成的,封装的非常完善,开箱即用。
在 RedissonLock 对象中,主要实现 lock() 方法,而 lock() 方法主要调用 tryAcquire() 方法:
tryAcquire() 方法又继续调用 tryAcquireAsync() 方法:
到这,由于 leaseTime == -1,于是又调用 tryLockInnerAsync()方法,感觉有点无限套娃那种感觉了...
咳咳,不过这个方法是最关键的了:
这个方法就有点意思了,看到了一些熟悉的东西,还记得上一篇里的 Lua 脚本吗?
我们来分析一下这个部分的 Lua 脚本:
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
脚本里,一共是有两个参数 KEYS[1]、通过后面的参数可以得知: KEYS[1] 为 getName(),ARGV[2] 为 getLockName(threadId)。
假设传递加锁参数时传入的 name 值为 "niceyoo",
假设线程调用的 ID 为 thread-1,
假设 RedissonLock 类的成员变量 UUID 类型的 id 值为 32063ed-98522fc-80287ap,
结合 getLockName(threadId)) 方法:
protected String getLockName(long threadId) {
return this.id + ":" + threadId;
}
即,KEYS[1] = niceyoo,ARGV[2] = 32063ed-98522fc-80287ap:thread-1
然后将假设值带入语句中:
- 判断是否存在名为 “niceyoo” 的 key;
- 如果没有,则在其下设置一个字段为 “32063ed-98522fc-80287ap:thread-1”,值为 1 的键值对 ,并设置它的过期时间,也就是第一个 if 语句体;
- 如果存在,则进一步判断 “32063ed-98522fc-80287ap:thread-1” 是否存在,若存在,则其值加 1,并重新设置过期时间,这个过程可以看做锁重入;
- 返回 “niceyoo” 的生存时间(毫秒);
如果放在锁这个场景下就是,key 表示当前线程名称,argv 为当前获得锁的线程,所有竞争这把锁的线程都要判断这个 key 下有没有自己的,也就是上边那些 if 判断,如果没有就不能获得锁,如果有,则进入重入锁,字段值+1。
2、解锁
解锁调用的是 unlockInnerAsync() 方法:
该方法同样还是调用的 Lua 脚本实现的。
同样还是假设 name=niceyoo,假设线程 ID 是 thread-1
同理,我们可以得到:
KEYS[1] 是 getName(),即 KEYS[1]=niceyoo,
KEYS[2] 是 getChannelName(),即 KEYS[2]=redisson_lock__channel:{niceyoo},
ARGV[1] 是 LockPubSub.unlockMessage,即ARGV[1]=0,
ARGV[2] 是生存时间,
ARGV[3] 是 getLockName(threadId),即 ARGV[3]=32063ed-98522fc-80287ap:thread-1
因此,上面脚本的意思是:
-
判断是否存在 name 为 “niceyoo” 的key;
-
如果不存在,向 Channel 中广播一条消息,广播的内容是0,并返回1
-
如果存在,进一步判断字段 32063ed-98522fc-80287ap:thread-1 是否存在
-
若字段不存在,返回空,若字段存在,则字段值减1
-
若减完以后,字段值仍大于0,则返回0
-
减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;
可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了。
3、加锁解锁小结
4、其他补充
4.1 lock() 方法
通常在获得 RLock 时,需要调用 lock() 方法,那么设置过期时间跟不设置有啥区别:
RLock lock = redissonClient.getLock("xxx");
/*最常见的使用方法*/
lock.lock();
如果没有设置过期时间,默认还是会有一个30秒的过期时间,等价于:
RLock lock = redissonClient.getLock("xxx");
/*支持过期解锁,30秒之后自动释放锁,无须调用unlock方法手动解锁*/
lock.lock(30, TimeUnit.SECONDS);
4.1 tryLock() 方法
有的小伙在在获取分布式锁时,使用的是 tryLock() 方法,跟 lock() 方法有啥区别:
RLock lock = redissonClient.getLock("xxx");
/*尝试
推荐阅读
-
使用 Redis setNX 实现分布式锁 (包括重复数据插入的排他锁定)
-
深入理解Redis分布式锁:从源码看第一种简单实现方法
-
SpringBoot 2.0及以上版本的Redis分布式锁实现指南
-
Redis分布式锁—Redisson+RLock可重入锁实现篇
-
Go与Redis实现分布式互斥锁和红锁
-
如何实现Redis的分布式锁方法探索
-
理解并实现基于5种Lock接口的可重入锁(ReentrantLock)
-
多元化实践:Redisson在实现Redis分布式锁的各种方法探索
-
(Redis篇)超细Redisson分布式Lock锁源码分析
-
理解并实现:可重入锁(ReentrantLock)在多线程中的应用