概述 AbstractQueuedSynchronizer(以下简写AQS)这个抽象类,因为它是 Java 并发包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础。
本文借助ReentrantLock
的实现来更好的理解AQS这个并发框架的基石, 源码环境基于JDK7
AQS结构 AQS的基本属性 AQS有四个基本的属性:
private transient volatile Node head
: 头结点,当前持有锁的线程private transient volatile Node tail
: 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表private volatile int state
: 表当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁。重入每次加1private transient Thread exclusiveOwnerThread
: 代表当前持有独占锁的线程,继承自AbstractOwnableSynchronizer
等待队列Node结构 AQS的等待队列如图所示:
等待队列中每个线程被包装成一个 node,数据结构是链表, Node的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 static final class Node { // 标识节点当前在共享模式下 static final Node SHARED = new Node() // 标识节点当前在独占模式下 static final Node EXCLUSIVE = null // = = = = = = = = 下面的几个int常量是给waitStatus用的 = = = = = = = = = = = // 代码此线程取消了争抢这个锁 static final int CANCELLED = 1 // 当前node的后继节点对应的线程需要被唤醒 static final int SIGNAL = -1 // condition条件 static final int CONDITION = -2 // 传播条件,暂时不知道作用 static final int PROPAGATE = -3 // = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // 取值为上面的1 、-1 、-2 、-3 ,或者0 // 如果这个值 大于0 代表此线程取消了等待, volatile int waitStatus // 前驱节点的引用 volatile Node prev // 后继节点的引用 volatile Node next // 这个就是线程本尊 volatile Thread thread }
Node的数据结构也是四个属性组成: thread + waitStatus + pre + next
ReentrantLock锁的使用 一般使用ReentrantLock的范式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class LockTest { private static ReentrantLock reentrantLock = new ReentrantLock(true ); public void doSomething () { reentrantLock.lock (); try { } finally { reentrantLock.unlock(); } } }
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的:
1 2 abstract static class Sync extends AbstractQueuedSynchronizer { }
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),默认是非公平锁:
1 2 3 public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
以公平锁的示例,源码流程
线程抢锁(公平锁示例) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } public final boolean hasQueuedPredecessors () { Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } }
运行流程 1.线程1调用 reentrantLock.lock()
,tryAcquire(1) 直接就返回 true。 此时只是设置了 state=1,连 head 都没有初始化,更谈不上什么阻塞队列了
2.线程2在线程1没有调用unlock
的情况下调用lock
, 线程B首先会初始化head,同时线程2也会插入阻塞队列并挂起(enq方法
) 图解如下:
(1)线程 2 初始化 head 节点,此时 head==tail, waitStatus==0
(2)线程 2 入队,此时节点的 waitStatus,我们知道 head 节点是线程 2 初始化的,此时的 waitStatus 没有设置, java 默认会设置为 0,但是到 shouldParkAfterFailedAcquire
这个方法的时候,线程 2 会把前驱节点,也就是 head 的waitStatus设置为-1。此时线程2 的 waitStatus 没有设置所以是0
(3)如果线程3此时再进来,直接插到线程2的后面就可以了,此时线程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的时候把前驱节点线程 2 的 waitStatus 设置为 -1
总结: waitStatus 中 SIGNAL(-1) 状态的意思是:代表后继节点需要被唤醒。也就是说这个 waitStatus 其实代表的不是自己的状态,而是后继节点的状态,我们知道,每个 node 在入队的时候,都会把前驱节点的状态改为 SIGNAL,然后阻塞,等待被前驱唤醒。
解锁操作 正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this)
挂起停止,等待被唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public void unlock () { sync.release(1 ); }public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走, 直到释放head:
1 2 3 4 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted () ; }
非公平锁和公平锁 ReentrantLock 默认采用非公平锁,除非你在构造方法中传入参数 true
1 2 3 4 5 6 public ReentrantLock () { sync = new NonfairSync (); }public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
公平锁的Lock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 static final class FairSync extends Sync { final void lock () { acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
非公平锁的Lock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 static final class NonfairSync extends Sync { final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire (acquires) ; } }final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; }
总结 公平锁和非公平锁只有两处不同:
非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。
参考资料