2023-01-14
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java源码分析》:CyclicBarrier(part two)

在上篇博文中http://blog.csdn.net/u010412719/article/details/52156588,我们看了关于CyclicBarrier应用的一些例子,也得出的使用CyclicBarrier时的相关结论。

回顾下我们在上篇博文中了解了关于CyclicBarrier的相关知识点如下:

1、CyclicBarrier的用途是让一组线程互相等待,直到到达某个公共屏障点才开始继续工作。

2、CyclicBarrier是可以重复利用的,CountDownLatch不可以重复利用

3、在等待的只要有一个线程发生中断,则其它线程就会被唤醒继续正常运行。

4、CyclicBarrier指定的任务是进行barrier处最后一个线程来调用的,如果在执行这个任务发生异常时,则会传播到此线程,其它线程不受影响继续正常运行。

这篇博文就从源码出发开下,里面到底是如何具体的实现的。

1、CyclicBarrier的构造函数

在我们的使用CyclicBarrier时,第一步肯定是new一个CyclicBarrier对象。

如果我们没有 栅栏任务(barrierAction)需要指定,则就直接使用如下的构造函数构造对象。

        /*
         *创建一个新的CyclicBarrier对象,当给定的线程都到达这个临界点等待(即调用await方法),则开启barrier。
         *当开启barrier时并没有任何预先定义的action需要执行。
         */
        public CyclicBarrier(int parties) {
            this(parties, null);
        }

其中构造函数中的参数指的就是你准备需要多少个线程等待至公共屏障点。

如果有 栅栏任务(barrierAction)需要指定,则就需要使用如下的构造函数构造对象。

        /*
         * Creates a new {@code CyclicBarrier} that will trip when the
         * given number of parties (threads) are waiting upon it, and which
         * will execute the given barrier action when the barrier is tripped,
         * performed by the last thread entering the barrier.
         *
         */
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }

创建一个新的CyclicBarrier对象,当给定的线程都到达这个临界点等待(即调用await方法),则开启barrier。当开启barrier时由最后一个进入barrier的线程来执行预先定义的action。

即从源码中的这个构造方法我们可以获悉:当开启barrier时由最后一个进入barrier的线程来执行预先定义的action

继续往下面来看。

CyclicBarrier的await()/await(time,TimeUnit)方法分析

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }

此函数完成的详细功能:

1、等待直到在这个barrier中有parties(构造函数中自己指定的)个线程调用了await方法。如果当前线程不是最后一个到达的,则由于线程调度的原因被禁用,睡眠直至以下事情发生:

     1、最后一个线程的到达。
     2、其它的线程中断了当前线程
     3、其它的线程中断了和一样正在等待的线程
     4、其它线程等待barrier超时
     5、其它线程在这个barrier处调用了reset方法

2、如果当前线程在进入这个方法时由中断标志或者是在等待的过程中被中断,则会抛中断异常,并且中断标志被清除。

3、当存在线程正在等待的时候,有线程调用了reset方法。或者是当调用await方法时barrier的isBroken已经被broken。则会抛BrokenBarrierException异常。

4、当存在线程在等待的时候被中断,则所有其他正在被等待的线程都将跑抛BrokenBarrierException异常,并且barrier就处于broken状态。

5、如果当前线程是最后一个到达,并且在构造方法中有一个非空的barrier action,则在允许其他线程继续运行之前
当前线程需要执行此Runnable的run方法。英文注释如下:

     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.

如果在执行此barrier action期间发生了异常,此异常就会在此线程中传播。并且barrier的状态将会设置为broken。

6、返回值:返回的是当前线程到达barrier的索引,
例如,第一个到达barrier处的线程返回的是patiers-1.
最后一个返回的是 0。

上面是关于await方法功能上的一些描述,下面来看其具体是如何来实现上面的功能细节的

从await源码中可以看到,此方法是直接调用dewait方法来完成的。下面就直接看下dowait方法。

dowait方法的代码如下,(注释的相当详细)

        /**
         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)//检查状态,如果为true,则说明已经broken了
                    throw new BrokenBarrierException();
                //检查当前线程是否被中断,如果被中断先调用breakBarrier方法然后抛中断异常
                if (Thread.interrupted()) {
                    breakBarrier();//设置generation并且唤醒所有正在等待的线程
                    throw new InterruptedException();
                }
    
                int index = --count;//先减一然后再赋值
                if (index == 0) {  // tripped,打开barrier
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();//当前线程调用command的run方法
                        //从这里可以看出,调用barrier的command的run方法在唤醒其他所有正在等待的线程在前。
                        ranAction = true;
                        nextGeneration();//唤醒所有的正在等待的线程并且设置状态为下一次重复利用做准备
                        return 0;
                    } finally {
                        if (!ranAction)//如果command中的run方法抛异常,则就运行这里的breakBarrier()方法来唤醒所有其他正在等待的线程。
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                //如果当前线程不是最后一个到达的线程,则一直循环检测等待,直到tripped, broken, interrupted, or timed out发生
                for (;;) {
                    try {
                        if (!timed) //如果没有设置等待时间,则一直等待,直到其它线程唤醒
                            trip.await();
                        else if (nanos > 0L)//如果设置了等待时间,则等待指定的时间。
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        //如果再等待的期间发生了中断异常,如果其它线程还没有开始唤醒工作,则当前线程就开始唤醒
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        /*
         * Sets current barrier generation as broken and wakes up everyone.
         * Called only while holding lock.
         设置当前的generation状态为broken且唤醒所有正在等待的线程。
    
         */
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        /*
         * Updates state on barrier trip and wakes up everyone.
         * Called only while holding lock.
         *更新barrier的状态为重复利用做准备并且唤醒所有正在等待的线程
         */
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }

跟着上面的功能描述来仔细阅读dowait源码是相当简单的,当我们的思路是清晰的,看源码也就相当清晰了。

await方法的代码流程是这样的

0、首先拿的锁,并检查barrier的标志位broken是否为true。如果为true,说明barrier已经被打破。因此此线程抛异常并不需要等待。如果为false,则进行1

1、检查此线程是否已经被中断,如果被中断,则在抛异常之前先设置barrier的状态为broken并唤醒其他正在等待的线程。如果没有被中断,则进行 2

2、当barrier的状态数减一,并检查此时的状态数是否为零。如果为零,则进行3,如果不为零,则进行 6

3、由于状态数为零,说明此线程是最后一个到达barrier的,barrier应该开启,在开启之前,运行栅栏任务(barrierAction).如果栅栏任务(barrierAction)在运行过程中抛异常,则进行4,否则进行5.

4、调用breakBarrier先将barrier状态设置为broken并唤醒其他正在等待的线程。并进入6.

5、并唤醒所有的正在等待的线程并且调用nextGeneration设置状态为下一次重复利用做准备,然后返回0退出。

6、如果当前线程不是最后一个到达的线程,则一直循环检测等待,直到tripped, broken, interrupted, or timed out发生并做相应的处理。最后释放锁即可。

从源码中我们可以学习到,代码具有可读性和文档的规范性是相当重要的,这就方便其他人来学习

而 await(long timeout, TimeUnit unit)与awit()方法类似,也是直接调用了dowait方法来完成,这里就没有什么好分析的了。

        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }

小结

从dowait方法的代码出发,我们可以明确的知道以下几点内容:

1、CyclicBarrier是可以重用的。

2、栅栏任务是由最后一个进行barrier的线程来执行的。

3、当一个线程调用了CyclicBarrier的await方法,则会在发生了如下情况之后就会被唤醒。

  • 所有线程都来到了barrier,则barrier正常打开,所有线程继续正常运行
  • 当前线程被中断
  • 和当前线程一样正在等待的线程被其它线程中断
  • 当前线程或其它正在等待的线程超时
  • 外部线程调用了reset方法
阅读全文