2023-08-15
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/39

一、SynchronousQueue简介

SynchronousQueue是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于 队列 实现:

202308152204441021.png

没有看错,SynchronousQueue的底层实现包含两种数据结构—— 队列 。这是一种非常特殊的阻塞队列,它的特点简要概括如下:

  1. 入队线程和出队线程必须一一匹配,否则任意先到达的线程会阻塞。比如ThreadA进行入队操作,在有其它线程执行出队操作之前,ThreadA会一直等待,反之亦然;
  2. SynchronousQueue内部不保存任何元素,也就是说它的容量为0,数据直接在配对的生产者和消费者线程之间传递,不会将数据缓冲到队列中。
  3. SynchronousQueue支持公平/非公平策略。其中非公平模式,基于内部数据结构——“栈”来实现,公平模式,基于内部数据结构——“队列”来实现;
  4. SynchronousQueue基于一种名为“Dual stack and Dual queue”的无锁算法实现。

注意:上述的特点1,和我们之前介绍的Exchanger其实非常相似,可以类比Exchanger的功能来理解。

二、SynchronousQueue原理

2.1 构造

之前提到,SynchronousQueue根据公平/非公平访问策略的不同,内部使用了两种不同的数据结构:栈和队列。我们先来看下对象的构造,SynchronousQueue只有2种构造器:

    /**
     * 默认构造器.
     * 默认使用非公平策略.
     */
    public SynchronousQueue() {
        this(false);
    }
    /**
     * 指定策略的构造器.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

可以看到,对于公平策略,内部构造了一个 TransferQueue 对象,而非公平策略则是构造了 TransferStack 对象。这两个类都继承了内部类 Transferer ,SynchronousQueue中的所有方法,其实都是委托调用了TransferQueue/TransferStack的方法:

    public class SynchronousQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        /**
         * tranferer对象, 构造时根据策略类型确定.
         */
        private transient volatile Transferer<E> transferer;
    
        /**
         * Shared internal API for dual stacks and queues.
         */
        abstract static class Transferer<E> {
            /**
             * Performs a put or take.
             *
             * @param e 非null表示 生产者 -> 消费者;
             *          null表示, 消费者 -> 生产者.
             * @return 非null表示传递的数据; null表示传递失败(超时或中断).
             */
            abstract E transfer(E e, boolean timed, long nanos);
        }
    
        /**
         * Dual stack(双栈结构).
         * 非公平策略时使用.
         */
        static final class TransferStack<E> extends Transferer<E> {
            // ...
        }
    
        /**
         * Dual Queue(双端队列).
         * 公平策略时使用.
         */
        static final class TransferQueue<E> extends Transferer<E> {
            // ...
        }
    
        // ...
    }

2.2 栈结构

非公平策略由TransferStack类实现,既然TransferStack是栈,那就有结点。TransferStack内部定义了名为 SNode 的结点:

    static final class SNode {
        volatile SNode next;
        volatile SNode match;       // 与当前结点配对的结点
        volatile Thread waiter;     // 当前结点对应的线程
        Object item;                // 实际数据或null
        int mode;                   // 结点类型
    
        SNode(Object item) {
            this.item = item;
        }
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long matchOffset;
        private static final long nextOffset;
    
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = SNode.class;
                matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));
                nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    
        // ...
    
    }

上述SNode结点的定义中有个mode字段,表示结点的类型。TransferStack一共定义了 三种结点类型 ,任何线程对TransferStack的操作都会创建下述三种类型的某种结点:

  • REQUEST :表示未配对的消费者(当线程进行出队操作时,会创建一个mode值为REQUEST的SNode结点 )
  • DATA :表示未配对的生产者(当线程进行入队操作时,会创建一个mode值为DATA的SNode结点 )
  • FULFILLING :表示配对成功的消费者/生产者
    static final class TransferStack<E> extends Transferer<E> {
    
        /**
         * 未配对的消费者
         */
        static final int REQUEST = 0;
        /**
         * 未配对的生产者
         */
        static final int DATA = 1;
        /**
         * 配对成功的消费者/生产者
         */
        static final int FULFILLING = 2;
    
         volatile SNode head;
    
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long headOffset;
    
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = TransferStack.class;
                headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    
        // ...
    }

2.3 核心操作——put/take

SynchronousQueue的入队操作调用了 put 方法:

    /**
     * 入队指定元素e.
     * 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

SynchronousQueue的出队操作调用了 take 方法:

    /**
     * 出队一个元素.
     * 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
     */
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

可以看到,SynchronousQueue一样不支持null元素,实际的入队/出队操作都是委托给了 transfer 方法,该方法返回null表示出/入队失败(通常是线程被中断或超时):

    /**
     * 入队/出队一个元素.
     */
    E transfer(E e, boolean timed, long nanos) {
        SNode s = null; // s表示新创建的结点
        // 入参e==null, 说明当前是出队线程(消费者), 否则是入队线程(生产者)
        // 入队线程创建一个DATA结点, 出队线程创建一个REQUEST结点
        int mode = (e == null) ? REQUEST : DATA;
    
        for (; ; ) {    // 自旋
            SNode h = head;
            if (h == null || h.mode == mode) {          // CASE1: 栈为空 或 栈顶结点类型与当前mode相同
                if (timed && nanos <= 0) {              // case1.1: 限时等待的情况
                    if (h != null && h.isCancelled())
                        casHead(h, h.next);
                    else
                        return null;
                } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
                    SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
                    if (m == s) {                                   // 阻塞过程中被中断
                        clean(s);
                        return null;
                    }
    
                    // 此时m为配对结点
                    if ((h = head) != null && h.next == s)
                        casHead(h, s.next);
    
                    // 入队线程null, 出队线程返回配对结点的值
                    return (E) ((mode == REQUEST) ? m.item : s.item);
                }
                // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行
    
            } else if (!isFulfilling(h.mode)) {          // CASE2: 栈顶结点还未配对成功
                if (h.isCancelled())                     // case2.1: 元素取消情况(因中断或超时)的处理
                    casHead(h, h.next);
                else if (casHead(h, s = snode(s, e,
                    h, FULFILLING | mode))) {      // case2.2: 将当前结点压入栈中
                    for (; ; ) {
                        SNode m = s.next;       // s.next指向原栈顶结点(也就是与当前结点匹配的结点)
                        if (m == null) {        // m==null说明被其它线程抢先匹配了, 则跳出循环, 重新下一次自旋
                            casHead(s, null);
                            s = null;
                            break;
                        }
    
                        SNode mn = m.next;
                        if (m.tryMatch(s)) {    // 进行结点匹配
                            casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
                            return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
                        } else                  // 匹配失败
                            s.casNext(m, mn);   // 移除原待匹配结点
                    }
                }
            } else {                            // CASE3: 其它线程正在匹配
                SNode m = h.next;
                if (m == null)                  // 栈顶的next==null, 则直接弹出, 重新进入下一次自旋
                    casHead(h, null);
                else {                          // 尝试和其它线程竞争匹配
                    SNode mn = m.next;
                    if (m.tryMatch(h))
                        casHead(h, mn);         // 匹配成功
                    else
                        h.casNext(m, mn);       // 匹配失败(被其它线程抢先匹配成功了)
                }
            }
        }
    }

整个 transfer 方法考虑了限时等待的情况,且入队/出队其实都是调用了同一个方法,其主干逻辑就是在一个自旋中完成以下三种情况之一的操作,直到成功,或者被中断或超时取消:

  1. 栈为空,或栈顶结点类型与当前入队结点相同。这种情况,调用线程会阻塞;
  2. 栈顶结点还未配对成功,且与当前入队结点可以配对。这种情况,直接进行配对操作;
  3. 栈顶结点正在配对中。这种情况,直接进行下一个结点的配对。

2.4 出/入队示例讲解

为了便于理解,我们来看下面这个调用示例(假设不考虑限时等待的情况),假设一共有三个线程ThreadA、ThreadB、ThreadC:

①初始栈结构

初始栈为空,head为栈顶指针,始终指向栈顶结点:

202308152204446232.png

②ThreadA(生产者)执行入队操作

由于此时栈为空,所以ThreadA会进入 CASE1 ,创建一个类型为DATA的结点:

    if (h == null || h.mode == mode) {          // CASE1: 栈为空 或 栈顶结点类型与当前mode相同
        if (timed && nanos <= 0) {              // case1.1: 限时等待的情况
            if (h != null && h.isCancelled())
                casHead(h, h.next);
            else
                return null;
        } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
            SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
            if (m == s) {                                   // 阻塞过程中被中断
                clean(s);
                return null;
            }
    
            // 此时m为配对结点
            if ((h = head) != null && h.next == s)
                casHead(h, s.next);
    
            // 入队线程null, 出队线程返回配对结点的值
            return (E) ((mode == REQUEST) ? m.item : s.item);
        }
        // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行
    }

CASE1 分支中,将结点压入栈后,会调用awaitFulfill方法,该方法会阻塞调用线程:

    /**
     * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上.
     *
     * @param s 等待的结点
     * @return 返回配对的结点 或 当前结点(说明线程被中断了)
     */
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
    
        // 性能优化操作(计算自旋次数)
        int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (; ; ) {
            if (w.isInterrupted())
                s.tryCancel();
            /**
             * s.match保存当前结点的匹配结点.
             * s.match==null说明还没有匹配结点
             * s.match==s说明当前结点s对应的线程被中断了
             */
            SNode m = s.match;
            if (m != null)
                return m;
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            if (spins > 0)
                spins = shouldSpin(s) ? (spins - 1) : 0;
            else if (s.waiter == null)  // 还没有匹配结点, 则保存当前线程
                s.waiter = w;           // s.waiter保存当前阻塞线程
            else if (!timed)
                LockSupport.park(this); // 阻塞当前线程
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

此时栈结构如下,结点的waiter字段保存着创建该结点的线程ThreadA,ThreadA等待着被配对消费者线程唤醒:

202308152204452413.png

③ThreadB(生产者)执行入队操作

此时栈顶结点的类型和ThreadB创建的结点相同(都是DATA类型的结点),所以依然走 CASE1 分支,直接将结点压入栈:

202308152204457124.png

④ThreadC(消费者)执行出队操作

此时栈顶结点的类型和ThreadC创建的结点匹配(栈顶DATA类型,ThreadC创建的是REQUEST类型),所以走 CASE2 分支,该分支会将匹配的两个结点弹出栈:

    else if (!isFulfilling(h.mode)) {          // CASE2: 栈顶结点还未配对成功
        if (h.isCancelled())                     // case2.1: 元素取消情况(因中断或超时)的处理
            casHead(h, h.next);
        else if (casHead(h, s = snode(s, e,
            h, FULFILLING | mode))) {      // case2.2: 将当前结点压入栈中
            for (; ; ) {
                SNode m = s.next;       // s.next指向原栈顶结点(也就是与当前结点匹配的结点)
                if (m == null) {        // m==null说明被其它线程抢先匹配了, 则跳出循环, 重新下一次自旋
                    casHead(s, null);
                    s = null;
                    break;
                }
    
                SNode mn = m.next;
                if (m.tryMatch(s)) {    // 进行结点匹配
                    casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
                    return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
                } else                  // 匹配失败
                    s.casNext(m, mn);   // 移除原待匹配结点
            }
        }
    }

上述 isFulfilling 方法就是判断结点是否匹配:

    /**
     * 判断m是否已经配对成功.
     */
    static boolean isFulfilling(int m) {
        return (m & FULFILLING) != 0;
    }

ThreadC创建结点并压入栈后,栈的结构如下:

202308152204462845.png

此时,ThreadC会调用 tryMatch 方法进行匹配,该方法的主要作用有两点:

  1. 将待结点的match字段置为与当前配对的结点(如上图中,结点m是待配对结点,最终m.math == s
  2. 唤醒待配对结点中的线程(如上图中,唤醒结点m中ThreadB线程)
    /**
     * 尝试将当前结点和s结点配对.
     */
    boolean tryMatch(SNode s) {
        if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // 唤醒当前结点对应的线程
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match == s;      // 配对成功返回true
    }

匹配完成后,会将匹配的两个结点弹出栈,并返回匹配值:

    if (m.tryMatch(s)) {    // 进行结点匹配
        casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
        return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
    }

最终,ThreadC拿到了等待配对结点中的数据并返回,此时栈的结构如下:

202308152204468086.png

注意: CASE2 分支中ThreadC创建的结点的mode值并不是REQUEST,其mode值为FULFILLING | modeFULFILLING | mode的主要作用就是给栈顶结点置一个标识(二进制为11或10), 表示当前有线程正在对栈顶匹配 ,这时如果有其它线程进入自旋(并发情况),则CASE2一定失败,因为isFulfilling的结果必然为true,所以会进入 CASE3 分支——跳过栈顶结点进行匹配。

    casHead(h, s = snode(s, e, h, FULFILLING | mode))

⑤ThreadB(生产者)唤醒后继续执行

ThreadB被唤醒后,会从原阻塞处继续执行,并进入下一次自旋,在下一次自旋中,由于结点的match字段已经有了匹配结点,所以直接返回配对结点:

    /**
     * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上.
     *
     * @param s 等待的结点
     * @return 返回配对的结点 或 当前结点(说明线程被中断了)
     */
    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
    
        // 性能优化操作(计算自旋次数)
        int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (; ; ) {
            if (w.isInterrupted())
                s.tryCancel();
            /**
             * s.match保存当前结点的匹配结点.
             * s.match==null说明还没有匹配结点
             * s.match==s说明当前结点s对应的线程被中断了
             */
            SNode m = s.match;
            if (m != null)
                return m;
            if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    s.tryCancel();
                    continue;
                }
            }
            if (spins > 0)
                spins = shouldSpin(s) ? (spins - 1) : 0;
            else if (s.waiter == null)  // 还没有匹配结点, 则保存当前线程
                s.waiter = w;           // s.waiter保存当前阻塞线程
            else if (!timed)
                LockSupport.park(this); // 阻塞当前线程
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

最终,在下面分支中返回:

    else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
        SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
        if (m == s) {                                   // 阻塞过程中被中断
            clean(s);
            return null;
        }
    
        // 此时m为配对结点
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);
    
        // 入队线程null, 出队线程返回配对结点的值
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }

注意: 对于 入队 线程(生产者),返回的是它入队时携带的 原有元素值。


2.5 队列结构

SynchronousQueue的公平策略由 TransferQueue 类实现,TransferQueue内部定义了名为QNode的结点,一个head队首指针,一个tail队尾指针:

    /**
     * Dual Queue(双端队列).
     * 公平策略时使用.
     */
    static final class TransferQueue<E> extends Transferer<E> {
    
        /**
         * Head of queue
         */
        transient volatile QNode head;
        /**
         * Tail of queue
         */
        transient volatile QNode tail;
        /**
         * Reference to a cancelled node that might not yet have been
         * unlinked from queue because it was the last inserted node
         * when it was cancelled.
         */
        transient volatile QNode cleanMe;
    
        /**
         * 队列结点定义.
         */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;
            // ...
        }
    
        // ...
    }

关于TransferQueue的transfer方法就不再赘述了,其思路和TransferStack大致相同,总之就是入队/出队必须一一匹配,否则任意一方就会加入队列并等待匹配线程唤醒。读者可以自行阅读TransferQueued的源码。

三、总结

TransferQueue主要用于线程之间的数据交换,由于采用无锁算法,其性能一般比单纯的其它阻塞队列要高。它的最大特点时不存储实际元素,而是在内部通过栈或队列结构保存阻塞线程。后面我们讲 JUC线程池框架 的时候,还会再次看到它的身影。

阅读全文