redis中的分布式锁有哪些特点
分布式锁的特点
1.独占性
不论在任何情况下都只能有一个线程持有锁。
2.高可用
redis集群环境不能因为某一个节点宕机而出现获取锁或释放锁失败。
3.防死锁
必须有超时控制机制或者撤销操作。
4.不乱抢
自己加锁,自己释放。不能释放别人加的锁。
5.重入性
同一线程可以多次加锁。
redis单机怎么实现
一般情况下都是使用setnx+lua脚本实现。
直接贴代码
packagecom.fandf.test.redis;
importcn.hutool.core.util.IdUtil;
importcn.hutool.core.util.RandomUtil;
importlombok.extern.slf4j.Slf4j;
importorg.springframework.data.redis.core.RedisTemplate;
importorg.springframework.data.redis.core.script.DefaultRedisScript;
importorg.springframework.stereotype.Service;
importjavax.annotation.Resource;
importjava.util.Collections;
importjava.util.concurrent.TimeUnit;
/**
*redis单机锁
*
*@authorfandongfeng
*@date2023/3/2906:52
*/
@Slf4j
@Service
publicclassRedisLock{
@Resource
RedisTemplateString,Object>
redisTemplate;
privatestaticfinalStringSELL_LOCK="kill:";
/**
*模拟秒杀
*
*@return是否成功
*/
publicStringkill(){
StringproductId="123";
Stringkey=SELL_LOCK+productId;
//锁value,解锁时用来判断当前锁是否是自己加的
Stringvalue=IdUtil.fastSimpleUUID();
//加锁十秒钟过期防死锁
Booleanflag=redisTemplate.opsForValue().setIfAbsent(key,value,10,TimeUnit.SECONDS);
if(!flag){
return"加锁失败";
}
try{
StringproductKey="good123";
//获取商品库存
Integerstock=(Integer)redisTemplate.opsForValue().get(productKey);
if(stock==null){
//模拟录入数据,实际应该加载时从数据库读取
redisTemplate.opsForValue().set(productKey,100);
stock=100;
}
if(stock=0){
return"卖完了,下次早点来吧";
}
//扣减库存,模拟随机卖出数量
intrandomInt=RandomUtil.randomInt(1,10);
redisTemplate.opsForValue().decrement(productKey,randomInt);
//修改db,可以丢到队列里慢慢处理
return"成功卖出"+randomInt+"个,库存剩余"+redisTemplate.opsForValue().get(productKey)+"个";
}
finally{
////这种方法会存在删除别人加的锁的可能
//redisTemplate.delete(key);
//if(value.equals(redisTemplate.opsForValue().get(key))){
////因为if条件的判断和delete不是原子性的,
////if条件判断成功后,恰好锁到期自己解锁
////此时别的线程如果持有锁了,就会把别人的锁删除掉
//redisTemplate.delete(key);
//}
//使用lua脚本保证判断和删除的原子性
StringluaScript=
"if(redis.call('get',KEYS[1])==ARGV[1])then"+
"returnredis.call('del',KEYS[1])"+
"else"+
"return0"+
"end";
redisTemplate.execute(newDefaultRedisScript>
(luaScript,Boolean.class),Collections.singletonList(key),value);
}
}
}
进行单元测试,模拟一百个线程同时进行秒杀
packagecom.fandf.test.redis;
importorg.junit.jupiter.api.DisplayName;
importorg.junit.jupiter.api.RepeatedTest;
importorg.junit.jupiter.api.Test;
importorg.junit.jupiter.api.parallel.Execution;
importorg.springframework.boot.test.context.SpringBootTest;
importjavax.annotation.Resource;
importstaticorg.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
/**
*@Description:
*@author:fandongfeng
*@date:2023-3-2416:45
*/
@SpringBootTest
classSignServiceTest{
@Resource
RedisLockredisLock;
@RepeatedTest(100)
@Execution(CONCURRENT)
publicvoidredisLock(){
Stringresult=redisLock.kill();
if("加锁失败".equals(result)){
}
else{
System.out.println(result);
}
}
}
只有三个线程抢到了锁
成功卖出5个,库存剩余95个
成功卖出8个,库存剩余87个
成功卖出7个,库存剩余80个
redis锁有什么问题?
总的来说有两个:
1.无法重入。
2.我们为了防止死锁,加锁时都会加上过期时间,这个时间大部分情况下都是根据经验对现有业务评估得出来的,但是万一程序阻塞或者异常,导致执行了很长时间,锁过期就会自动释放了。此时如果别的线程拿到锁,执行逻辑,就有可能出现问题。
那么这两个问题有没有办法解决呢?有,接下来我们就来讲讲Redisson
Redisson实现分布式锁
Redisson是什么?
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
springboot集成Redisson
集成很简单,只需两步
pom引入依赖
dependency>
groupId>
org.redisson/groupId>
artifactId>
redisson-spring-boot-starter/artifactId>
/dependency>
application.yml增加redis配置
spring:
application:
name:test
redis:
host:127.0.0.1
port:6379
使用也很简单,只需要注入RedissonClient即可
packagecom.fandf.test.redis;
importlombok.extern.slf4j.Slf4j;
importorg.redisson.api.RLock;
importorg.redisson.api.RedissonClient;
importorg.springframework.stereotype.Component;
importjavax.annotation.Resource;
/**
*@authorfandongfeng
*/
@Component
@Slf4j
publicclassRedissonTest{
@Resource
RedissonClientredissonClient;
publicvoidtest(){
RLockrLock=redissonClient.getLock("anyKey");
//rLock.lock(10,TimeUnit.SECONDS);
rLock.lock();
try{
//dosomething
}
catch(Exceptione){
log.error("业务异常",e);
}
finally{
rLock.unlock();
}
}
}
可能不了解redisson的小伙伴会不禁发出疑问。
what?加锁时不需要加过期时间吗?这样会不会导致死锁啊。解锁不需要判断是不是自己持有吗?
哈哈,别着急,我们接下来一步步揭开redisson的面纱。
Redisson lock()源码跟踪
我们来一步步跟着lock()方法看下源码(本地redisson版本为3.20.0)
//RedissonLock.class
@Override
publicvoidlock(){
try{
lock(-1,null,false);
}
catch(InterruptedExceptione){
thrownewIllegalStateException();
}
}
查看lock(-1, null, false); 方法
privatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptibly)throwsInterruptedException{
//获取当前线程id
longthreadId=Thread.currentThread().getId();
//加锁代码块,返回锁的失效时间
Longttl=tryAcquire(-1,leaseTime,unit,threadId);
//lockacquired
if(ttl==null){
return;
}
CompletableFutureRedissonLockEntry>
future=subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntryentry;
if(interruptibly){
entry=commandExecutor.getInterrupted(future);
}
else{
entry=commandExecutor.get(future);
}
try{
while(true){
ttl=tryAcquire(-1,leaseTime,unit,threadId);
//lockacquired
if(ttl==null){
break;
}
//waitingformessage
if(ttl>
=0){
try{
entry.getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
}
catch(InterruptedExceptione){
if(interruptibly){
throwe;
}
entry.getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
}
}
else{
if(interruptibly){
entry.getLatch().acquire();
}
else{
entry.getLatch().acquireUninterruptibly();
}
}
}
}
finally{
unsubscribe(entry,threadId);
}
//get(lockAsync(leaseTime,unit));
}
我们看下它是怎么上锁的,也就是tryAcquire方法
privateLongtryAcquire(longwaitTime,longleaseTime,TimeUnitunit,longthreadId){
//真假加锁方法tryAcquireAsync
returnget(tryAcquireAsync(waitTime,leaseTime,unit,threadId));
}
publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){
super(commandExecutor,name);
this.commandExecutor=commandExecutor;
this.internalLockLeaseTime=commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
this.pubSub=commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
privateT>
RFutureLong>
tryAcquireAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId){
RFutureLong>
ttlRemainingFuture;
if(leaseTime>
0){
ttlRemainingFuture=tryLockInnerAsync(waitTime,leaseTime,unit,threadId,RedisCommands.EVAL_LONG);
}
else{
//waitTime和leaseTime都是-1,所以走这里
//过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
//跟进去源码发现默认值是30秒,privatelonglockWatchdogTimeout=30*1000;
ttlRemainingFuture=tryLockInnerAsync(waitTime,internalLockLeaseTime,
TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);
}
CompletionStageLong>
s=handleNoSync(threadId,ttlRemainingFuture);
ttlRemainingFuture=newCompletableFutureWrapper>
(s);
//加锁成功,开启子线程进行续约
CompletionStageLong>
f=ttlRemainingFuture.thenApply(ttlRemaining->
{
//lockacquired
if(ttlRemaining==null){
if(leaseTime>
0){
//如果指定了过期时间,则不续约
internalLockLeaseTime=unit.toMillis(leaseTime);
}
else{
//没指定过期时间,或者小于0,在这里实现锁自动续约
scheduleExpirationRenewal(threadId);
}
}
returnttlRemaining;
}
);
returnnewCompletableFutureWrapper>
(f);
}
上面代码里面包含加锁和锁续约的逻辑,我们先来看看加锁的代码
T>
RFutureT>
tryLockInnerAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommandT>
command){
returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,command,
"if((redis.call('exists',KEYS[1])==0)"+
"or(redis.call('hexists',KEYS[1],ARGV[2])==1))then"+
"redis.call('hincrby',KEYS[1],ARGV[2],1);
"+
"redis.call('pexpire',KEYS[1],ARGV[1]);
"+
"returnnil;
"+
"end;
"+
"returnredis.call('pttl',KEYS[1]);
",
Collections.singletonList(getRawName()),unit.toMillis(leaseTime),getLockName(threadId));
}
这里就看的很明白了吧,redisson使用了lua脚本来保证了命令的原子性。
redis.call('hexists', KEYS[1], ARGV[2]) 查看 key value 是否存在。
Redis Hexists 命令用于查看哈希表的指定字段是否存在。
如果哈希表含有给定字段,返回 1 。 如果哈希表不含有给定字段,或 key 不存在,返回 0 。
127.0.0.1:6379>
hexists123uuid
(integer)0
127.0.0.1:6379>
hincrby123uuid1
(integer)1
127.0.0.1:6379>
hincrby123uuid1
(integer)2
127.0.0.1:6379>
hincrby123uuid1
(integer)3
127.0.0.1:6379>
hexists123uuid
(integer)1
127.0.0.1:6379>
hgetall123
1)"uuid"
2)"3"
127.0.0.1:6379>
当key不存在,或者已经含有给定字段(也就是已经加过锁了,这里是为了实现重入性),直接对字段的值+1
这个字段的值,也就是ARGV[2], 取得是getLockName(threadId)方法,我们再看看这个字段的值是什么
protectedStringgetLockName(longthreadId){
returnid+":"+threadId;
}
publicRedissonBaseLock(CommandAsyncExecutorcommandExecutor,Stringname){
super(commandExecutor,name);
this.commandExecutor=commandExecutor;
this.id=commandExecutor.getServiceManager().getId();
this.internalLockLeaseTime=commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();
this.entryName=id+":"+name;
}
//commandExecutor.getServiceManager()的id默认值
privatefinalStringid=UUID.randomUUID().toString();
这里就明白了,字段名称是 uuid + : + threadId
接下来我们看看锁续约的代码scheduleExpirationRenewal(threadId);
protectedvoidscheduleExpirationRenewal(longthreadId){
ExpirationEntryentry=newExpirationEntry();
//判断该实例是否加过锁
ExpirationEntryoldEntry=EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(),entry);
if(oldEntry!=null){
//重入次数+1
oldEntry.addThreadId(threadId);
}
else{
//第一次加锁
entry.addThreadId(threadId);
try{
//锁续约核心代码
renewExpiration();
}
finally{
if(Thread.currentThread().isInterrupted()){
//如果线程异常终止,则关闭锁续约线程
cancelExpirationRenewal(threadId);
}
}
}
}
我们看看renewExpiration()方法
privatevoidrenewExpiration(){
ExpirationEntryee=EXPIRATION_RENEWAL_MAP.get(getEntryName());
if(ee==null){
return;
}
//新建一个线程执行
Timeouttask=commandExecutor.getServiceManager().newTimeout(newTimerTask(){
@Override
publicvoidrun(Timeouttimeout)throwsException{
ExpirationEntryent=EXPIRATION_RENEWAL_MAP.get(getEntryName());
if(ent==null){
return;
}
LongthreadId=ent.getFirstThreadId();
if(threadId==null){
return;
}
//设置锁过期时间为30秒
CompletionStageBoolean>
future=renewExpirationAsync(threadId);
future.whenComplete((res,e)->
{
if(e!=null){
log.error("Can'tupdatelock{
}
expiration",getRawName(),e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
//检查锁是还否存在
if(res){
//rescheduleitself10后调用自己
renewExpiration();
}
else{
//关闭续约
cancelExpirationRenewal(null);
}
}
);
}
}
,internalLockLeaseTime/3,TimeUnit.MILLISECONDS);
//注意上行代码internalLockLeaseTime/3,
//internalLockLeaseTime默认30s,那么也就是10s检查一次
ee.setTimeout(task);
}
//设置锁过期时间为internalLockLeaseTime也就是30slua脚本保证原子性
protectedCompletionStageBoolean>
renewExpirationAsync(longthreadId){
returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,
"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+
"redis.call('pexpire',KEYS[1],ARGV[1]);
"+
"return1;
"+
"end;
"+
"return0;
",
Collections.singletonList(getRawName()),
internalLockLeaseTime,getLockName(threadId));
}
OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。
我们写个测试类看看
packagecom.fandf.test.redis;
importorg.junit.jupiter.api.Test;
importorg.redisson.api.RLock;
importorg.redisson.api.RedissonClient;
importorg.springframework.boot.test.context.SpringBootTest;
importjavax.annotation.Resource;
/**
*@Description:
*@author:fandongfeng
*@date:2023-3-2416:45
*/
@SpringBootTest
classRedissonTest{
@Resource
privateRedissonClientredisson;
@Test
publicvoidwatchDog()throwsInterruptedException{
RLocklock=redisson.getLock("123");
lock.lock();
Thread.sleep(1000000);
}
}
查看锁的过期时间,及是否续约
127.0.0.1:6379>
keys*
1)"123"
127.0.0.1:6379>
ttl123
(integer)30
127.0.0.1:6379>
ttl123
(integer)26
127.0.0.1:6379>
ttl123
(integer)24
127.0.0.1:6379>
ttl123
(integer)22
127.0.0.1:6379>
ttl123
(integer)21
127.0.0.1:6379>
ttl123
(integer)20
127.0.0.1:6379>
ttl123
(integer)30
127.0.0.1:6379>
ttl123
(integer)28
127.0.0.1:6379>
我们再改改代码,看看是否可重入和字段名称是否和我们预期一致
packagecom.fandf.test.redis;
importorg.junit.jupiter.api.Test;
importorg.redisson.api.RLock;
importorg.redisson.api.RedissonClient;
importorg.springframework.boot.test.context.SpringBootTest;
importjavax.annotation.Resource;
/**
*@Description:
*@author:fandongfeng
*@date:2023-3-2416:45
*/
@SpringBootTest
classRedissonTest{
@Resource
privateRedissonClientredisson;
@Test
publicvoidwatchDog()throwsInterruptedException{
RLocklock=redisson.getLock("123");
lock.lock();
lock.lock();
lock.lock();
//加了三次锁,此时重入次数为3
Thread.sleep(3000);
//解锁一次,此时重入次数变为3
lock.unlock();
Thread.sleep(1000000);
}
}
127.0.0.1:6379>
keys*
1)"123"
127.0.0.1:6379>
127.0.0.1:6379>
ttl123
(integer)24
127.0.0.1:6379>
hgetall123
1)"df7f4c71-b57b-455f-acee-936ad8475e01:12"
2)"3"
127.0.0.1:6379>
127.0.0.1:6379>
hgetall123
1)"df7f4c71-b57b-455f-acee-936ad8475e01:12"
2)"2"
127.0.0.1:6379>
我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。
Redlock算法
redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?
假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤
1.客户端记录当前系统时间,以毫秒为单位
2.依次尝试从5个redis实例中,使用相同key获取锁
当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
5.如果获取锁失败,所有的redis实例都会进行解锁
防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题
这里有下面几个点需要注意:
1.我们都知道单机的redis是cp的,但是集群情况下redis是ap的,所以运行Redisson的节点必须是主节点,不能有从节点,防止主节点加锁成功未同步从节点就宕机,而客户端却收到加锁成功,导致数据不一致问题。
2.为了提高redis节点宕机的容错率,可以使用公式2N(n指宕机数量)+1,假设宕机一台,Redisson还要继续运行,那么至少要部署2*1+1=3台主节点。
到此,相信大家对“redis中的分布式锁有哪些特点”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: redis中的分布式锁有哪些特点
本文地址: https://pptw.com/jishu/3191.html