好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Redisson分布式锁源码解析

redisson锁继承implements reentrant lock,所以具备 reentrant lock 锁中的一些特性:超时,重试,可中断等。加上redisson中redis具备分布式的特性,所以非常适合用来做java中的 分布式锁 。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。

锁的接口定义了一下方法:

分布式锁当中加锁,我们常用的加锁接口:

boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception;

下面我们来看一下方法的具体实现:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

public boolean trylock( long waittime, long leasetime, timeunit unit) throws interruptedexception {

   long time = unit.tomillis(waittime);

   long current = system.currenttimemillis();

   final long threadid = thread.currentthread().getid();

   long ttl = tryacquire(leasetime, unit, threadid);

   // lock acquired

   if (ttl == null ) {

    return true ;

   }

  

   time -= (system.currenttimemillis() - current);

   if (time <= 0 ) {

    acquirefailed(threadid);

    return false ;

   }

  

   current = system.currenttimemillis();

   final rfuture subscribefuture = subscribe(threadid);

   if (!await(subscribefuture, time, timeunit.milliseconds)) {

    if (!subscribefuture.cancel( false )) {

     subscribefuture.addlistener( new futurelistener() {

      @override

      public void operationcomplete(future future) throws exception {

       if (subscribefuture.issuccess()) {

        unsubscribe(subscribefuture, threadid);

       }

      }

     });

    }

    acquirefailed(threadid);

    return false ;

   }

 

   try {

    time -= (system.currenttimemillis() - current);

    if (time <= 0 ) {

     acquirefailed(threadid);

     return false ;

    }

  

    while ( true ) {

     long currenttime = system.currenttimemillis();

     ttl = tryacquire(leasetime, unit, threadid);

     // lock acquired

     if (ttl == null ) {

      return true ;

     }

 

     time -= (system.currenttimemillis() - currenttime);

     if (time = 0 && ttl < time) {

      getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);

     } else {

      getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds);

     }

 

     time -= (system.currenttimemillis() - currenttime);

     if (time <= 0 ) {

      acquirefailed(threadid);

      return false ;

     }

    }

   } finally {

    unsubscribe(subscribefuture, threadid);

   }

//  return get(trylockasync(waittime, leasetime, unit));

  }

首先我们看到调用tryacquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间ttl来判定的(ttl

下面我们接着看一下tryacquire的实现:

?

1

2

3

private long tryacquire( long leasetime, timeunit unit, long threadid) {

  return get(tryacquireasync(leasetime, unit, threadid));

}

可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。

?

1

get 是由commandasyncexecutor(一个线程executor)封装的一个executor

设置一个单线程的同步控制器countdownlatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

public v get(rfuture future) {

  if (!future.isdone()) {

   final countdownlatch l = new countdownlatch( 1 );

   future.addlistener( new futurelistener() {

    @override

    public void operationcomplete(future future) throws exception {

     l.countdown();

    }

   });

  

   boolean interrupted = false ;

   while (!future.isdone()) {

    try {

     l.await();

    } catch (interruptedexception e) {

     interrupted = true ;

    }

   }

  

   if (interrupted) {

    thread.currentthread().interrupt();

   }

  }

 

  // commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒

  // future.awaituninterruptibly();

  if (future.issuccess()) {

   return future.getnow();

  }

 

  throw convertexception(future);

}

我们进一步往下看:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

private rfuture tryacquireasync( long leasetime, timeunit unit, final long threadid) {

  if (leasetime != - 1 ) {

   return trylockinnerasync(leasetime, unit, threadid, rediscommands.eval_long);

  }

  rfuture ttlremainingfuture = trylockinnerasync(commandexecutor.getconnectionmanager().getcfg().getlockwatchdogtimeout(), timeunit.milliseconds, threadid, rediscommands.eval_long);

  ttlremainingfuture.addlistener( new futurelistener() {

   @override

   public void operationcomplete(future future) throws exception {

    if (!future.issuccess()) {

     return ;

    }

 

    long ttlremaining = future.getnow();

    // lock acquired

    if (ttlremaining == null ) {

     scheduleexpirationrenewal(threadid);

    }

   }

  });

  return ttlremainingfuture;

}

首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的

?

1

private long lockwatchdogtimeout = 30 * 1000 ;

下面我们在进一步往下分析真正获取锁的操作:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

rfuture trylockinnerasync( long leasetime, timeunit unit, long threadid, redisstrictcommand command) {

  internallockleasetime = unit.tomillis(leasetime);

 

  return commandexecutor.evalwriteasync(getname(), longcodec.instance, command,

     "if (redis.call('exists', keys[1]) == 0) then " +

      "redis.call('hset', keys[1], argv[2], 1); " +

      "redis.call('pexpire', keys[1], argv[1]); " +

      "return nil; " +

     "end; " +

     "if (redis.call('hexists', keys[1], argv[2]) == 1) then " +

      "redis.call('hincrby', keys[1], argv[2], 1); " +

      "redis.call('pexpire', keys[1], argv[1]); " +

      "return nil; " +

     "end; " +

     "return redis.call('pttl', keys[1]);" ,

     collections.singletonlist(getname()), internallockleasetime, getlockname(threadid));

}

我把里面的重点信息做了以下三点总结:

1:真正执行的是一段具有原子性的lua脚本,并且最终也是由commandasynexecutor去执行。

2:锁真正持久化到redis时,用的hash类型key field value

3:获取锁的三个参数:getname()是逻辑锁名称,例如:分布式锁要锁住的methodname+params;internallockleasetime是毫秒单位的锁过期时间;getlockname则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:uuid+":"threadid。有的同学可能会问,这样不是很缜密:不同的jvm可能会生成相同的threadid,所以redission这里加了一个区分度很高的uuid;

lua脚本中的执行分为以下三步:

1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称keys[1],线程级别的锁名称[argv[2],value=1,设置到redis。并设置逻辑锁名称的过期时间argv[2],返回;

2:如果检查到存在keys[1],[argv[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间

3:key不存,直接返回key的剩余过期时间(-2)

原文链接:https://www.roncoo.com/article/detail/133572

查看更多关于Redisson分布式锁源码解析的详细内容...

  阅读:44次