BlockingQueue 介绍 BlockingQueue 是一个先进先出的队列(Queue), 并且当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
BlockingQueue 对插入、删除、获取元素在不同场景下提供了不同的操作:
抛异常 返回特殊值(成功或失败) 阻塞等待 阻塞等待直至超时 插入 add(e) offer(e) put(e) offer(e, time, unit) 删除 remove() poll() take() poll(time, unit) 获取 element() peek() 无 无
我们重点关注 put
和 take
这两个阻塞操作, BlockingQueue 主要是于消费者-生产者场景的一个线程安全容器
ArrayBlockingQueue ArrayBlockingQueue 是 BlockingQueue 的一个有界队列实现,底层采取数组 并发控制采取可重入锁, 插入和读取操作都需要获取锁 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程 主要属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final Object[] items;int takeIndex;int putIndex;int count ;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;
阻塞操作 take & put put 操作流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void put (E e ) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock ; lock .lockInterruptibly(); try { while (count == items.length) notFull.await (); enqueue(e); } finally { lock .unlock(); } }private void enqueue (E x ) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
take 操作流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public E take () throws InterruptedException { final ReentrantLock lock = this .lock ; lock .lockInterruptibly(); try { while (count == 0 ) notEmpty.await (); return dequeue(); } finally { lock .unlock(); } }private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked" ) E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
LinkedBlockingQueue LinkedBlockingQueue 底层基于单向链表,可以作为无界队列也可作为有界队列 如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition) 如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition) 主要属性 构造方法:
1 2 3 4 5 6 7 8 9 10 11 public LinkedBlockingQueue () { this (Integer.MAX_VALUE); }public LinkedBlockingQueue (int capacity ) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .capacity = capacity; last = head = new Node<E>(null ); }
类属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private final int capacity;private final AtomicInteger count = new AtomicInteger (0 );private transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock ();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock ();private final Condition notFull = putLock.newCondition();
阻塞操作 put & take put 操作流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node (e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }private void enqueue (Node<E> node) { last = last.next = node; }private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
take 操作流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public E take() throws InterruptedException { E x int c = -1 final AtomicInteger count = this.count final ReentrantLock takeLock = this.takeLock // 首先,需要获取到 takeLock 才能进行出队操作 takeLock.lockInterruptibly() try { // 如果队列为空,等待 notEmpty 这个条件满足再继续执行 while (count.get() = = 0 ) { notEmpty.await() } // 出队 x = dequeue() // count 进行原子减 1 c = count.getAndDecrement() // 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程 if (c > 1 ) notEmpty.signal() } finally { // 出队后释放掉 takeLock takeLock.unlock() } // 如果 c = = capacity,那么说明在这个 take 方法发生的时候,队列是满的 // 既然出队了一个,那么意味着队列不满了,唤醒写线程去写 if (c = = capacity) signalNotFull() return x } // 取队头,出队 private E dequeue() { // assert takeLock.isHeldByCurrentThread() // assert head.item = = null // 之前说了,头结点是空的 Node<E> h = head Node<E> first = h.next h.next = h // 设置这个为新的头结点 head = first E x = first.item first.item = null return x } // 元素出队后,如果需要,调用这个方法唤醒写线程来写 private void signalNotFull() { final ReentrantLock putLock = this.putLock putLock.lock() try { notFull.signal() } finally { putLock.unlock() } }
SynchronousQueue 同步队列: 当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作 SynchronousQueue 不提供任何空间来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费 没有 peek 方法(直接返回null) 构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public SynchronousQueue (boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); }abstract static class Transferer { abstract Object transfer (Object e, boolean timed, long nanos) ; }
put & take 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void put (E o) throws InterruptedException { if (o == null ) throw new NullPointerException (); if (transferer.transfer(o, false , 0 ) == null ) { Thread.interrupted(); throw new InterruptedException (); } }public E take () throws InterruptedException { Object e = transferer.transfer(null , false , 0 ); if (e != null ) return (E)e; Thread.interrupted(); throw new InterruptedException (); }
transfer 分析(queue 公平模式) put(E o)
和 take()
都调用了 transferer.transfer()
, 区别是 take 操作的第一个参数为 null(则该操作为读操作)
transfer
整体的设计思路如下:
当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据 等待队列节点 QNode
的结构如下:
1 2 3 4 5 6 7 8 9 10 11 static final class QNode { volatile QNode next; volatile Object item; volatile Thread waiter; final boolean isData; QNode (Object item, boolean isData) { this .item = item; this .isData = isData; } ......
整个transfer流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 Object transfer (Object e, boolean timed, long nanos) { QNode s = null ; boolean isData = (e != null ); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null ) continue ; if (h == t || t.isData == isData) { QNode tn = t.next; if (t != tail) continue ; if (tn != null ) { advanceTail(t, tn); continue ; } if (timed && nanos <= 0 ) return null ; if (s == null ) s = new QNode (e, isData); if (!t.casNext(null , s)) continue ; advanceTail(t, s); Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { clean(t, s); return null ; } if (!s.isOffList()) { advanceHead(t, s); if (x != null ) s.item = s; s.waiter = null ; } return (x != null ) ? x : e; } else { QNode m = h.next; if (t != tail || m == null || h != head) continue ; Object x = m.item; if (isData == (x != null ) || x == m || !m.casItem(x, e)) { advanceHead(h, m); continue ; } advanceHead(h, m); LockSupport.unpark(m.waiter); return (x != null ) ? x : e; } } }void advanceTail (QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this , tailOffset, t, nt); } Object awaitFulfill (QNode s, Object e, boolean timed, long nanos) { long lastTime = timed ? System.nanoTime() : 0 ; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0 ); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; if (x != e) return x; if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0 ) { s.tryCancel(e); continue ; } } if (spins > 0 ) --spins; else if (s.waiter == null ) s.waiter = w; else if (!timed) LockSupport.park(this ); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanos); } }
非公平模式 TransferStack 上面分析了公平模式 TransferQueue, TransferStack 流程类似:
如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而栈中的元素也都是写线程)。这种情况下,将当前线程加入到等待栈中,等待配对 如果栈中有等待节点,而且与当前操作可以匹配(如栈里面都是读操作线程,当前线程是写操作线程,反之亦然)。将当前节点压入栈顶,和栈中的节点进行匹配,然后将这两个节点出栈 如果栈顶是进行匹配而入栈的节点,帮助其进行匹配并出栈,然后再继续操作 PriorityBlockingQueue PriorityQueue 的线程安全版本 插入值不可为null, 并且是 comparable 的(否则会抛出 ClassCastException) 相较于其他 BlockingQueue,PriorityBlockingQueue 的 put 操作不会阻塞, 因为它是无界队列 主要属性 前面说了 PriorityBlockingQueue 是 PriorityQueue 的线程安全版本, 所以基本的存储结构也与 PriorityQueue 一样.
使用一个基于数组的二叉堆来存储, 采取同一个 lock 来控制并发 二叉堆(小顶堆), 每个节点的值都小于其左右子节点的值, 二叉堆中最小的值就是根节点 对于数组中的元素 a[i]
,其左子节点为 a[2*i+1]
,其右子节点为 a[2*i + 2]
,其父节点为 a[(i-1)/2]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static final int DEFAULT_INITIAL_CAPACITY = 11 ;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 ;private transient Object [] queue;private transient int size ;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;private transient volatile int allocationSpinLock;private PriorityQueue q;
自动扩容 PriorityBlockingQueue 实现了并发安全的自动扩容, tryGrow
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private void tryGrow(Object[] array, int oldCap) { lock.unlock(); Object[] new Array = null ; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this , allocationSpinLockOffset, 0 , 1 )) { try { int new Cap = oldCap + ((oldCap < 64 ) ? (oldCap + 2 ) : (oldCap >> 1 )); if (new Cap - MAX_ARRAY_SIZE > 0 ) { int minCap = oldCap + 1 ; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError (); new Cap = MAX_ARRAY_SIZE; } if (new Cap > oldCap && queue == array) new Array = new Object [new Cap ]; } finally { allocationSpinLock = 0 ; } } if (new Array == null ) Thread.yield(); lock.lock(); if (new Array != null && queue == array) { queue = new Array ; System.arraycopy(array, 0 , new Array , 0 , oldCap); } }
put & take 操作 put 流程与 PriorityQueue 类似, 都是先插入到最后,然后与父节点比较,直到父节点小于插入元素,不过加了一个 lock,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void put (E e ) { offer(e); }public boolean offer (E e ) { if (e == null ) throw new NullPointerException(); final ReentrantLock lock = this .lock ; lock .lock (); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null ) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1 ; notEmpty.signal(); } finally { lock .unlock(); } return true ; }
take 流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public E take() throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null ) notEmpty.await(); } finally { lock.unlock(); } return result; }private E dequeue() { int n = size - 1 ; if (n < 0 ) return null ; else { Object[] array = queue; E result = (E) array [0 ]; E x = (E) array [n]; array [n] = null ; Comparator<? super E> cmp = comparator; if (cmp == null ) siftDownComparable(0 , x, array , n); else siftDownUsingComparator(0 , x, array , n, cmp); size = n; return result; } }private static <T> void siftDownComparable(int k, T x, Object[] array , int n) { if (n > 0 ) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1 ; while (k < half) { int child = (k << 1 ) + 1 ; Object c = array [child]; int right = child + 1 ; if (right < n && ((Comparable<? super T>) c).compareTo((T) array [right]) > 0 ) c = array [child = right]; if (key.compareTo((T) c) <= 0 ) break ; array [k] = c; k = child; } array [k] = key; } }
DelayQueue DelayQueue 是用优先队列实现的无界阻塞队列
主要属性 1 2 3 4 5 6 7 8 9 10 11 public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue <E > { private final transient ReentrantLock lock = new ReentrantLock (); private final PriorityQueue <E > q = new PriorityQueue <E >(); private Thread leader = null ; private final Condition available = lock.newCondition(); }
put & take 操作 put 操作如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void put (E e ) { offer(e); }public boolean offer (E e ) { final ReentrantLock lock = this .lock ; lock .lock (); try { q.offer(e); if (q.peek() == e) { leader = null ; available.signal(); } return true ; } finally { lock .unlock(); } }
take 操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public E take () throws InterruptedException { final ReentrantLock lock = this .lock ; lock .lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null ) available.await (); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0 ) return q.poll(); first = null ; if (leader != null ) available.await (); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null ; } } } } } finally { if (leader == null && q.peek() != null ) available.signal(); lock .unlock(); } }