ReentrantReadWriteLock 结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 final Sync sync;public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this ); writerLock = new WriteLock(this ); }public static class ReadLock implements Lock , java .io .Serializable { private final Sync sync; protected ReadLock (ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock () { sync.acquireShared(1 ); } public void lockInterruptibly () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public boolean tryLock () { return sync.tryReadLock () ; } public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void unlock () { sync.releaseShared(1 ); } public Condition newCondition () { throw new UnsupportedOperationException(); } }public static class WriteLock implements Lock , java .io .Serializable { private final Sync sync; protected WriteLock (ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock () { sync.acquire(1 ); } public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public boolean tryLock ( ) { return sync.tryWriteLock () ; } public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(timeout)); } public void unlock () { sync.release(1 ); } public Condition newCondition () { return sync.newCondition () ; } }abstract static class Sync extends AbstractQueuedSynchronizer { ...... }static final class FairSync extends Sync { final boolean writerShouldBlock () { return hasQueuedPredecessors () ; } final boolean readerShouldBlock () { return hasQueuedPredecessors () ; } }static final class NonfairSync extends Sync { final boolean writerShouldBlock () { return false ; } final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive () ; } }
这里可以看到:
读、写锁分别对应 ReadLock
和 WriteLock
两个实例 读、写锁内部使用了同一个 Sync
实例, 分为公平模式和非公平模式 Sync
实例继承于 AQSReadLock
使用共享模式, WriteLock
使用独占模式独占模式 看看 WriteLock
的独占模式
1 2 3 4 5 6 7 8 9 10 public void lock () { sync.acquire (1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued (addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt (); }
tryAcquire 判断 state,为0则直接占有 acquireQueued 将当前节点进入阻塞队列,并挂起当前线程, 等待前驱节点唤醒 被唤醒后将自己设为head,并将state设为1 这就是一个普通的AQS的独占模式的流程
共享模式 ReadLock
的共享模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void lock () { sync.acquireShared(1 ); }public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
(1) tryAcquireShared 小于 0 代表没有获取到共享锁 (2) doAcquireShared 将当前节点进入阻塞队列中等待被唤醒,步骤2是挂起自己 (3) 被唤醒后就可以拿到共享锁了, 步骤三 (4) 然后 setHeadAndPropagate 唤醒其他线程
这里注意独占模式和共享模式的区别:
对于独占模式来说,通常就是 0 代表可获取锁,1 代表锁被别人获取了,重入例外 而共享模式下,每个线程都可以对 state 进行加减操作(独占模式操作state的时候会判断当前线程是否是站有锁的线程) 原理分析 再 ReadWriteLock
中 state 同时被独占模式和共享模式操作,实现的手段是将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。
sync 类 直接看关键的内部Sync
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16 ; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ; static int sharedCount (int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount ( int c) { return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0 ; final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal <HoldCounter> { public HoldCounter initialValue () { return new HoldCounter (); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null ; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter (); setState(getState()); } ... }
state 的高 16 位代表读锁的获取次数,包括重入次数,获取到读锁一次加 1,释放掉读锁一次减 1。 state 的低 16 位代表写锁的获取次数,因为写锁是独占锁,同时只能被一个线程获得,所以它代表重入次数 每个线程都需要维护自己的HoldCounter
,记录该线程获取的读锁次数,这样才能知道到底是不是读锁重入,用 ThreadLocal 属性readHolds
维护 读锁获取 1 2 3 4 5 6 7 8 9 public void lock () { sync.acquireShared (1 ); }public final void acquireShared (int arg) { if (tryAcquireShared (arg) < 0 ) doAcquireShared (arg); }
这一切都是AQS的流程,Sync
实现了tryAcquireShared
, 再AQS的定义中,tryAcquireShared
返回值小于0代表没有获取到共享锁,大于0代表获取到了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared (current) ; }
如果要执行fullTryAcquireShared
, 就不能进入if分支,则需要readerShouldBlock
返回true,这里有两种可能: (1) 在 FairSync 中 hasQueuedPredecessors()
(公平模式有其他节点再等待锁,不能直接就获取锁) (2 在 NonFairSync 中apparentlyFirstQueuedIsExclusive()
,即判断阻塞队列中head
的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿(这里是给了写锁更高的优先级,所以如果碰上获取写锁的线程马上就要获取到锁了,获取读锁的线程不应该和它抢。如果head.next 不是写锁,那么就随便抢了,因为是非公平模式)
另外还有一种情况就是compareAndSetState(c, c + SHARED_UNIT)
的CAS操作失败了也会进入fullTryAcquireShared
,这代表着操作state存在竞争,可能是读锁竞争也可能是写锁竞争
接下来就是fullTryAcquireShared
的流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 final int fullTryAcquireShared(Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count ++; cachedHoldCounter = rh; } return 1 ; } } }
读锁的释放 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public void unlock() { sync.releaseShared(1 ); }public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count ; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count ; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
读锁的释放过程 (1) 将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉防止内存泄漏 (2) 在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程
写锁的获取 写锁是独占的,如果发现有读锁占用也是要阻塞等待的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void lock () { sync.acquire(1 ); }public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
writerShouldBlock 的判断: (1) 如果非公平锁永远返回 false, 因为非公平模式永远不需要排队,直接 CAS 尝试获取锁就行 (2) 公平模式还是hasQueuedPredecessors
, 判断是否有线程排队
写锁的释放 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 / WriteLockpublic void unlock () { sync.release(1 ); }public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
独占锁的释放很简单,直接state减1就好
StampedLock ReadWriteLock
可以解决多线程读写的问题, 但是读的时候, 写线程需要等待读线程释放了才能获取写锁,即读的时候不能写,这是一种悲观的策略。
jdk8 引入了新的读写锁:StampedLock
, 进一步提升了并发执行效率。
StampedLock
和ReadWriteLock
相比,改进之处在于:读的过程中也允许获取写锁后写入。采用的是一种乐观锁的方式去判断。
和ReadWriteLoc
相比,写入的加锁是完全一样的,不同的是读取。首先通过tryOptimisticRead()
获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后通过validate()
去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。