Java阻塞队列 —— PriorityBlockingQueue

 2023-01-31
原文作者:蒋先森 原文地址:https://jlj98.top/

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。在Thread中,我们可以通过 setPriority(int newPriority) 来设置优先级,线程优先级高的线程先执行,优先级低的后执行。 PriorityBlockingQueue 默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
PriorityBlockingQueue底层是二叉堆构成实现的,下面先介绍一些二叉堆知识点。

二叉堆

202212301148127491.png

二叉堆是一种特殊的堆,就结构性而言就是完全二叉树或者是近似完全二叉树,满足树结构性和堆序性。树机构特性就是完全二叉树应该有的结构,堆序性则是:父节点的键值总是保持固定的序关系于任何一个子节点的键值,且每个节点的左子树和右子树都是一个二叉堆。

队列结构定义

    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = 5595510919245408276L;
        //默认初始化大小11
        private static final int DEFAULT_INITIAL_CAPACITY = 11;
        //定义列表最大容量
        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
        //二叉堆数组
        private transient Object[] queue;
        private transient int size;//队列个数
        // 比较器,如果为空,则为自然顺序
        private transient Comparator<? super E> comparator;
        private final ReentrantLock lock;
        private final Condition notEmpty;
        }

入队

PriorityBlockingQueue是无界的,所以不可能会阻塞。内部调用offer(E e)。

        public void put(E e) {
            offer(e); // never need to block
        }
        public boolean add(E e) {
            return offer(e);
        }
        public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                // 根据比较器是否为null,做不同的处理
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
        //采用自然排序,调用siftUpComparable方法
        private static <T> void siftUpComparable(int k, T x, Object[] array) {
            Comparable<? super T> key = (Comparable<? super T>) x;
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Object e = array[parent];
                if (key.compareTo((T) e) >= 0)
                    break;
                array[k] = e;
                k = parent;
            }
            array[k] = key;
        }
    //当比较器不为null时,采用所指定的比较器,调用siftUpUsingComparator方法
        private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                           Comparator<? super T> cmp) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Object e = array[parent];
                if (cmp.compare(x, (T) e) >= 0)
                    break;
                array[k] = e;
                k = parent;
            }
            array[k] = x;
        }

扩容:tryGrow

private void tryGrow(Object[] array, int oldCap) {

        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

出队

PriorityBlockingQueue提供put()、add()、offer()方法向队列中加入元素。我们这里从put()入手:put(E e) :将指定元素插入此优先级队列。

    public E poll() {
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
             return dequeue();
         } finally {
             lock.unlock();
         }
     }
     private E dequeue() {
         int n = size - 1;
         if (n < 0)
             return null;
         else {
             Object[] array = queue;
             E result = (E) array[0];
             E x = (E) array[n];
             array[n] = null;
             Comparator<? super E> cmp = comparator;
             if (cmp == null)
                 siftDownComparable(0, x, array, n);
             else
                 siftDownUsingComparator(0, x, array, n, cmp);
             size = n;
             return result;
         }
     }
     private static <T> void siftDownComparable(int k, T x, Object[] array,
                                                int n) {
         if (n > 0) {
             Comparable<? super T> key = (Comparable<? super T>)x;
             int half = n >>> 1;           // loop while a non-leaf
             while (k < half) {
                 int child = (k << 1) + 1; // assume left child is least
                 Object c = array[child];
                 int right = child + 1;
                 if (right < n &&
                     ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                     c = array[child = right];
                 if (key.compareTo((T) c) <= 0)
                     break;
                 array[k] = c;
                 k = child;
             }
             array[k] = key;
         }
     }
    
     private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                     int n,
                                                     Comparator<? super T> cmp) {
         if (n > 0) {
             int half = n >>> 1;
             while (k < half) {
                 int child = (k << 1) + 1;
                 Object c = array[child];
                 int right = child + 1;
                 if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                     c = array[child = right];
                 if (cmp.compare(x, (T) c) <= 0)
                     break;
                 array[k] = c;
                 k = child;
             }
             array[k] = x;
         }
     }