BlockingQueue

BlockingQueue 介绍

BlockingQueue 是一个先进先出的队列(Queue), 并且当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。

BlockingQueue 对插入、删除、获取元素在不同场景下提供了不同的操作:

抛异常返回特殊值(成功或失败)阻塞等待阻塞等待直至超时
插入add(e)offer(e)put(e)offer(e, time, unit)
删除remove()poll()take()poll(time, unit)
获取element()peek()

我们重点关注 puttake 这两个阻塞操作, BlockingQueue 主要是于消费者-生产者场景的一个线程安全容器

ArrayBlockingQueue

  • ArrayBlockingQueue 是 BlockingQueue 的一个有界队列实现,底层采取数组
  • 并发控制采取可重入锁, 插入和读取操作都需要获取锁
  • 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程
  • 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
// 队列为空的条件队列
private final Condition notEmpty;
// 队列满的条件队列
private final Condition notFull;

阻塞操作 take & put

put 操作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 插入前先获取锁
lock.lockInterruptibly();
try {
// 如果队列满了, 写操作线程进入条件队列等待
while (count == items.length)
notFull.await();
// 元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
// 入队操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
// 入队
items[putIndex] = x;
// 循环使用 index
if (++putIndex == items.length)
putIndex = 0;
count++;
// 队列已经有数据了,唤醒等待队消费者
notEmpty.signal();
}

take 操作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列为空, 读操作线程挂起等待
while (count == 0)
notEmpty.await();
// 出队操作
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒队列满时等待的写线程
notFull.signal();
return x;
}

LinkedBlockingQueue

  • LinkedBlockingQueue 底层基于单向链表,可以作为无界队列也可作为有界队列
  • 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)
  • 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)

主要属性

构造方法:

1
2
3
4
5
6
7
8
9
10
11
// 无界队列构造方法
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

// 有界队列构造方法
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

类属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 队列容量
private final int capacity;

// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);

// 队头
private transient Node<E> head;

// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();

// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();

// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

阻塞操作 put & take

put 操作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 标识成功、失败的标志
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必须要获取到 putLock 才可以进行插入操作
putLock.lockInterruptibly();
try {
// 如果队列满,等待 notFull 的条件满足。
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// count 原子加 1,c 还是加 1 前的值
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 入队后,释放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}

// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

// 元素入队后,如果需要,调用这个方法唤醒读线程来读
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

take 操作流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
// 出队后释放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的
// 既然出队了一个,那么意味着队列不满了,唤醒写线程去写
if (c == capacity)
signalNotFull();
return x;
}
// 取队头,出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 之前说了,头结点是空的
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
// 设置这个为新的头结点
head = first;
E x = first.item;
first.item = null;
return x;
}
// 元素出队后,如果需要,调用这个方法唤醒写线程来写
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

SynchronousQueue

  • 同步队列: 当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作
  • SynchronousQueue 不提供任何空间来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费
  • 没有 peek 方法(直接返回null)

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 构造时,我们可以指定公平模式还是非公平模式
// queue 先入先出, stack 先入后出
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}

// TransferQueue 和 TransferStack 都实现了抽象类 Transfer
abstract static class Transferer {
// 从方法名上大概就知道,这个方法用于转移元素,从生产者手上转到消费者手上
// 也可以被动地,消费者调用这个方法来从生产者手上取元素
// 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者
// 如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素
// 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值
// 返回值如果是 null,代表超时,或者中断。具体是哪个,可以通过检测中断状态得到。
abstract Object transfer(Object e, boolean timed, long nanos);
}

put & take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 写入值
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) { // 1
Thread.interrupted();
throw new InterruptedException();
}
}
// 读取值并移除
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0); // 2
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}

transfer 分析(queue 公平模式)

put(E o)take() 都调用了 transferer.transfer(), 区别是 take 操作的第一个参数为 null(则该操作为读操作)

transfer 整体的设计思路如下:

  1. 当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列
  2. 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据

等待队列节点 QNode 的结构如下:

1
2
3
4
5
6
7
8
9
10
11
static final class QNode {
volatile QNode next; // 可以看出来,等待队列是单向链表
volatile Object item;
volatile Thread waiter; // 将线程对象保存在这里,用于挂起和唤醒
final boolean isData; // 用于判断是写线程节点(isData == true),还是读线程节点

QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
......

整个transfer流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
Object transfer(Object e, boolean timed, long nanos) {

QNode s = null;
// 标记是否为写操作
boolean isData = (e != null);

for (;;) {
QNode t = tail;
QNode h = head;

// 未初始化, 自旋
if (t == null || h == null)
continue;

// 队列空,或队列中节点类型和当前节点一致, 直接入队
if (h == t || t.isData == isData) {
QNode tn = t.next;
// t != tail 说明刚刚有节点入队,continue 重试
if (t != tail)
continue;
// 有其他节点入队,但是 tail 还是指向原来的,此时设置 tail 即可
if (tn != null) {
// 这个方法就是:如果 tail 此时为 t 的话,设置为 tn 为 tail
advanceTail(t, tn);
continue;
}
// 超时 can't wait
if (timed && nanos <= 0)
return null;
// 构造新节点
if (s == null)
s = new QNode(e, isData);
// 将当前节点,插入到 tail 的后面
if (!t.casNext(null, s)) // failed to link in
continue;

// 将当前节点设置为新的 tail
advanceTail(t, s); // swing tail and wait

// 自旋或阻塞,直到满足条件, 进入该方法看详情
Object x = awaitFulfill(s, e, timed, nanos);
// 到这里,说明之前入队的线程被唤醒了,准备往下执行
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}

if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? x : e;

// 有相应的读或写相匹配的情况
} else {
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue;

Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}

advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? x : e;
}
}
}

// nt cas 设置为 新的 tail
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}


// 自旋或阻塞,直到满足条件,这个方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {

long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
// 判断需要自旋的次数,
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果被中断了,那么取消这个节点
if (w.isInterrupted())
// 就是将当前节点 s 中的 item 属性设置为 this
s.tryCancel(e);
Object x = s.item;
// 这里是这个方法的唯一的出口
if (x != e)
return x;
// 如果需要,检测是否超时
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
// 如果自旋达到了最大的次数,那么检测
else if (s.waiter == null)
s.waiter = w;
// 如果自旋到了最大的次数,那么线程挂起,等待唤醒
else if (!timed)
LockSupport.park(this);
// spinForTimeoutThreshold 这个之前讲 AQS 的时候其实也说过,剩余时间小于这个阈值的时候,就
// 不要进行挂起了,自旋的性能会比较好
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}

非公平模式 TransferStack

上面分析了公平模式 TransferQueue, TransferStack 流程类似:

  1. 如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而栈中的元素也都是写线程)。这种情况下,将当前线程加入到等待栈中,等待配对
  2. 如果栈中有等待节点,而且与当前操作可以匹配(如栈里面都是读操作线程,当前线程是写操作线程,反之亦然)。将当前节点压入栈顶,和栈中的节点进行匹配,然后将这两个节点出栈
  3. 如果栈顶是进行匹配而入栈的节点,帮助其进行匹配并出栈,然后再继续操作

PriorityBlockingQueue

  • PriorityQueue 的线程安全版本
  • 插入值不可为null, 并且是 comparable 的(否则会抛出 ClassCastException)
  • 相较于其他 BlockingQueue,PriorityBlockingQueue 的 put 操作不会阻塞, 因为它是无界队列

主要属性

前面说了 PriorityBlockingQueue 是 PriorityQueue 的线程安全版本, 所以基本的存储结构也与 PriorityQueue 一样.

  • 使用一个基于数组的二叉堆来存储, 采取同一个 lock 来控制并发
  • 二叉堆(小顶堆), 每个节点的值都小于其左右子节点的值, 二叉堆中最小的值就是根节点
  • 对于数组中的元素 a[i],其左子节点为 a[2*i+1],其右子节点为 a[2*i + 2],其父节点为 a[(i-1)/2]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 构造方法中,如果不指定大小的话,默认大小为 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 这个就是存放数据的数组
private transient Object[] queue;

// 队列当前大小
private transient int size;

// 大小比较器,如果按照自然序排序,那么此属性可设置为 null
private transient Comparator<? super E> comparator;

// 并发控制所用的锁,所有的 public 且涉及到线程安全的方法,都必须先获取到这个锁
private final ReentrantLock lock;

// 由上面的 lock 属性创建
private final Condition notEmpty;

// 这个也是用于锁,用于数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;

// 用于序列化和反序列化的时候用,对于 PriorityBlockingQueue 我们应该比较少使用到序列化
private PriorityQueue q;

自动扩容

PriorityBlockingQueue 实现了并发安全的自动扩容, tryGrow:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void tryGrow(Object[] array, int oldCap) {
// 先释放锁, 后面重新获取锁
// 这里先释放独占锁, 这样读操作和扩容操作就可以同时进行了
lock.unlock();
Object[] newArray = null;
// 用 CAS 操作将 allocationSpinLock 由 0 变为 1,也算是获取锁
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果节点个数小于 64,那么增加的 oldCap + 2 的容量
// 如果节点数大于等于 64,那么增加 oldCap 的一半
// 所以节点数较小时,增长得快一些
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
// 这里有可能溢出
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 如果 queue != array,那么说明有其他线程给 queue 分配了其他的空间
if (newCap > oldCap && queue == array)
// 分配一个新的大数组
newArray = new Object[newCap];
} finally {
// 重置,也就是释放锁
allocationSpinLock = 0;
}
}
// 如果有其他的线程也在做扩容的操作
if (newArray == null)
Thread.yield();
// 重新获取锁
lock.lock();
// 将原来数组中的元素复制到新分配的大数组中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

put & take 操作

put 流程与 PriorityQueue 类似, 都是先插入到最后,然后与父节点比较,直到父节点小于插入元素,不过加了一个 lock,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void put(E e) {
// 直接调用 offer 方法,因为前面我们也说了,在这里,put 方法不会阻塞
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 首先获取到独占锁
lock.lock();
int n, cap;
Object[] array;
// 如果当前队列中的元素个数 >= 数组的大小,那么需要扩容了
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// 节点添加到二叉堆中
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 更新 size
size = n + 1;
// 唤醒等待的读线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}

take 流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 独占锁
lock.lockInterruptibly();
E result;
try {
// dequeue 出队, 如果没有元素就阻塞在这里
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 队头,用于返回
E result = (E) array[0];
// 队尾元素先取出
E x = (E) array[n];
// 队尾置空
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
// 出队后调整树的结构, 使其符合小顶堆
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
// 这里得到的 half 肯定是非叶节点
// a[n] 是最后一个元素,其父节点是 a[(n-1)/2]。所以 n >>> 1 代表的节点肯定不是叶子节点
int half = n >>> 1; // 得到 half = 4
while (k < half) {
// 先取左子节点
int child = (k << 1) + 1; // 得到 child = 1
Object c = array[child]; // c = 12
int right = child + 1; // right = 2
// 如果右子节点存在,而且比左子节点小
// 此时 array[right] = 20,所以条件不满足
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key = 17, c = 12,所以条件不满足
if (key.compareTo((T) c) <= 0)
break;
// 把 12 填充到根节点
array[k] = c;
// k 赋值后为 1
k = child;
// 一轮过后,我们发现,12 左边的子树和刚刚的差不多,都是缺少根节点,接下来处理就简单了
}
array[k] = key;
}
}

DelayQueue

DelayQueue 是用优先队列实现的无界阻塞队列

主要属性

1
2
3
4
5
6
7
8
9
10
11
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 持有内部重入锁。
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存放工作任务。
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 当前等待获取到期元素的线程
private Thread leader = null;
// 依赖于重入锁的 condition。
private final Condition available = lock.newCondition();
}

put & take 操作

put 操作如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void put(E e) {
offer(e);
}

public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 往优先队列添加元素
q.offer(e);
// 添加成功
if (q.peek() == e) {
// 添加元素成功后将当前等待的leader线程移除, 重新唤醒一个线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

take 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public E take() throws InterruptedException {
// 同样先获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 队头元素
E first = q.peek();
// 如果队头为空, 则挂起当前线程
if (first == null)
available.await();
else {
// 通过延迟任务的 getDelay 获取延迟时间
long delay = first.getDelay(NANOSECONDS);
// 时间到期, 删除并返回队头
if (delay <= 0)
return q.poll();
// 时间未到, 线程进入等待,此时不能持有头节点元素引用, 防止内存泄漏
first = null; // don't retain ref while waiting
// 如果 leader 线程已存在, 当前线程直接等待
if (leader != null)
available.await();
else {
// 当前线程赋值给 leader 并且限时等待获取元素
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 限时等待
available.awaitNanos(delay);
} finally {
// 限时等待完成, 进入下一个循环获取元素返回
// 如果 leader != thisThread, 代表等待期间有新元素添加, 重新选择 leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader线程为null且队头任务不为空,唤醒其中一个等待线程,使之能成为新leader
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

BlockingQueue
https://haobin.work/2020/12/21/并发/BlockingQueue/
作者
Leo Hao
发布于
2020年12月21日
许可协议