SpringBoot 2.0及以上版本的Redis分布式锁实现指南
最编程
2024-07-28 19:20:15
...
SpringBoot2.0以后,redis 的库替换为了lettuce ,
分享基于redis一个分布式锁实现,
特点:
1/ 非重入,等待锁时使用线程sleep
2/使用 redis的 SETNX 带过期时间的方法
3/使用ThreadLocal保存锁的值,在锁超时时,防止删除其他线程的锁,使用lua 脚本保证原子性;
实现如下,欢迎提出指正:
package ???;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.Assert;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
* 只支持springboot2 以后的Redis分布式锁(lettuce底层,不支持jedis)
* 不支持重入
*
* 经过测试,在本地redis情况下,一次lock和releaseLock 总花费约3ms
*/
public class RedisLock extends AbstractLock {
private RedisTemplate<String, Object> redisTemplate;
private ThreadLocal<String> lockValue = new ThreadLocal<>();
private final Logger logger = LoggerFactory.getLogger(RedisLock.class);
private static final String REDIS_LIB_MISMATCH = "Failed to convert nativeConnection. " +
"Is your SpringBoot main version > 2.0 ? Only lib:lettuce is supported.";
private static final String UNLOCK_LUA;
static {
StringBuilder sb = new StringBuilder();
sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call(\"del\",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
}
public RedisLock(RedisTemplate<String, Object> redisTemplate) {
Assert.notNull(redisTemplate,"redisTemplate should not be null.");
this.redisTemplate = redisTemplate;
}
/**
* 加锁
* @param key
* @param expireSeconds
* @param retryTimes
* @param sleepMillis
* @return
*/
@Override
public boolean lock(String key, long expireSeconds, int retryTimes, long sleepMillis) {
boolean result = tryLock(key, expireSeconds);
while((!result) && retryTimes-- > 0){
try {
logger.debug("Lock failed, retrying..." + retryTimes);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
return false;
}
result = tryLock(key, expireSeconds);
}
return result;
}
/**
* 尝试Lock
* @param key
* @param expireSeconds
* @return
*/
@SuppressWarnings("unchecked")
private boolean tryLock(String key, long expireSeconds) {
String uuid = UUID.randomUUID().toString();
try {
String result = redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
try{
Object nativeConnection = connection.getNativeConnection();
byte[] keyByte = key.getBytes(StandardCharsets.UTF_8);
byte[] valueByte = uuid.getBytes(StandardCharsets.UTF_8);
String resultString = "";
if(nativeConnection instanceof RedisAsyncCommands){
RedisAsyncCommands command = (RedisAsyncCommands) nativeConnection;
resultString = command
.getStatefulConnection()
.sync()
.set(keyByte, valueByte, SetArgs.Builder.nx().ex(expireSeconds));
}else if(nativeConnection instanceof RedisAdvancedClusterAsyncCommands){
RedisAdvancedClusterAsyncCommands clusterAsyncCommands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
resultString = clusterAsyncCommands
.getStatefulConnection()
.sync()
.set(keyByte, keyByte, SetArgs.Builder.nx().ex(expireSeconds));
}else{
logger.error(REDIS_LIB_MISMATCH);
}
return resultString;
}catch (Exception e){
logger.error("Failed to lock, closing connection",e);
closeConnection(connection);
return "";
}
}
});
boolean eq = "OK".equals(result);
if(eq) {
lockValue.set(uuid);
}
return eq;
} catch (Exception e) {
logger.error("Set redis exception", e);
return false;
}
}
/**
* 释放锁
* 有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
* 使用lua脚本删除redis中匹配value的key
* @param key
* @return false: 锁已不属于当前线程 或者 锁已超时
*/
@SuppressWarnings("unchecked")
@Override
public boolean releaseLock(String key) {
try {
String lockValue = this.lockValue.get();
if(lockValue==null){
return false;
}
byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
byte[] valueBytes = lockValue.getBytes(StandardCharsets.UTF_8);
Object[] keyParam = new Object[]{keyBytes};
Long result = redisTemplate.execute(new RedisCallback<Long>() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
try{
Object nativeConnection = connection.getNativeConnection();
if (nativeConnection instanceof RedisScriptingAsyncCommands) {
/**
* 不要问我为什么这里的参数这么奇怪
*/
RedisScriptingAsyncCommands<Object,byte[]> command = (RedisScriptingAsyncCommands<Object,byte[]>) nativeConnection;
RedisFuture future = command.eval(UNLOCK_LUA, ScriptOutputType.INTEGER, keyParam, valueBytes);
return getEvalResult(future,connection);
}else{
logger.warn(REDIS_LIB_MISMATCH);
return 0L;
}
}catch (Exception e){
logger.error("Failed to releaseLock, closing connection",e);
closeConnection(connection);
return 0L;
}
}
});
return result != null && result > 0;
} catch (Exception e) {
logger.error("release lock exception", e);
}
return false;
}
private Long getEvalResult(RedisFuture future,RedisConnection connection){
try {
Object o = future.get();
return (Long)o;
} catch (InterruptedException |ExecutionException e) {
logger.error("Future get failed, trying to close connection.", e);
closeConnection(connection);
return 0L;
}
}
private void closeConnection(RedisConnection connection){
try{
connection.close();
}catch (Exception e2){
logger.error("close connection fail.", e2);
}
}
/**
* 查看是否加锁
* @param key
* @return
*/
@Override
public boolean isLocked(String key) {
Object o = redisTemplate.opsForValue().get(key);
return o!=null;
}
}
上一篇: Redis分布式锁实战应用概述与经验分享
下一篇: 5种解决Java独占写文件的方法
推荐阅读
-
SpringBoot 2.0及以上版本的Redis分布式锁实现指南
-
【2022新手指南】Java编程进阶之路 - 六、技术架构篇 ### MySQL索引底层解析与优化实战 - 你会讲解MySQL索引的数据结构吗?性能调优技巧知多少? - Redis深度揭秘:你知道多少?从基础到哨兵、主从复制全梳理 - Redis持久化及哨兵模式详解,还有集群搭建和Leader选举黑箱打开 - Zookeeper是个啥?特性和应用场景大公开 - ZooKeeper集群搭建攻略及 Leader选举、读写一致性、共享锁实现细节 - 探究ZooKeeper中的Leader选举机制及其在分布式环境中的作用 - Zab协议深入剖析:原理、功能与在Zookeeper中的核心地位 - RabbitMQ全方位解读:工作模式、消费限流、可靠投递与配置策略 - 设计者视角:RabbitMQ过期时间、死信队列与延时队列实践指南 - RocketMQ特性和应用场景揭示:理解其精髓与差异化优势 - Kafka详细介绍:特性及广泛应用于实时数据处理的场景解析 - ElasticSearch实力揭秘:特性概述与作为搜索引擎的广泛应用 - MongoDB认知升级:非关系型数据库的优势阐述,安装与使用实战教学 - BIO/NIO/AIO网络模型对比:掌握它们的区别与在网络编程中的实际应用 - Netty带你飞:理解其超快速度背后的秘密,包括线程模型分析 - 网络通信黑科技:Netty编解码原理与常用编解码器的应用,Protostuff实战演示 - 解密Netty粘包与拆包现象,怎样有效应对这一常见问题 - 自定义Netty心跳检测机制,轻松调整检测间隔时间的艺术 - Dubbo轻骑兵介绍:核心特性概览,服务降级实战与其实现益处 - Dubbo三大神器解读:本地存根与本地伪装的实战运用与优势呈现 ----------------------- 七、结语与回顾