java自学教程|www.konglongmei.com

作者: 简单350
查看: 40|回复: 0

more +社区更新Forums

more +随机图赏Gallery

[技术知识] 源码分析— java读写锁ReentrantReadWriteLock

[技术知识] 源码分析— java读写锁ReentrantReadWriteLock

[复制链接]
简单350 | 显示全部楼层 发表于: 2019-12-3 04:35:44
简单350 发表于: 2019-12-3 04:35:44 | 显示全部楼层 |阅读模式
查看: 40|回复: 0

你还没有注册,无法下载本站所有资源,请立即注册!

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
前言

今天看Jraft的时候发现了很多地方都用到了读写锁,所以心血来潮想要分析以下读写锁是怎么实现的。
先上一个doc里面的例子:
  1. class CachedData {  Object data;  volatile boolean cacheValid;  final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  void processCachedData() {      //加上一个读锁    rwl.readLock().lock();    if (!cacheValid) {      // Must release read lock before acquiring write lock      //必须在加写锁之前释放读锁      rwl.readLock().unlock();      rwl.writeLock().lock();      try {        // Recheck state because another thread might have        // acquired write lock and changed state before we did.          //双重检查        if (!cacheValid) {            //设置值          data = ...          cacheValid = true;        }        // Downgrade by acquiring read lock before releasing write lock          //锁降级,反之则不行        rwl.readLock().lock();      } finally {          //释放写锁,但是仍然持有写锁        rwl.writeLock().unlock(); // Unlock write, still hold read      }    }    try {      use(data);    } finally {        //释放读锁      rwl.readLock().unlock();    }  }}}
复制代码
我们一般实例化一个ReentrantReadWriteLock,一般是调用空的构造器创建,所以默认使用的是非公平锁
  1. public ReentrantReadWriteLock() {    this(false);}public ReentrantReadWriteLock(boolean fair) {      //默认使用的是NonfairSync    sync = fair ? new FairSync() : new NonfairSync();    readerLock = new ReadLock(this);    writerLock = new WriteLock(this);}//分别调用writeLock和readLock会返回读写锁实例public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
复制代码
ReentrantReadWriteLock内部类Sync
  1. abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = 6317671515068378041L;      //位移量      //在读写锁中,state是一个32位的int,所以用state的高16位表示读锁,用低16位表示写锁    static final int SHARED_SHIFT   = 16;      //因为读锁是高16位,所以用1向左移动16位表示读锁每次锁状态变化的量    static final int SHARED_UNIT    = (1  SHARED_SHIFT; }    //获取低16位写锁state次数,重入次数    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }    //用来记录每个线程持有的读锁数量    static final class HoldCounter {        int count = 0;        // Use id, not reference, to avoid garbage retention        final long tid = getThreadId(Thread.currentThread());    }        static final class ThreadLocalHoldCounter        extends ThreadLocal {        public HoldCounter initialValue() {            return new HoldCounter();        }    }    private transient ThreadLocalHoldCounter readHolds;      // 用于缓存,记录"最后一个获取读锁的线程"的读锁重入次数    private transient HoldCounter cachedHoldCounter;      // 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量    private transient Thread firstReader = null;    private transient int firstReaderHoldCount;    Sync() {          // 初始化 readHolds 这个 ThreadLocal 属性        readHolds = new ThreadLocalHoldCounter();        setState(getState()); // ensures visibility of readHolds    }    ....}
复制代码

  • 因为int是32位的,所以在ReentrantReadWriteLock中将state分为两部分,高16位作为读锁的状态控制器,低16位作为写锁的状态控制器。
  • 每次要获取读锁的当前状态都需要调用sharedCount传入当前的state,将state向右移动16位来获取
  • 要获取低16位则需要将1左移16位减一,获得一个低16位全是1的数,然后和传入的state进行取与操作获取state的低16位的值
  • cachedHoldCounter里面保存了最新的读锁的线程和调用次数
  • firstReaderfirstReaderHoldCount 将”第一个”获取读锁的线程记录在 firstReader 属性中,这里的第一个不是全局的概念,等这个 firstReader 当前代表的线程释放掉读锁以后,会有后来的线程占用这个属性的。
读锁获取
  1. //readLock#lockpublic void lock() {      //这里会调用父类AQS的acquireShared,尝试获取锁    sync.acquireShared(1);}//AQS#acquireSharedpublic final void acquireShared(int arg) {      //返回值小于 0 代表没有获取到共享锁    if (tryAcquireShared(arg) < 0)          //进入到阻塞队列,然后等待前驱节点唤醒        doAcquireShared(arg);}
复制代码
这里的tryAcquireShared是调用ReentrantReadWriteLock的内部类Sync的tryAcquireShared的方法
  1. protected final int tryAcquireShared(int unused) {      //获取当前线程    Thread current = Thread.currentThread();      //获取AQS中的state属性值    int c = getState();    //exclusiveCount方法是用来获取写锁状态,不等于0代表有写锁    if (exclusiveCount(c) != 0 &&          //如果不是当前线程获取的写锁,那么直接返回-1        getExclusiveOwnerThread() != current)        return -1;      //获取读锁的锁定次数    int r = sharedCount(c);      // 读锁获取是否需要被阻塞    if (!readerShouldBlock() &&        r < MAX_COUNT &&        //因为高16位代表共享锁,所以CAS需要加上一个SHARED_UNIT        compareAndSetState(c, c + SHARED_UNIT)) {        if (r == 0) {              //记录一下首次读线程            firstReader = current;            firstReaderHoldCount = 1;        } else if (firstReader == current) {               //firstReader 重入获取读锁            firstReaderHoldCount++;        } else {            HoldCounter rh = cachedHoldCounter;              // 如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter            if (rh == null || rh.tid != getThreadId(current))                cachedHoldCounter = rh = readHolds.get();            else if (rh.count == 0)                readHolds.set(rh);            rh.count++;        }           // return 大于 0 的数,代表获取到了共享锁        return 1;    }    return fullTryAcquireShared(current);}
复制代码

  • 首先会去调用exclusiveCount方法来查看写锁是否被占用,如果被占用,那么查看当前线程是否是占用读锁的线程,如果不是则返回-1。通过这里可以看出可以先占用读锁再占用写锁
  • 调用readerShouldBlock方法获取是否需要阻塞读锁获取,然后检查一下高16位读锁重入次数是否超过了2^16-1,最后通过CAS操作将state高16进行加1操作,如果没有其他线程抢占就会成功
  • 如果state的高16位为零,那么就设置首次读线程和首次数次数,如果不是则校验首次读线程是不是当前线程,是的话将firstReaderHoldCount次数加1。如果不是首次读线程,那么校验一下最后一次读线程是不是当前线程,不是的话就从readHolds中获取,并将HoldCounter计数加1,如果最后读线程是当前线程那么计数加1
readerShouldBlock
  1. //NonfairSync#readerShouldBlockfinal boolean readerShouldBlock() {    return apparentlyFirstQueuedIsExclusive();}//AQSfinal boolean apparentlyFirstQueuedIsExclusive() {    Node h, s;    return (h = head) != null &&        (s = h.next)  != null &&        !s.isShared()         &&        s.thread != null;}
复制代码
在非公平模式中readerShouldBlock会调用AQS的方法,判断当前头节点的下一个节点,如果不是共享节点,那么readerShouldBlock就返回true,读锁就会阻塞。
  1. //FairSync#readerShouldBlockfinal boolean readerShouldBlock() {    return hasQueuedPredecessors();}//AQSpublic final boolean hasQueuedPredecessors() {       Node t = tail; // Read fields in reverse initialization order    Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}
复制代码
在公平模式中会去看看队列里有没有其他元素在队列里等待获取锁,如果有那么读锁就进行阻塞
ReentrantReadWriteLock#fullTryAcquireShared
  1. final int fullTryAcquireShared(Thread current) {       HoldCounter rh = null;    for (;;) {        int c = getState();          //检查是否写锁被占用        if (exclusiveCount(c) != 0) {               //被占用,但是占用读锁线程不是当前线程,返回阻塞            if (getExclusiveOwnerThread() != current)                return -1;            // else we hold the exclusive lock; blocking here            // would cause deadlock.            //检查读锁是否应该被阻塞        } else if (readerShouldBlock()) {            // Make sure we&#39;re not acquiring read lock reentrantly              //首次读线程是当前线程,下面直接CAS            if (firstReader == current) {                // assert firstReaderHoldCount > 0;            } else {                if (rh == null) {                       //设置最后一次读线程                    rh = cachedHoldCounter;                    if (rh == null || rh.tid != getThreadId(current)) {                        rh = readHolds.get();                        if (rh.count == 0)                               //如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove                            readHolds.remove();                    }                }                   //如果最后读取线程次数为0,那么阻塞                if (rh.count == 0)                    return -1;            }        }          //如果读锁重入次数达到上限,抛异常        if (sharedCount(c) == MAX_COUNT)            throw new Error("Maximum lock count exceeded");          //尝试CAS读锁重入次数加1        if (compareAndSetState(c, c + SHARED_UNIT)) {               // 这里 CAS 成功,那么就意味着成功获取读锁了            // 下面需要做的是设置 firstReader 或 cachedHoldCounter            if (sharedCount(c) == 0) {                firstReader = current;                firstReaderHoldCount = 1;            } else if (firstReader == current) {                firstReaderHoldCount++;            } else {                  // 下面这几行,就是将 cachedHoldCounter 设置为当前线程                if (rh == null)                    rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                else if (rh.count == 0)                    readHolds.set(rh);                rh.count++;                cachedHoldCounter = rh; // cache for release            }              // 返回大于 0 的数,代表获取到了读锁            return 1;        }    }}
复制代码
这个方法主要是用来处理重入锁操作的。首先校验一下写锁是否被占用,如果没有被占用则判断当前线程是否是第一次读线程,如果不是则判断最后一次读线程是不是当前线程,如果不是则从readHolds获取,并判断HoldCounter实例中获取读锁次数如果为0,那么就不是重入。
如果可以判断当前线程是重入的,那么则对state高16进行加1操作,操作成功,则对firstReader或cachedHoldCounter进行设置,并返回1,表示获取到锁。
到这里我们看完了tryAcquireShared方法,我再把acquireShared方法贴出来:
  1. public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}
复制代码
下面看doAcquireShared方法:
  1. private void doAcquireShared(int arg) {      //实例化一个共享节点入队    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {              //获取当前节点的上一个前置节点            final Node p = node.predecessor();              //前置节点如果是头节点,那么代表队列里没有别的节点,先调用tryAcquireShared尝试获取锁            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                       //醒队列中其他共享节点                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                       //响应中断                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }              //设置前置节点waitStatus状态            if (shouldParkAfterFailedAcquire(p, node) &&                  //阻塞当前线程                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码
doAcquireShared方法中会实例化一个共享节点并入队。如果当前节点的前置节点是头节点,那么直接调用tryAcquireShared先获取一次锁,如果返回大于0,那么表示可以获取锁,调用setHeadAndPropagate唤醒队列中其他的线程;如果没有返回则会调用shouldParkAfterFailedAcquire方法将前置节点的waitStatus设值成SIGNAL,然后调用parkAndCheckInterrupt方法阻塞
AQS#setHeadAndPropagate
  1. private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below      //把node节点设值为头节点    setHead(node);       //因为是propagate大于零才进这个方法,所以这个必进这个if    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {          //获取node的下一个节点        Node s = node.next;          //判断下一个节点是否为空,或是共享节点        if (s == null || s.isShared())              //往下看            doReleaseShared();    }}
复制代码
这个方法主要是替换头节点为当前节点,然后调用doReleaseShared进行唤醒节点的操作
AQS#doReleaseShared
  1. private void doReleaseShared() {     for (;;) {        Node h = head;        // 1. h == null: 说明阻塞队列为空        // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,        //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了        // 所以这两种情况不需要进行唤醒后继节点        if (h != null && h != tail) {            int ws = h.waitStatus;               //后面的节点会把前置节点设置为Node.SIGNAL            if (ws == Node.SIGNAL) {                    //1                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                    // 唤醒 head 的后继节点,也就是阻塞队列中的第一个节点                unparkSuccessor(h);            }            else if (ws == 0 &&                        //2                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }          //3 如果被唤醒的节点已经占领head了,那么继续循环,否则跳出循环        if (h == head)                   // loop if head changed            break;    }}
复制代码

  • unparkSuccessor这里会唤醒下一个节点,那么下一个节点也会调用setHeadAndPropagate进行抢占头节点;如果同时有当前线程和被唤醒的下一个线程同时走到这里,那么只会有一个成功,另一个返回false的就不进行唤醒操作
  • 这里CAS失败的原因可能是一个新的节点入队,然后将前置节点设值为了Node.SIGNAL,所以导致当前的CAS失败
  • 如果被唤醒的节点抢占头节点成功,那么h == head 就不成立,那么会进行下一轮的循环,否则就是head没有被抢占成功
AQS#unparkSuccessor
[code]private void unparkSuccessor(Node node) {    //如果当前节点小于零,那么作为头节点要被清除一下状态    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待    // 从队尾往前找,找到waitStatus 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus  0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;    } else {          // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|网站地图|java自学教程|www.konglongmei.com

GMT+8, 2020-5-27 11:34 , Processed in 0.100818 second(s), 27 queries .

快速回复 返回顶部 返回列表