玩命加载中🤣🤣🤣

Redisson特性


Redisson特性

可重入源码

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * Smoke test showing the basic usage pattern of a Redisson lock ({@link RLock}):
 * try to acquire with a bounded wait, run the business logic, release in {@code finally}.
 *
 * @author Dee
 * @date 2022/11/5
 */
@Slf4j
@SpringBootTest
public class RedissonTest {

    /** Name of the lock key used by this test. */
    private static final String LOCK_NAME = "anyLock";

    @Resource
    private RedissonClient redissonClient;

    /**
     * Acquires the lock, runs the (placeholder) business logic and releases the lock.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the lock
     */
    @Test
    void testRedisson() throws InterruptedException {
        // Look up the lock object by name.
        RLock lock = redissonClient.getLock(LOCK_NAME);
        // Arguments: maximum time to wait for the lock (acquisition is retried meanwhile),
        // lease time after which the lock is released automatically, and the time unit.
        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
        // Check whether the lock was acquired.
        if (!isLock) {
            log.warn("Could not acquire lock '{}' within the wait time; business logic skipped", LOCK_NAME);
            return;
        }
        try {
            System.out.println("执行业务");
        } finally {
            // The lock has a fixed 10 s lease: if the business logic ran longer, the lock is
            // already gone and unlock() would throw IllegalMonitorStateException, masking any
            // exception thrown by the business logic. Only release a lock we still own.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            } else {
                log.warn("Lock '{}' is no longer held by the current thread; skipping unlock", LOCK_NAME);
            }
        }
    }
}

进入 redisson 源码

// Starting point of the walkthrough: obtain the lock object registered under the name "anyLock".
RLock lock = redissonClient.getLock("anyLock");

/**
 * Tries to acquire the lock, retrying until the wait time has been used up.
 *
 * <p>Flow: one immediate attempt; on failure, subscribe via {@code subscribe(threadId)} and
 * then loop "try to acquire, wait on the subscription latch" while deducting the elapsed
 * time from the remaining wait budget after every step.
 *
 * @param waitTime  maximum time to wait for the lock
 * @param leaseTime lease time, handed through to {@code tryAcquire}
 * @param unit      time unit of {@code waitTime}; also handed through to {@code tryAcquire}
 * @return {@code true} if the lock was acquired, {@code false} if the wait budget ran out first
 * @throws InterruptedException declared by the signature; NOTE(review): presumably raised by
 *                              the blocking waits below — confirm against the helpers
 */
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // Remaining wait budget, in milliseconds.
    long time = unit.toMillis(waitTime);
    // Timestamp used to measure how long the next step takes.
    long current = System.currentTimeMillis();
    // Id of the calling thread.
    long threadId = Thread.currentThread().getId();
    // First attempt: the result is null when the lock was acquired, otherwise it is the
    // remaining time-to-live of the lock held by someone else.
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return true;
    }

    // Deduct the time spent on the first attempt from the budget.
    time -= System.currentTimeMillis() - current;
    // Budget exhausted: give up.
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }

    current = System.currentTimeMillis();
    // Subscribe so that this thread can be woken up instead of busy-polling.
    // NOTE(review): presumably the subscription delivers the holder's release signal — confirm in subscribe().
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // Bounded wait for the subscription to complete: at most the remaining wait budget
    // (not the lock's remaining TTL).
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        // Timed out. If the pending subscription cannot be cancelled, unsubscribe once it
        // completes successfully so the subscription is not left behind.
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    // Undo the subscription.
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        // Report failure to acquire the lock.
        return false;
    }
    // Subscription is in place and there may still be budget left: start retrying.
    try {
        // Deduct the time spent waiting for the subscription.
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        while (true) {
            long currentTime = System.currentTimeMillis();
            // Retry the acquisition.
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // waiting for message
            currentTime = System.currentTimeMillis();
            // If the lock's remaining TTL is shorter than our remaining budget, waiting longer
            // than the TTL is pointless: wake up when the lock is due to expire and try again.
            if (ttl >= 0 && ttl < time) {
                // Block on the subscription's latch for at most ttl milliseconds.
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // Otherwise block for at most the remaining wait budget.
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }

            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // Always drop the subscription, whether the lock was acquired or not.
        unsubscribe(subscribeFuture, threadId);
    }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
    // Compared with the commented-out single call above, the loop in this method is what
    // gives Redisson its retry-within-waitTime behaviour.
}

WatchDog机制源码

tryAcquireAsync()

/**
 * Starts an asynchronous lock acquisition and, when the caller did not supply a lease time,
 * schedules expiration renewal once the lock has been obtained.
 *
 * @param waitTime  wait time, passed through to {@code tryLockInnerAsync}
 * @param leaseTime lease time, or {@code -1} to use the watchdog timeout from the configuration
 * @param unit      time unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread
 * @return the future returned by {@code tryLockInnerAsync}; a {@code null} result is treated
 *         as "lock acquired" by the completion callback below
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    boolean explicitLease = leaseTime != -1;
    if (explicitLease) {
        // Caller chose the lease time: acquire with it and do not schedule any renewal.
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }

    // No lease time given: fall back to the configured watchdog timeout (milliseconds).
    long watchdogTimeout = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    RFuture<Long> acquireFuture =
            tryLockInnerAsync(waitTime, watchdogTimeout, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);

    acquireFuture.onComplete((remainingTtl, failure) -> {
        // Only a successful call with a null result means the lock was acquired.
        boolean acquired = failure == null && remainingTtl == null;
        if (acquired) {
            // Keep the lock alive by scheduling expiration renewal.
            scheduleExpirationRenewal(threadId);
        }
    });
    return acquireFuture;
}

tryLockInnerAsync() 以 Lua 脚本方式获取锁

/**
 * Runs the Lua script that acquires the lock (or re-enters it) and sets its expiration.
 *
 * <p>Side effect: stores the lease time, converted to milliseconds, in
 * {@code internalLockLeaseTime}.
 *
 * @param waitTime  not used by this method
 * @param leaseTime lease time applied to the lock key
 * @param unit      time unit of {@code leaseTime}
 * @param threadId  id of the acquiring thread, used to build the hash field name
 * @param command   Redis command descriptor used to evaluate the script
 * @return future of the script result: nil when the lock was acquired or re-entered,
 *         otherwise the key's remaining time-to-live in milliseconds
 */
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    // KEYS[1] = lock name, ARGV[1] = lease time in ms, ARGV[2] = per-thread lock name.
    final String acquireScript =
            // Lock key does not exist yet: create it with a hold count of 1.
            "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            // Lock is already held by this very thread: bump the hold count (re-entrancy).
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            // Held by someone else: report the remaining TTL (pttl is ttl in milliseconds).
            "return redis.call('pttl', KEYS[1]);";

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command, acquireScript,
                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

scheduleExpirationRenewal() 计划到期续订

/**
 * Registers the given thread for expiration renewal of this lock and starts the renewal
 * cycle if no renewal entry existed yet.
 *
 * @param threadId id of the thread that now holds the lock
 */
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry fresh = new ExpirationEntry();
    // putIfAbsent yields null when our entry was stored, otherwise the entry already present.
    // NOTE(review): the original author states the map is a ConcurrentHashMap — confirm at its declaration.
    ExpirationEntry existing = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), fresh);

    if (existing == null) {
        // First registration for this lock: record the thread and kick off the renewal task.
        fresh.addThreadId(threadId);
        renewExpiration();
        return;
    }

    // An entry already exists: just record the thread on it.
    existing.addThreadId(threadId);
}

renewExpiration() 更新有效期

/**
 * Schedules a one-shot timer task that extends the lock's expiration and, if the extension
 * succeeded, schedules itself again.
 *
 * <p>Does nothing when no renewal entry is registered for this lock.
 */
private void renewExpiration() {
    ExpirationEntry entry = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (entry == null) {
        return;
    }

    TimerTask renewalTask = new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // Re-read the entry: it may have been removed since the task was scheduled.
            ExpirationEntry current = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (current == null) {
                return;
            }
            Long holderThreadId = current.getFirstThreadId();
            if (holderThreadId == null) {
                return;
            }

            // Extend the expiration on the Redis side.
            RFuture<Boolean> renewal = renewExpirationAsync(holderThreadId);
            renewal.onComplete((renewed, failure) -> {
                if (failure != null) {
                    log.error("Can't update lock " + getName() + " expiration", failure);
                } else if (renewed) {
                    // reschedule itself: the lock is still held, so plan the next renewal.
                    renewExpiration();
                }
            });
        }
    };

    // Fire after one third of the lease time; with a 30 s lease that is every 10 s.
    Timeout scheduled = commandExecutor.getConnectionManager()
            .newTimeout(renewalTask, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    // Remember the handle so the pending task can be cancelled later.
    entry.setTimeout(scheduled);
}

renewExpirationAsync() 刷新有效期

/**
 * Resets the lock key's expiration to {@code internalLockLeaseTime}, provided the given
 * thread still holds the lock.
 *
 * @param threadId id of the thread expected to hold the lock
 * @return future of the script result: 1 when the expiration was reset, 0 when the thread's
 *         field is no longer present on the lock key
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    // KEYS[1] = lock name, ARGV[1] = lease time in ms, ARGV[2] = per-thread lock name.
    final String renewScript =
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;";

    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, renewScript,
                          Collections.singletonList(getName()),
                          internalLockLeaseTime, getLockName(threadId));
}

释放锁

/**
 * Releases the lock on behalf of the calling thread and waits for the release to finish.
 *
 * @throws IllegalMonitorStateException when the underlying {@code RedisException} was caused
 *                                      by one; the cause is rethrown directly
 * @throws RedisException               for any other failure of the release
 */
@Override
public void unlock() {
    long currentThreadId = Thread.currentThread().getId();
    try {
        get(unlockAsync(currentThreadId));
    } catch (RedisException e) {
        // Surface an IllegalMonitorStateException directly instead of wrapped in RedisException.
        Throwable cause = e.getCause();
        if (cause instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) cause;
        }
        throw e;
    }
}
/**
 * Releases the lock asynchronously for the given thread.
 *
 * <p>Expiration renewal for the thread is cancelled as soon as the release call completes,
 * regardless of its outcome.
 *
 * @param threadId id of the thread releasing the lock
 * @return a future that succeeds once the lock is released; it fails with the underlying
 *         error, or with {@link IllegalMonitorStateException} when the release call
 *         completed with a {@code null} status
 */
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> promise = new RedissonPromise<>();
    RFuture<Boolean> release = unlockInnerAsync(threadId);

    release.onComplete((status, failure) -> {
        // Stop renewing the expiration first, whatever the outcome of the release was.
        cancelExpirationRenewal(threadId);

        if (failure != null) {
            promise.tryFailure(failure);
        } else if (status == null) {
            // A null status is reported as "not locked by the current thread".
            IllegalMonitorStateException notOwner = new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
            promise.tryFailure(notOwner);
        } else {
            promise.trySuccess(null);
        }
    });

    return promise;
}
/**
 * Removes a thread from this lock's renewal entry and stops the renewal timer once nothing
 * is left to renew.
 *
 * @param threadId id of the thread to remove, or {@code null} to stop renewal unconditionally
 */
void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry entry = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (entry == null) {
        return;
    }

    if (threadId != null) {
        entry.removeThreadId(threadId);
    }

    // Renewal continues only while a specific thread was removed and others are still registered.
    boolean stopRenewal = threadId == null || entry.hasNoThreads();
    if (!stopRenewal) {
        return;
    }

    Timeout pending = entry.getTimeout();
    if (pending != null) {
        pending.cancel();
    }
    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}

Redisson分布式锁原理图解


文章作者: 👑Dee👑
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 👑Dee👑 !
  目录