Java阻塞队列 —— LinkedBlockingDeque 和 LinkedBlockingQueue

 2023-01-31
原文作者:蒋先森 原文地址:https://jlj98.top/

LinkedBlockingDeque 则是一个由链表组成的双向阻塞队列。可以从对头、对尾两端插入和移除元素,同样意味着 LinkedBlockingDeque 支持FIFO、FILO两种操作方式。LinkedBlockingDeque 是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。

LinkedBlockingQueue 是一个由链表组成的,只能从一端出一端入,支持 FIFO,并通过 ReentrantLock 和 两个Condition实现。

LinkedBlockingDeque

结构定义

通过上面的Lock可以看出,LinkedBlockingDeque底层实现机制与LinkedBlockingQueue一样,依然是通过互斥锁ReentrantLock 来实现,notEmpty 、notFull 两个Condition做协调生产者、消费者问题。

    public class LinkedBlockingDeque<E>
        extends AbstractQueue<E>
        implements BlockingDeque<E>, java.io.Serializable {
            static final class Node<E> {
            E item;
            Node<E> prev;
            Node<E> next;
            Node(E x) {
                item = x;
            }
        }
        transient Node<E> first;// 双向链表的表头
        transient Node<E> last;// 双向链表的表尾
        private transient int count;
        private final int capacity;//容量
        final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();

基本API

LinkedBlockingDeque 的add、put、offer、take、peek、poll系列方法都是通过调用XXXFirst,XXXLast方法。因此只需要了解putFirst、putLast、pollFirst、pollLast。

putFirst

putFirst(E e) :将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
先获取锁,然后调用linkFirst方法入列,最后释放锁。如果队列是满的则在notFull上面等待。linkFirst设置Node为对头。

        public void putFirst(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            Node<E> node = new Node<E>(e);//生成一个新节点
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                while (!linkFirst(node))//调用linkFirst添加节点
                    notFull.await();
            } finally {
                lock.unlock();
            }
        }
linkFirst主要是设置node节点队列的列头节点,成功返回true,如果队列满了返回false。整个过程还是比较简单的。
        private boolean linkFirst(Node<E> node) {
            // assert lock.isHeldByCurrentThread();
            if (count >= capacity)
                return false;
            Node<E> f = first;//首节点
            node.next = f;// 新节点的next指向原first
            first = node;// 设置node为新的first
            if (last == null)
            // 没有尾节点,设置node为尾节点
                last = node;
            else
            // 有尾节点,那就将之前first的pre指向新增node
                f.prev = node;
            ++count;
            notEmpty.signal();
            return true;
        }

putLast

putLast(E e) :将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。

    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //调用linkLast将节点Node链接到队列尾部
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    private boolean linkLast(Node<E> node) {
        if (count >= capacity)
            return false;
        Node<E> l = last;//获取尾节点
        node.prev = l; // 将Node的前驱指向原本的last
        last = node;// 将node设置为last
        if (first == null)
        // 首节点为null,则设置node为first
            first = node;
        else
        //非null,说明之前的last有值,就将之前的last的next指向node
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }

pollFirst

pollFirst():获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。

    public E pollFirst() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkFirst();
        } finally {
            lock.unlock();
        }
    }
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        notFull.signal();
        return item;
    }

pollLast

pollLast():获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。

    public E pollLast() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return unlinkLast();
        } finally {
            lock.unlock();
        }
    }
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last;
        if (l == null)
            return null;
        Node<E> p = l.prev;
        E item = l.item;
        l.item = null;
        l.prev = l; // help GC
        last = p;
        if (p == null)
            first = null;
        else
            p.next = null;
        --count;
        notFull.signal();
        return item;
    }

LinkedBlockingQueue

结构

这是一个只能一端出一端入的单向队列结构,具有 FIFO 特性。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        /**
         * 主要 node节点
         */
        static class Node<E> {
            E item;
            Node<E> next;
            Node(E x) { item = x; }
        }
    
        /** 边界限制,否则是Integer.MAX_VALUE */
        private final int capacity;
    
        /** 用AtomicInteger 来记录数量 */
        private final AtomicInteger count = new AtomicInteger();
    
        /**链表头结点*/
        transient Node<E> head;
    
        /**链表last节点*/
        private transient Node<E> last;
    
        /**take 锁*/
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** 等待take的节点序列 */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** put锁*/
        private final ReentrantLock putLock = new ReentrantLock();
    
        /**put等待队列*/
        private final Condition notFull = putLock.newCondition();

put操作

put操作是向队列尾部插入一个元素,具体源码如下:

    public void put(E e) throws InterruptedException {
        //插入的元素不能为null
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;//获取put锁
        final AtomicInteger count = this.count;//获取count
        putLock.lockInterruptibly();
        try {
            //如果队列满了,使用notFull阻塞
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();//CAS增加count
            //如果队列有空间了,notFull唤醒
            if (c + 1 < capacity)
                notFull.signal();
        } finally {//释放锁
            putLock.unlock();
        }
        //如果队列size为0,也要
        if (c == 0)
            signalNotEmpty();
    }
    private void enqueue(Node<E> node) {
        //入队操作,对尾插入元素
        last = last.next = node;
    }
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();//加锁
        try {
            notEmpty.signal();//用于signal,notEmpty
        } finally {
            takeLock.unlock();
        }
    }

take操作

take操作是从对了中弹出一个元素:

    public E take() throws InterruptedException {
        E x;
        int c = -1;//设定一个记录变量
        final AtomicInteger count = this.count;//获取count
        final ReentrantLock takeLock = this.takeLock;//获取take锁
        takeLock.lockInterruptibly();//加锁
        try {
            //如果队列中没有元素,就阻塞性等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            //队列还有元素,唤醒队列
            if (c > 1)
                notEmpty.signal();
        } finally {//释放锁
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();//解锁
        return x;
    }
    private E dequeue() {
        // 
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC 指向自己,帮助GC回收
        head = first;
        E x = first.item;//从队列头部弹出元素
        first.item = null;//将head.item设为null
        return x;
    }