时间:2023-01-12 12:40:08 | 栏目:JAVA代码 | 点击:次
CyclicBarrier 字面意思回环栅栏(循环屏障),它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
CyclicBarrier 作用是让一组线程相互等待,当达到一个共同点时,所有之前等待的线程再继续执行,且 CyclicBarrier 功能可重复使用。
构造方法:
// parties表示屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。 public CyclicBarrier(int parties) // 用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景(该线程的执行时机是在到达屏障之后再执行)
重要方法:
//屏障 指定数量的线程全部调用await()方法时,这些线程不再阻塞 // BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时 public int await() throws InterruptedException, BrokenBarrierException public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException //循环 通过reset()方法可以进行重置
利用 CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
public class CyclicBarrierTest2 { //保存每个学生的平均成绩 private Conc urrentHashMap<String, Integer> map=new ConcurrentHashMap<String,Integer>(); private ExecutorService threadPool= Executors.newFixedThreadPool(3); private CyclicBarrier cb=new CyclicBarrier(3,()->{ int result=0; Set<String> set = map.keySet(); for(String s:set){ result+=map.get(s); } System.out.println("三人平均成绩为:"+(result/3)+"分"); }); public void count(){ for(int i=0;i<3;i++){ threadPool.execute(new Runnable(){ @Override public void run() { //获取学生平均成绩 int score=(int)(Math.random()*40+60); map.put(Thread.currentThread().getName(), score); System.out.println(Thread.currentThread().getName() +"同学的平均成绩为:"+score); try { //执行完运行await(),等待所有学生平均成绩都计算完毕 cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); } } public static void main(String[] args) { CyclicBarrierTest2 cb=new CyclicBarrierTest2(); cb.count(); } }
利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景
public class CyclicBarrierTest3 { public static void main(String[] args) { AtomicInteger counter = new AtomicInteger(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 5, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), (r) -> new Thread(r, counter.addAndGet(1) + " 号 "), new ThreadPoolExecutor.AbortPolicy()); CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("裁判:比赛开始~~")); for (int i = 0; i < 10; i++) { threadPoolExecutor.submit(new Runner(cyclicBarrier)); } } static class Runner extends Thread{ private CyclicBarrier cyclicBarrier; public Runner (CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { int sleepMills = ThreadLocalRandom.current().nextInt(1000); Thread.sleep(sleepMills); System.out.println(Thread.currentThread().getName() + " 选手已就位, 准备共用时: " + sleepMills + "ms" + cyclicBarrier.getNumberWaiting()); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } } } }
输出结果:
3 号 选手已就位, 准备共用时: 78ms0
1 号 选手已就位, 准备共用时: 395ms1
5 号 选手已就位, 准备共用时: 733ms2
2 号 选手已就位, 准备共用时: 776ms3
4 号 选手已就位, 准备共用时: 807ms4
裁判:比赛开始~~
4 号 选手已就位, 准备共用时: 131ms0
3 号 选手已就位, 准备共用时: 256ms1
2 号 选手已就位, 准备共用时: 291ms2
1 号 选手已就位, 准备共用时: 588ms3
5 号 选手已就位, 准备共用时: 763ms4
裁判:比赛开始~~
主要是的流程:
下面是一个简单的流程图:
下面是具体的一些代码调用的流程:
java.util.concurrent.locks.Condition#signalAll
唤醒条件队列上的所有节点。