时间:2023-03-17来源:系统城装机大师作者:佚名
2023的金三银四来的没想象中那么激烈,一个朋友前段时间投了几十家,多数石沉大海,好不容易等来面试机会,就恰好被问道项目中关于分布式锁的应用,后涉及Redisson实现分布式锁的原理,答不上来。
我们都知道,Java中synchronized和lock都支持可重入,synchronized的锁关联一个线程持有者和一个计数器。当一个线程请求成功后,JVM会记下持有锁的线程,并将计数器计为1。此时其他线程请求该锁,则必须等待。而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增。当线程退出一个synchronized方法/块时,计数器会递减,如果计数器为0则释放该锁;在ReentrantLock中,底层的 AQS 对应的state 同步状态值表示线程获取该锁的可重入次数,通过CAS方式进行设置,在默认情况下,state的值为0 表示当前锁没有被任何线程持有,原理类似。所以如果想要实现可重入性,可能须有一个计数器来控制重入次数,实际Redisson确实是这么做的。
好的我们通过Redisson客户端进行设置,并循环3次,模拟锁重入:000
1 2 3 |
for ( int i = 0 ; i < 3 ; i++) { RedissonLockUtil.tryLock( "distributed:lock:distribute_key" , TimeUnit.SECONDS, 20 , 100 ); } |
连接Redis客户端进行查看:
可以看到,我们设置的分布式锁是存在一个hash结构中,value看起来是循环的次数3,key就不怎么认识了,那这个key是怎么设置进去的呢,另外为什么要设置成为Hash类型呢?
我们先来看看普通的分布式锁的上锁流程:
说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
public boolean tryLock( long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = this .tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } else { // 订阅分布式Key对应的消息,监听其它锁持有者释放,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环 CompletableFuture subscribeFuture = this .subscribe(threadId); subscribeFuture.get(time, TimeUnit.MILLISECONDS); try { do { // 尝试获取锁 ttl = this .tryAcquire(waitTime, leaseTime, unit, threadId); // 竞争获取锁成功,退出循环,不再竞争。 if (ttl == null ) { return true ; } // 利用信号量机制阻塞当前线程相应时间,之后再重新获取锁 if (ttl >= 0L && ttl < time) { ((RedissonLockEntry) this .commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry) this .commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; } while (time > 0L); } finally { // 竞争锁成功后,取消订阅该线程Id事件 this .unsubscribe((RedissonLockEntry) this .commandExecutor.getNow(subscribeFuture), threadId); } } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
RFuture<Long> tryAcquireAsync( long leaseTime, TimeUnit unit, final long threadId) { // 如果设置了持有锁的时长,直接进行尝试加锁操作 if (leaseTime != -1L) { return this .tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { // 未设置加锁时长,在加锁成功后,启动续期任务,初始默认持有锁时间是30s RFuture<Long> ttlRemainingFuture = this .tryLockInnerAsync( this .commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener( new FutureListener<Long>() { public void operationComplete(Future<Long> future) throws Exception { if (future.isSuccess()) { Long ttlRemaining = (Long)future.getNow(); if (ttlRemaining == null ) { RedissonLock. this .scheduleExpirationRenewal(threadId); } } } }); return ttlRemainingFuture; } } |
我们都知道Redis执行Lua脚本具有原子性,所以在尝试加锁的下层,Redis主要执行了一段复杂的lua脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
-- 不存在该key时 if (redis.call( 'exists' , KEYS[ 1 ]) == 0 ) then -- 新增该锁并且hash中该线程id对应的count置 1 redis.call( 'hincrby' , KEYS[ 1 ], ARGV[ 2 ], 1 ); -- 设置过期时间 redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 1 ]); return nil; end; -- 存在该key 并且 hash中线程id的key也存在 if (redis.call( 'hexists' , KEYS[ 1 ], ARGV[ 2 ]) == 1 ) then -- 线程重入次数++ redis.call( 'hincrby' , KEYS[ 1 ], ARGV[ 2 ], 1 ); redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 1 ]); return nil; end; return redis.call( 'pttl' , KEYS[ 1 ]); |
参数说明:
KEYS[1]:对应我们设置的分布式key,即:distributed:lock:distribute_key
ARGV[1]:业务自定义的加锁时长或者默认的30s;
ARGV[2]: 具体的客户端初始化连接UUID+线程ID: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1
我们从上面的脚本中可以看出核心逻辑其实不难:
接下来看看scheduleExpirationRenewal续命是怎么做的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
private void scheduleExpirationRenewal( final long threadId) { if (!expirationRenewalMap.containsKey( this .getEntryName())) { Timeout task = this .commandExecutor.getConnectionManager().newTimeout( new TimerTask() { public void run(Timeout timeout) throws Exception { // 执行续命操作 RFuture<Boolean> future = RedissonLock. this .renewExpirationAsync(threadId); future.addListener( new FutureListener<Boolean>() { public void operationComplete(Future<Boolean> future) throws Exception { RedissonLock.expirationRenewalMap.remove(RedissonLock. this .getEntryName()); ... // 续命成功,继续 if ((Boolean)future.getNow()) { RedissonLock. this .scheduleExpirationRenewal(threadId); } } }); } }, this .internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS); } } |
Tip小知识点:
而在上面的renewExpirationAsync中续命操作的执行核心Lua脚本要做的事情也非常的简单,就是给这个Key的过期时间重新设置为指定的30s.
1 2 3 4 5 |
if (redis.call( 'hexists' , KEYS[ 1 ], ARGV[ 2 ]) == 1 ) then redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 1 ]); return 1 ; end; return 0 ; |
释放锁主要是除了解锁本省,另外还要考虑到如果存在续期的情况,要将续期任务删除:
1 2 3 4 5 6 7 8 9 10 |
public RFuture<Void> unlockAsync( long threadId) { // 解锁 RFuture<Boolean> future = this .unlockInnerAsync(threadId); CompletionStage<Void> f = future.handle((opStatus, e) -> { // 解除续期 this .cancelExpirationRenewal(threadId); ... }); return new CompletableFutureWrapper(f); } |
在unlockInnerAsync内部,Redisson释放锁其实核心也是执行了如下一段核心Lua脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 校验是否存在 if (redis.call( 'hexists' , KEYS[ 1 ], ARGV[ 3 ]) == 0 ) then return nil; end; // 获取加锁次数,校验是否为重入锁 local counter = redis.call( 'hincrby' , KEYS[ 1 ], ARGV[ 3 ], - 1 ); // 如果为重入锁,重置过期时间,锁本身不释放 if (counter > 0 ) then redis.call( 'pexpire' , KEYS[ 1 ], ARGV[ 2 ]); return 0 ; // 删除Key else redis.call( 'del' , KEYS[ 1 ]); // 通知阻塞的客户端可以抢锁啦 redis.call( 'publish' , KEYS[ 2 ], ARGV[ 1 ]); return 1 ; end; return nil; |
其中:
KEYS[1]: 分布式锁
KEYS[2]: redisson_lock_channel:{分布式锁} 发布订阅消息的管道名称
ARGV[1]: 发布的消息内容
ARGV[2]: 锁的过期时间
ARGV[3]: 线程ID标识名称
其它问题
2023-11-01
React中immutable的使用2023-11-01
命令行清除Redis缓存的实现2023-11-01
Redis缓存空间优化实践详解引言大厂很多项目都是部署到多台服务器上,这些服务器在各个地区都存在,当我们访问服务时虽然执行的是同一个服务,但是可能是不同服务器运行的;在我学习项目时遇到这样一个登录情...
2023-11-01
1.多次修改一个redis的String过期键,如何保证他仍然能保留第一次设置时的删除时间 2.修改hash、set、Zset、list的值,会使过期时间重置吗?...
2023-11-01