Java阻塞队列 —— ArrayBlockingQueue

 2023-01-31
原文作者:蒋先森 原文地址:https://jlj98.top/

ArrayBlockingQueue 是一个数组实现的有界阻塞队列,采用FIFO算法对队列元素进行排序。

    ArrayBlockingQueue queue = new ArrayBlockingQueue(100);

定义

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -817911632652898426L;
        //一个定长数组,维护ArrayBlockingQueue的元素
        final Object[] items;
        //为ArrayBlockingQueue对首位置
        int takeIndex;
        //ArrayBlockingQueue对尾位置
        int putIndex;
        //元素个数
        int count;
        //重入锁,出列入列都必须获取该锁,两个步骤公用一个锁
        final ReentrantLock lock;
        //出列条件
        private final Condition notEmpty;
        //入列条件
        private final Condition notFull;
        transient Itrs itrs = null;
    }

ArrayBlockingQueue继承AbstractQueue,实现BlockingQueue接口。AbstractQueue提供了对queue操作的骨干实现。BlockingQueue继承java.util.Queue为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作,作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由BlockingQueue来完成。
ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作。

入队

add(E e)

add(E e)调用父类AbstractQueue接口,如果添加成功,返回true,否则抛出 IllegalStateException("Queue full") 异常。

    public boolean add(E e) {
        return super.add(e);
    }
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

offer(E e)

add方法最后调用的还是offer进行添加元素的。先判断添加的节点是否为null,然后获取lock,如果队列已经满了,返回false,否则调用enqueue添加元素,最后释放lock。

    public boolean offer(E e) {
        checkNotNull(e);//检查添加的节点是否是null
        //获取lock
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

出队

poll

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

dequeue 方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器itrs不为null,则需要维护下该迭代器。最后调用notFull.signal()唤醒入列线程。

    private E dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }