Redisson特性
可重入源码
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
 * Minimal Spring Boot test showing how a named Redisson lock is obtained, acquired with a
 * bounded wait, and released.
 *
 * @author Dee
 * @date 2022/11/5
 */
@Slf4j
@SpringBootTest
public class RedissonTest {

    /** Redisson client injected by the Spring test context. */
    @Resource
    private RedissonClient redissonClient;

    /**
     * Acquires the lock named "anyLock", runs a placeholder business step, and always
     * releases the lock afterwards. Does nothing when the lock cannot be acquired.
     *
     * @throws InterruptedException declared because {@code tryLock} may be interrupted while waiting
     */
    @Test
    void testRedisson() throws InterruptedException {
        // Look up the lock by name (the notes describe this lock as reentrant).
        RLock anyLock = redissonClient.getLock("anyLock");

        // Arguments: maximum wait time (retries happen during it), automatic release time, time unit.
        boolean acquired = anyLock.tryLock(1, 10, TimeUnit.SECONDS);

        // Nothing to do when the lock was not acquired.
        if (!acquired) {
            return;
        }

        try {
            System.out.println("执行业务");
        } finally {
            // Release the lock even if the business step throws.
            anyLock.unlock();
        }
    }
}
进入 Redisson 源码:先通过 getLock 拿到锁对象,再跟入它的 tryLock 方法
RLock lock = redissonClient.getLock("anyLock");
/**
 * Tries to acquire the lock, retrying until the wait budget runs out.
 * <p>
 * The method makes one immediate attempt, then subscribes (via {@code subscribe}) and keeps
 * retrying in a loop. After every blocking step the elapsed time is subtracted from the
 * remaining budget; once the budget reaches zero the attempt is reported as failed.
 *
 * @param waitTime  maximum time to keep trying, expressed in {@code unit}
 * @param leaseTime forwarded unchanged to {@code tryAcquire}; not interpreted in this method
 * @param unit      time unit of {@code waitTime}; also forwarded to {@code tryAcquire}
 * @return {@code true} as soon as {@code tryAcquire} returns {@code null}; {@code false} once the
 *         wait budget is exhausted or the subscription does not complete in time
 * @throws InterruptedException declared by this method; presumably raised by the blocking waits below
 */
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// Remaining wait budget, converted to milliseconds.
long time = unit.toMillis(waitTime);
// Timestamp used to measure how long the next step takes.
long current = System.currentTimeMillis();
// Id of the calling thread; passed to every helper below.
long threadId = Thread.currentThread().getId();
// First attempt (see tryAcquire): null means the lock was acquired; a non-null value is used
// further down as the lock's remaining time-to-live in milliseconds.
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
// Remaining budget = wait time minus the time spent on the first attempt.
time -= System.currentTimeMillis() - current;
// No budget left: report failure immediately.
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// Subscribe so this thread can be signalled (presumably when another holder releases the lock).
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// The wait is bounded: it lasts at most the remaining wait budget, not indefinitely.
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
// The subscription did not complete in time. If it cannot be cancelled, arrange for it to be
// undone once it eventually completes without error.
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
// Undo the subscription.
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
// Report that the lock was not acquired.
return false;
}
// Budget may remain: keep trying to acquire the lock.
try {
// Recompute the remaining budget after waiting for the subscription.
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// Retry the acquisition.
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// If the lock's remaining TTL is shorter than the remaining budget, wait only for the TTL:
// the lock will have timed out by then and can be contended for again.
if (ttl >= 0 && ttl < time) {
// Block on the subscription entry's latch (semaphore-style tryAcquire with a timeout).
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// Otherwise wait for at most the remaining budget.
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// Always undo the subscription, whether the loop ended in success or failure.
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
// As the comments above show, this method is where Redisson handles retrying lock acquisition.
}
WatchDog机制源码
tryAcquireAsync()
/**
 * Starts an asynchronous lock acquisition.
 * <p>
 * With an explicit lease time (anything other than {@code -1}) the request is simply delegated
 * to {@code tryLockInnerAsync}. With {@code -1} the configured lock-watchdog timeout is used as
 * the lease, and expiration renewal is scheduled once the lock has been acquired.
 *
 * @param waitTime  forwarded to {@code tryLockInnerAsync}
 * @param leaseTime lease time in {@code unit}, or {@code -1} to use the watchdog timeout
 * @param unit      time unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @return future of the acquisition result; a {@code null} value is treated here as "lock acquired"
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    boolean explicitLease = leaseTime != -1;
    if (explicitLease) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    // No lease time given: fall back to the watchdog timeout from the configuration.
    long watchdogTimeout = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    RFuture<Long> acquireFuture = tryLockInnerAsync(
            waitTime, watchdogTimeout, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    // ttlRemaining: result of the attempt; e: failure, if any.
    acquireFuture.onComplete((ttlRemaining, e) -> {
        // Only a successful call that produced null means the lock is held now.
        boolean acquired = e == null && ttlRemaining == null;
        if (acquired) {
            // Keep the lock alive by scheduling periodic renewal.
            scheduleExpirationRenewal(threadId);
        }
    });
    return acquireFuture;
}
tryLockInnerAsync():以 Lua 脚本的方式获取锁
/**
 * Attempts to take the lock with a single Lua script.
 * <p>
 * The lease time is converted to milliseconds and stored in {@code internalLockLeaseTime}
 * before the script is sent.
 *
 * @param waitTime  not used by this method
 * @param leaseTime lease time, in {@code unit}
 * @param unit      time unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread, turned into the lock name via {@code getLockName}
 * @param command   script-evaluation command that determines the result type
 * @return future of the script result: nil when the lock was taken or re-entered, otherwise the
 *         key's remaining time-to-live in milliseconds
 */
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    String acquireScript =
            // Key does not exist: create the hash field with count 1 and set the expiry.
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
            // Field for this lock name already exists: bump the count and refresh the expiry.
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
            // Otherwise report the remaining TTL; pttl is like ttl but in milliseconds.
            "return redis.call('pttl', KEYS[1]);";

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command, acquireScript,
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
scheduleExpirationRenewal() 计划到期续订
/**
 * Registers the given thread for expiration renewal of this lock.
 * <p>
 * If no entry exists yet for this lock's entry name, a new one is stored and the renewal task
 * is started; otherwise the thread id is just added to the existing entry.
 *
 * @param threadId id of the thread that acquired the lock
 */
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry fresh = new ExpirationEntry();

    // putIfAbsent returns null when the new entry was stored, or the entry already present.
    ExpirationEntry existing = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), fresh);

    if (existing == null) {
        // First registration for this lock: record the thread and start renewing.
        fresh.addThreadId(threadId);
        renewExpiration();
        return;
    }

    // An entry is already registered: only record the additional thread id.
    existing.addThreadId(threadId);
}
renewExpiration() 更新有效期
/**
 * Schedules one renewal of the lock's expiration and re-schedules itself after every
 * successful renewal.
 * <p>
 * The task fires after one third of {@code internalLockLeaseTime}. When a renewal fails or the
 * renewal script reports that the lock is no longer held, the renewal entry is removed from
 * {@code EXPIRATION_RENEWAL_MAP}. Without that cleanup a stale entry would make a later
 * {@code scheduleExpirationRenewal} call take its "entry already present" branch and never
 * start a new renewal task.
 */
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        // Renewal was cancelled in the meantime; nothing to schedule.
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // Re-read the entry: it may have been removed since the task was scheduled.
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            // Refresh the expiration (see renewExpirationAsync).
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    // The renewal chain stops here, so drop the entry to avoid leaving a stale one.
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself: each successful renewal schedules the next one
                    renewExpiration();
                } else {
                    // The script found no lock for this thread: stop renewing and clean up.
                    cancelExpirationRenewal(null);
                }
            });
        }
        // One third of the lease time; per the original note this is 30/3 = 10 seconds by
        // default (assumes a 30-second default watchdog timeout — confirm).
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}
renewExpirationAsync() 刷新有效期
/**
 * Resets the lock key's expiry to {@code internalLockLeaseTime}, but only if the hash field for
 * the given thread's lock name still exists.
 *
 * @param threadId id of the thread whose lock should be renewed
 * @return future of the script result: 1 when the expiry was refreshed, 0 when the field was not found
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    String renewScript =
            // Field still present: push the expiry out again and report success.
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
            // Field not found: nothing to renew.
            "return 0;";

    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, renewScript,
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}
释放锁
/**
 * Releases the lock on behalf of the calling thread and waits for the release to finish.
 *
 * @throws IllegalMonitorStateException when the underlying failure was caused by one; it is
 *         unwrapped from the {@code RedisException} and rethrown directly
 * @throws RedisException for any other failure, rethrown unchanged
 */
@Override
public void unlock() {
    long callerThreadId = Thread.currentThread().getId();
    try {
        get(unlockAsync(callerThreadId));
    } catch (RedisException e) {
        Throwable cause = e.getCause();
        // Surface the monitor-state error itself rather than its wrapper.
        if (cause instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) cause;
        }
        throw e;
    }
}
/**
 * Releases the lock asynchronously for the given thread.
 * <p>
 * Whatever the outcome of the release, expiration renewal is cancelled for that thread first.
 * The returned future then fails with the original error, fails with an
 * {@code IllegalMonitorStateException} when the release yielded {@code null}, or succeeds.
 *
 * @param threadId id of the thread releasing the lock
 * @return future that completes once the release has been processed
 */
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> promise = new RedissonPromise<>();

    unlockInnerAsync(threadId).onComplete((opStatus, e) -> {
        // Stop renewing the expiration for this thread (see cancelExpirationRenewal).
        cancelExpirationRenewal(threadId);

        if (e != null) {
            promise.tryFailure(e);
        } else if (opStatus == null) {
            // A null status is reported as "not locked by the current thread".
            IllegalMonitorStateException cause = new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
            promise.tryFailure(cause);
        } else {
            promise.trySuccess(null);
        }
    });

    return promise;
}
/**
 * Stops expiration renewal for one thread, or for the whole lock.
 * <p>
 * With a non-null thread id, that id is removed from the entry; the pending timeout is cancelled
 * and the entry removed only when no thread ids remain. With {@code null}, the timeout is
 * cancelled and the entry removed unconditionally.
 *
 * @param threadId id of the thread to deregister, or {@code null} to cancel renewal entirely
 */
void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry entry = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (entry == null) {
        return;
    }

    if (threadId != null) {
        entry.removeThreadId(threadId);
    }

    // Other threads are still registered: keep the renewal running.
    if (threadId != null && !entry.hasNoThreads()) {
        return;
    }

    Timeout pending = entry.getTimeout();
    if (pending != null) {
        pending.cancel();
    }
    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}