java自学教程|www.konglongmei.com

作者: 简单350
查看: 13|回复: 0

more +社区更新Forums

more +随机图赏Gallery

疯狂Java讲义(第4版) PDF 电子书 百度云 网盘下载疯狂Java讲义(第4版) PDF 电子书 百度云 网盘下载
价值825元 牛客算法通关课程视频教程 第六期 百度云 网盘下载价值825元 牛客算法通关课程视频教程 第六期 百度云 网盘下载
Spring 5核心原理与30个类手写实战 PDF 电子书 百度云 网盘下载Spring 5核心原理与30个类手写实战 PDF 电子书 百度云 网盘下载
Spring 5核心原理与30个类手写实战+Spring Boot编程思想核心篇pdfSpring 5核心原理与30个类手写实战+Spring Boot编程思想核心篇pdf
Spring Boot编程思想核心篇+Spring 5核心原理与30个类手写实战pdfSpring Boot编程思想核心篇+Spring 5核心原理与30个类手写实战pdf
java电子书]微服务架构设计模式 PDF 电子书 百度云 网盘下载java电子书]微服务架构设计模式 PDF 电子书 百度云 网盘下载

[技术知识] 源码分析— java读写锁ReentrantReadWriteLock

[技术知识] 源码分析— java读写锁ReentrantReadWriteLock

[复制链接]
简单350 | 显示全部楼层 发表于: 3 天前
简单350 发表于: 3 天前 | 显示全部楼层 |阅读模式
查看: 13|回复: 0

你还没有注册,无法下载本站所有资源,请立即注册!

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
前言

今天看Jraft的时候发现了很多地方都用到了读写锁,所以心血来潮想要分析以下读写锁是怎么实现的。
先上一个doc里面的例子:
  1. class CachedData {  Object data;  volatile boolean cacheValid;  final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();  void processCachedData() {      //加上一个读锁    rwl.readLock().lock();    if (!cacheValid) {      // Must release read lock before acquiring write lock      //必须在加写锁之前释放读锁      rwl.readLock().unlock();      rwl.writeLock().lock();      try {        // Recheck state because another thread might have        // acquired write lock and changed state before we did.          //双重检查        if (!cacheValid) {            //设置值          data = ...          cacheValid = true;        }        // Downgrade by acquiring read lock before releasing write lock          //锁降级,反之则不行        rwl.readLock().lock();      } finally {          //释放写锁,但是仍然持有写锁        rwl.writeLock().unlock(); // Unlock write, still hold read      }    }    try {      use(data);    } finally {        //释放读锁      rwl.readLock().unlock();    }  }}}
复制代码
我们一般实例化一个ReentrantReadWriteLock,一般是调用空的构造器创建,所以默认使用的是非公平锁
  1. public ReentrantReadWriteLock() {    this(false);}public ReentrantReadWriteLock(boolean fair) {      //默认使用的是NonfairSync    sync = fair ? new FairSync() : new NonfairSync();    readerLock = new ReadLock(this);    writerLock = new WriteLock(this);}//分别调用writeLock和readLock会返回读写锁实例public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
复制代码
ReentrantReadWriteLock内部类Sync
  1. abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = 6317671515068378041L;      //位移量      //在读写锁中,state是一个32位的int,所以用state的高16位表示读锁,用低16位表示写锁    static final int SHARED_SHIFT   = 16;      //因为读锁是高16位,所以用1向左移动16位表示读锁每次锁状态变化的量    static final int SHARED_UNIT    = (1  SHARED_SHIFT; }    //获取低16位写锁state次数,重入次数    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }    //用来记录每个线程持有的读锁数量    static final class HoldCounter {        int count = 0;        // Use id, not reference, to avoid garbage retention        final long tid = getThreadId(Thread.currentThread());    }        static final class ThreadLocalHoldCounter        extends ThreadLocal {        public HoldCounter initialValue() {            return new HoldCounter();        }    }    private transient ThreadLocalHoldCounter readHolds;      // 用于缓存,记录"最后一个获取读锁的线程"的读锁重入次数    private transient HoldCounter cachedHoldCounter;      // 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量    private transient Thread firstReader = null;    private transient int firstReaderHoldCount;    Sync() {          // 初始化 readHolds 这个 ThreadLocal 属性        readHolds = new ThreadLocalHoldCounter();        setState(getState()); // ensures visibility of readHolds    }    ....}
复制代码

  • 因为int是32位的,所以在ReentrantReadWriteLock中将state分为两部分,高16位作为读锁的状态控制器,低16位作为写锁的状态控制器。
  • 每次要获取读锁的当前状态都需要调用sharedCount传入当前的state,将state向右移动16位来获取
  • 要获取低16位则需要将1左移16位减一,获得一个低16位全是1的数,然后和传入的state进行取与操作获取state的低16位的值
  • cachedHoldCounter里面保存了最新的读锁的线程和调用次数
  • firstReaderfirstReaderHoldCount 将”第一个”获取读锁的线程记录在 firstReader 属性中,这里的第一个不是全局的概念,等这个 firstReader 当前代表的线程释放掉读锁以后,会有后来的线程占用这个属性的。
读锁获取
  1. //readLock#lockpublic void lock() {      //这里会调用父类AQS的acquireShared,尝试获取锁    sync.acquireShared(1);}//AQS#acquireSharedpublic final void acquireShared(int arg) {      //返回值小于 0 代表没有获取到共享锁    if (tryAcquireShared(arg) < 0)          //进入到阻塞队列,然后等待前驱节点唤醒        doAcquireShared(arg);}
复制代码
这里的tryAcquireShared是调用ReentrantReadWriteLock的内部类Sync的tryAcquireShared的方法
  1. protected final int tryAcquireShared(int unused) {      //获取当前线程    Thread current = Thread.currentThread();      //获取AQS中的state属性值    int c = getState();    //exclusiveCount方法是用来获取写锁状态,不等于0代表有写锁    if (exclusiveCount(c) != 0 &&          //如果不是当前线程获取的写锁,那么直接返回-1        getExclusiveOwnerThread() != current)        return -1;      //获取读锁的锁定次数    int r = sharedCount(c);      // 读锁获取是否需要被阻塞    if (!readerShouldBlock() &&        r < MAX_COUNT &&        //因为高16位代表共享锁,所以CAS需要加上一个SHARED_UNIT        compareAndSetState(c, c + SHARED_UNIT)) {        if (r == 0) {              //记录一下首次读线程            firstReader = current;            firstReaderHoldCount = 1;        } else if (firstReader == current) {               //firstReader 重入获取读锁            firstReaderHoldCount++;        } else {            HoldCounter rh = cachedHoldCounter;              // 如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter            if (rh == null || rh.tid != getThreadId(current))                cachedHoldCounter = rh = readHolds.get();            else if (rh.count == 0)                readHolds.set(rh);            rh.count++;        }           // return 大于 0 的数,代表获取到了共享锁        return 1;    }    return fullTryAcquireShared(current);}
复制代码

  • 首先会去调用exclusiveCount方法来查看写锁是否被占用,如果被占用,那么查看当前线程是否是占用读锁的线程,如果不是则返回-1。通过这里可以看出可以先占用读锁再占用写锁
  • 调用readerShouldBlock方法获取是否需要阻塞读锁获取,然后检查一下高16位读锁重入次数是否超过了2^16-1,最后通过CAS操作将state高16进行加1操作,如果没有其他线程抢占就会成功
  • 如果state的高16位为零,那么就设置首次读线程和首次数次数,如果不是则校验首次读线程是不是当前线程,是的话将firstReaderHoldCount次数加1。如果不是首次读线程,那么校验一下最后一次读线程是不是当前线程,不是的话就从readHolds中获取,并将HoldCounter计数加1,如果最后读线程是当前线程那么计数加1
readerShouldBlock
  1. //NonfairSync#readerShouldBlockfinal boolean readerShouldBlock() {    return apparentlyFirstQueuedIsExclusive();}//AQSfinal boolean apparentlyFirstQueuedIsExclusive() {    Node h, s;    return (h = head) != null &&        (s = h.next)  != null &&        !s.isShared()         &&        s.thread != null;}
复制代码
在非公平模式中readerShouldBlock会调用AQS的方法,判断当前头节点的下一个节点,如果不是共享节点,那么readerShouldBlock就返回true,读锁就会阻塞。
  1. //FairSync#readerShouldBlockfinal boolean readerShouldBlock() {    return hasQueuedPredecessors();}//AQSpublic final boolean hasQueuedPredecessors() {       Node t = tail; // Read fields in reverse initialization order    Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}
复制代码
在公平模式中会去看看队列里有没有其他元素在队列里等待获取锁,如果有那么读锁就进行阻塞
ReentrantReadWriteLock#fullTryAcquireShared
  1. final int fullTryAcquireShared(Thread current) {       HoldCounter rh = null;    for (;;) {        int c = getState();          //检查是否写锁被占用        if (exclusiveCount(c) != 0) {               //被占用,但是占用读锁线程不是当前线程,返回阻塞            if (getExclusiveOwnerThread() != current)                return -1;            // else we hold the exclusive lock; blocking here            // would cause deadlock.            //检查读锁是否应该被阻塞        } else if (readerShouldBlock()) {            // Make sure we&#39;re not acquiring read lock reentrantly              //首次读线程是当前线程,下面直接CAS            if (firstReader == current) {                // assert firstReaderHoldCount > 0;            } else {                if (rh == null) {                       //设置最后一次读线程                    rh = cachedHoldCounter;                    if (rh == null || rh.tid != getThreadId(current)) {                        rh = readHolds.get();                        if (rh.count == 0)                               //如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove                            readHolds.remove();                    }                }                   //如果最后读取线程次数为0,那么阻塞                if (rh.count == 0)                    return -1;            }        }          //如果读锁重入次数达到上限,抛异常        if (sharedCount(c) == MAX_COUNT)            throw new Error("Maximum lock count exceeded");          //尝试CAS读锁重入次数加1        if (compareAndSetState(c, c + SHARED_UNIT)) {               // 这里 CAS 成功,那么就意味着成功获取读锁了            // 下面需要做的是设置 firstReader 或 cachedHoldCounter            if (sharedCount(c) == 0) {                firstReader = current;                firstReaderHoldCount = 1;            } else if (firstReader == current) {                firstReaderHoldCount++;            } else {                  // 下面这几行,就是将 cachedHoldCounter 设置为当前线程                if (rh == null)                    rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current))                    rh = readHolds.get();                else if (rh.count == 0)                    readHolds.set(rh);                rh.count++;                cachedHoldCounter = rh; // cache for release            }              // 返回大于 0 的数,代表获取到了读锁            return 1;        }    }}
复制代码
这个方法主要是用来处理重入锁操作的。首先校验一下写锁是否被占用,如果没有被占用则判断当前线程是否是第一次读线程,如果不是则判断最后一次读线程是不是当前线程,如果不是则从readHolds获取,并判断HoldCounter实例中获取读锁次数如果为0,那么就不是重入。
如果可以判断当前线程是重入的,那么则对state高16进行加1操作,操作成功,则对firstReader或cachedHoldCounter进行设置,并返回1,表示获取到锁。
到这里我们看完了tryAcquireShared方法,我再把acquireShared方法贴出来:
  1. public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}
复制代码
下面看doAcquireShared方法:
  1. private void doAcquireShared(int arg) {      //实例化一个共享节点入队    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {              //获取当前节点的上一个前置节点            final Node p = node.predecessor();              //前置节点如果是头节点,那么代表队列里没有别的节点,先调用tryAcquireShared尝试获取锁            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                       //醒队列中其他共享节点                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                       //响应中断                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }              //设置前置节点waitStatus状态            if (shouldParkAfterFailedAcquire(p, node) &&                  //阻塞当前线程                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
复制代码
doAcquireShared方法中会实例化一个共享节点并入队。如果当前节点的前置节点是头节点,那么直接调用tryAcquireShared先获取一次锁,如果返回大于0,那么表示可以获取锁,调用setHeadAndPropagate唤醒队列中其他的线程;如果没有返回则会调用shouldParkAfterFailedAcquire方法将前置节点的waitStatus设值成SIGNAL,然后调用parkAndCheckInterrupt方法阻塞
AQS#setHeadAndPropagate
  1. private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below      //把node节点设值为头节点    setHead(node);       //因为是propagate大于零才进这个方法,所以这个必进这个if    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {          //获取node的下一个节点        Node s = node.next;          //判断下一个节点是否为空,或是共享节点        if (s == null || s.isShared())              //往下看            doReleaseShared();    }}
复制代码
这个方法主要是替换头节点为当前节点,然后调用doReleaseShared进行唤醒节点的操作
AQS#doReleaseShared
  1. private void doReleaseShared() {     for (;;) {        Node h = head;        // 1. h == null: 说明阻塞队列为空        // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,        //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了        // 所以这两种情况不需要进行唤醒后继节点        if (h != null && h != tail) {            int ws = h.waitStatus;               //后面的节点会把前置节点设置为Node.SIGNAL            if (ws == Node.SIGNAL) {                    //1                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                    // 唤醒 head 的后继节点,也就是阻塞队列中的第一个节点                unparkSuccessor(h);            }            else if (ws == 0 &&                        //2                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }          //3 如果被唤醒的节点已经占领head了,那么继续循环,否则跳出循环        if (h == head)                   // loop if head changed            break;    }}
复制代码

  • unparkSuccessor这里会唤醒下一个节点,那么下一个节点也会调用setHeadAndPropagate进行抢占头节点;如果同时有当前线程和被唤醒的下一个线程同时走到这里,那么只会有一个成功,另一个返回false的就不进行唤醒操作
  • 这里CAS失败的原因可能是一个新的节点入队,然后将前置节点设值为了Node.SIGNAL,所以导致当前的CAS失败
  • 如果被唤醒的节点抢占头节点成功,那么h == head 就不成立,那么会进行下一轮的循环,否则就是head没有被抢占成功
AQS#unparkSuccessor
[code]private void unparkSuccessor(Node node) {    //如果当前节点小于零,那么作为头节点要被清除一下状态    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待    // 从队尾往前找,找到waitStatus 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus  0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;    } else {          // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|网站地图|java自学教程|www.konglongmei.com

GMT+8, 2019-12-6 12:06 , Processed in 0.136686 second(s), 47 queries .

快速回复 返回顶部 返回列表