Java并发容器 —— ConcurrentLinkedQueue

 2023-01-31
原文作者:蒋先森 原文地址:https://jlj98.top/

对于实现线程安全队列,现在有两种方式:1、使用阻塞算法;2、使用非阻塞算法。使用阻塞算法的队列是锁的应用。非阻塞则是CAS算法的应用。

简介

CoucurrentLinkedQueue是一个基于链接节点的无界线程安全队列,采用 FIFO 对节点排序。CoucurrentLinkedQueue是一个非阻塞队列。
CoucurrentLinkedQueue规定了如下几个不变性:

  • 在入队的最后一个元素的next为null
  • 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到
  • 对于要删除的节点,不是直接将其设置为null,而是先将其item域设置为null(迭代器会跳过item为null的节点)
  • 允许head和tail更新滞后。这是什么意思呢?意思就说是head、tail不总是指向第一个元素和最后一个元素。

CoucurrentLinkedQueue结构

CoucurrentLinkedQueue 继承 AbstractQueue。CoucurrentLinkedQueue 是由head节点和tair节点组成,每个节点(Node)又节点元素(item)和主项下一个节点的(next)组成。默认情况下head节点存储的元素为空,tair节点等于head节点。

202212301148151621.png

CoucurrentLinkedQueue主要方法:

  • add/offer:add是调用offer方法的,指定元素插入到队列尾部
  • poll:拉取头部元素
  • remove:删除元素

Node

    private static class Node<E> {
        /**节点元素*/
        volatile E item;
        volatile Node<E> next;
        /**初始化,获得item和next的偏移量,为后期的CAS做准备*/
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
    
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
    
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
    
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
    
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;//偏移量
        private static final long nextOffset;//下个元素偏移量
    
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

poll(出队)

    public E poll() {
        restartFromHead://没搞懂这个是干啥子的???
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // item 不为null,则将item 设置为null
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // p != head 则更新head
                    // p.next != null,则将head更新为p.next ,否则更新为p
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // p.next == null 队列为空
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // 当一个线程在poll的时候,另一个线程已经把当前的p从队列中删除——将p.next = p,p已经被移除不能继续,需要重新开始
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

其中 updateHead() ,该方法用于CAS更新head节点,如下:

    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

offer(入队)

    public boolean offer(E e) {
        /**检查添加的元素是否是null*/
        checkNotNull(e);
        /**创建新的节点*/
        final Node<E> newNode = new Node<E>(e);
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            //q==null,表示节点p是最后一个节点了,在尾部添加节点
            // 如果插入失败,则表示其他线程已经修改了p的指向
            if (q == null) {
                // casNext:t节点的next指向当前节点
                // casTail:设置tail 尾节点
                if (p.casNext(null, newNode)) {
                    // node 加入节点后会导致tail距离最后一个节点相差大于一个,需要更新tail
                    if (p != t)
                        casTail(t, newNode);
                    return true;
                }
            }
            else if (p == q)
                // p == q 代表着该节点已经被删除了
                // 由于多线程的原因,我们offer()的时候也会poll(),如果offer()的时候正好该节点已经poll()了
                // 那么在poll()方法中的updateHead()方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
                // 这样就会导致tail节点滞后head(tail位于head的前面),则需要重新设置p
                p = (t != (t = tail)) ? t : head;
                // tail并没有指向尾节点
            else
                // tail已经不是最后一个节点,将p指向最后一个节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

202212301148161572.png

  • 第一步添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
  • 第二步添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
  • 第三步添加元素3,设置tail节点的next节点为元素3节点。
  • 第四步添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。

remove

202212301148169003.png

Reference