当前位置:主页 > 软件编程 > JAVA代码 >

Java并发编程回环屏障CyclicBarrier

时间:2022-11-05 12:22:47 | 栏目:JAVA代码 | 点击:

CyclicBarrier

前面介绍的CountDownLatch在解决多个线程同步方面相对于调用线程的join方法已经有了不少优化。但是CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,这就起不到线程同步的效果了。所以为了满足计数器可以重置的需要,JDK开发组提供了CyclicBarrier类,并且CyclicBarrier类的功能并不限于CountDownLatch的功能。从字面意思理解 CyclicBarrier 是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier 的状态后它可以被重用。之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了 await方法后,线程们就会冲破屏障,继续向下运行。在介绍原理前先介绍几个实例以便加深理解。在下面的例子中,我们要实现的是,使用两个线程去执行一个被分解的任务 A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CycleBarrierTest {

   
   //创建一个线程数固定为2的线程池
   private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
       @Override
       public void run() {
           System.out.println(Thread.currentThread() + " task1 merge result");
       }
   });

   public static void main(String[] args) throws InterruptedException{
       ExecutorService executorService = Executors.newFixedThreadPool(2);

       
       //添加线程A到线程池
       executorService.submit(new Runnable() {
           @Override
           public void run() {
               try {
                   System.out.println(Thread.currentThread() + "task1");
                   System.out.println(Thread.currentThread() + "enter in  barrier");
                   cyclicBarrier.await();
                   System.out.println(Thread.currentThread() + "enter out  barrier");
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       });

       //添加线程B到线程池
       executorService.submit(new Runnable() {
           @Override
           public void run() {
               try {
                   System.out.println(Thread.currentThread() + "task2");
                   System.out.println(Thread.currentThread() + "enter in  barrier");
                   cyclicBarrier.await();
                   System.out.println(Thread.currentThread() + "enter out  barrier");
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       });

       //关闭线程池
       executorService.shutdown();

   }
}

image.png

如上代码创建了一个CyclicBarrier对象,其第一个参数为计数器初始值,第二个数Runable是当计数值为0时需要执行的任务。在main函数里面首先创建了一个大小为2的线程池。然后添加两个子任务到线程池,每个子任务在执行完自己的逻辑后会调用方法。一开始计数器值为2,当第一个线程调用await方法时,计数器值会递减为1,由于此时计数器值不为0,所以当前线程就到了屏障点而被阻塞。然后第二个线程调用await时,会进入屏障,计数器值也会递减,现在计数器值为0,这时就会去执行 CyclicBarrier构造函数中的任务,执行完毕后退出屏障点,并且唤醒被阻塞的第二个线程。这时候第一个线程也会退出屏障点继续向下运行。

上面的例子说明了多个线程之间是相互等待的,假如计数器值为N,那么随后调用 await 方法的N1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为0了,这时候第N个线程才会发出通知唤醒前面的N1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行。对于这个例子来说,使用CountDownLatch也可以得到类似的输出结果。下面再举个例子来说明CyclicBarrier的可复用性。

假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行。下面使用 CyclicBarrier 来完成这个需求。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CycleBarrierTest1 {

    //创建一个线程数固定为2的线程池
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

    public static void main(String[] args) throws InterruptedException{
        ExecutorService executorService = Executors.newFixedThreadPool(2);


        //添加线程A到线程池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "step1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step3");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //添加线程B到线程池
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread() + "step1");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step2");
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread() + "step3");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        //关闭线程池
        executorService.shutdown();

    }
}

image.png

如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段1后才会开始执行阶段2。

您可能感兴趣的文章:

相关文章