欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

AQS 共享模式 循环屏障

最编程 2024-04-30 18:51:00
...

概念:CyclicBarrier翻译为循环(屏障/栅栏),当一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续工作。

设计目的:和CountDownLatch都为同步辅助类,因为CountDownLatch的计数器是一次性的,属于一对多,

1、【某一个线程需要其余线程执行完之后再执行 (一个等多个)】,

2、【一些线程需要在某个时刻同时执行,就像等待裁判员枪响后,才能同时起跑(多个等一个)】

所以,CyclicBarrier被设计成为计数器可循环使用,多对多。只要多个线程都达到后,自会执行接下来的事,没有CountDownLatch的一个等多个,多个等一个的现象。

源码解析:

public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     * 屏障的每次使用都表示为一个生成实例。当屏障被触发或重置时,生成就会改变。
     可以有许多代与使用barrier的线程相关联——由于锁可能以不确定的方式分配给等待线程——但是一次只能有一个是活动的({@code count}应用的那个),
     其余的要么中断,要么被触发。如果有中断,但没有随后的重置,则不需要活动代。
     */
    private static class Generation {
        boolean broken = false;
    }

    /** The lock for guarding barrier entry
    保护屏障的锁,重入锁,调用await()方法的时候会用到,只有一个线程会执行成功,所以多线程高并发的情况下CyclicBarrier的执行效率不是很高 */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped
       待跳闸的条件( Condition本身用于线程间通信,在这就是阻塞和唤醒线程用的)*/
    private final Condition trip = lock.newCondition();
    /** The number of parties
    这个就是计数器初始值,新建类的时候传入的数值会赋值给他 */
    private final int parties;
    /* The command to run when tripped 
       跳闸时要执行操作的线程*/
    private final Runnable barrierCommand;
    /** The current generation 
       当前代*/
    private Generation generation = new Generation();

    /**
     * Number of parties still waiting. Counts down from parties to 0
     * on each generation.  It is reset to parties on each new
     * generation or when broken.
       还有很多任务在等待。每一代从计数器初始值(parties)降到0。它会在每一代新创建或用尽时重置。
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     只在持有锁时调用,去更新屏障的状态并唤醒其他线程。(意思代表下一代,其实就是新建一个代对象重新赋值)
     */
    private void nextGeneration() {
        // signal completion of last generation
        //当计数器减少为零证明这代已经满了,唤醒所有当前代阻塞的线程,这是Condition的方法
        trip.signalAll();
        // set up next generation
        //将初始需要拦截的计数器初始值初始化给count计数器
        count = parties;
        //上一代已经结束了,就要重新开启下一代
        generation = new Generation();
    }

    /**
    这个方法就是告诉这一代线程出问题了: 屏障里面的某个线程中断了,那就唤醒所有线程,然后这个屏障拦截的所有线程都会被唤醒,然后抛出异常,并告诉这一代坏掉了
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        //将这代标记为true,告诉已经坏掉了
        generation.broken = true;
        //这一代重新计数
        count = parties;
        //唤醒这一代里面的所有阻塞线程
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     主要的屏障代码,涵盖了各种策略
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
                   //当多线程并发情况下只有一个线程能获得锁(注意:该锁为jvm级锁,只能保证单实例有效,多进程下管理共享资源无效)
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //获取当前代的信息
            final Generation g = generation;
                //调用await方法之前需要判断这一代是否已经坏掉了,如果坏掉了直接抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            //判断当前线程是否中断了,如果中断了也会抛出异常,并调用breakBarrier方法
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
//如果这代没有坏掉,当前线程也没有中断,那么就将计数器减去1
            int index = --count;
            //减去1之后的数值如果为零,证明这一代已经满了,然后唤醒屏障拦截的所有线程
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //获取初始化的那个线程或者传入的线程
                    final Runnable command = barrierCommand;
                    if (command != null)
                    //线程执行任务
                        command.run();
                    ranAction = true;
                     //执行换代操作
                    nextGeneration();
                    return 0;
                } finally {
                     //如果上的操作执行出现问题了,那么就执行breakBarrier方法,唤醒所有线程
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            //自旋
            for (;;) {
                try {
                     //await()方法可以传入两个参数的,一个是否需要判断等待时间,一个等待的时间
                    //如果不需要判断等待时间,那么直接调用await()方法,这是Condition类的方法
                    if (!timed)
                        trip.await();
                        //如果需要等待的时间大于0,那么线程阻塞要设置时间,超过这个时间需要被唤醒
                    else if (nanos > 0L)
                     //设置超时阻塞时间,线程就被阻塞了,程序执行到这里就不会继续向下执行,直到这个线程被唤醒,才会继续向下执行
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                         //当前线程在阻塞过程,被中断了或者由于这一代屏障前的其他线程原因导致的代损坏了
                        //执行breakBarrier方法唤醒其他线程
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        //如果不是上述两个原因造成的,就中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }
                // 当有任何一个线程中断,会调用 breakBarrier 方法.
                // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
                //走到这里这名线程被唤醒了,唤醒的方式可能是计数器为零被唤醒,也有可能调用breakBarrier
                //方法导致代坏掉了而唤醒的,那么需要需要抛出代 破损异常
                if (g.broken)
                    throw new BrokenBarrierException();
                // g != generation 正常换代了,因为计数器为零了,代更新了
                //一切正常,返回当前线程所在屏障的下标
                // 如果 g == generation,说明还没有换代,那为什么会醒了?这里就是代的作用
                // 因为一个线程可以使用多个屏障,当别的屏障唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
                // 正是因为这个原因,才需要 generation 来保证正确。  
                if (g != generation)
                    return index;
            //如果不需要设置等待时间并且传入的时间不合法小于0,那么执行breakBarrier抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses.
     *         In this case the barrier will be broken.
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     重置整个屏障为初始状态,将已经拦截的释放掉,全部重新开始计数
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //执行breakBarrier方法唤醒其他线程
            breakBarrier();   // break the current generation
            //更新屏障的状态并唤醒其他线程
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

实例

假如公司团建,大家一起做大巴车,在大巴车出发之前,肯定是需要点名的,只有大家都到车上之后,才会发车,然后到了到了目的地之后,肯定是所有人都下车了,司机才能把车开走,这个过程中涉及了2次大家都就位之后,司机才能继续操作,可以证明CyclicBarrier可以循环使用计数器。

package com.runlion.middleground.marginal;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * 
 * @description:
 * @date 2024年04月30日 17:26:05
 */
@Slf4j
public class StudyTest {
    @Getter
    @Setter
    static class Flag{
        public int count=0;
    }
    @Test
    @SneakyThrows
    public void testCyclicBarrier() {
        Flag flag=new Flag();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            if(flag.getCount()==0){
                System.out.println("所有人都上车了,可以发车了");
                flag.setCount(1);
            }else {
                System.out.println("所有人都下车了,司机可以走了");
            }

        });

        for (int i = 1; i < 6; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "号上车了");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "号开始休息了");
                    TimeUnit.MILLISECONDS.sleep(new Random().nextInt(2000));
                    System.out.println(Thread.currentThread().getName() + "号下车了");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName() + "号到达目的地了");
                }catch (Exception e){
                    log.info(e.getMessage());
                }

            }, String.valueOf(i)).start();
        }
        System.out.println("主线程不阻塞");
    }
}