时间:2022-12-11 11:54:14 | 栏目:JAVA代码 | 点击:次
那么在程序的世界中是如何对这种协调关系进行描述的呢?今天就和大家聊聊Java大神Doug Lea在并发包中如何通过CountDownLatch和CyclicBarrier实现任务协调的代码描述。
我相信大家都知道好代码的一个重要特性就是代码中类、变量等的命名可以做到顾名思义,也就是说看到命名就可以大概知道这个类或者变量表达了怎样的业务语义。就拿 CountDownLatch 来说,它的命名形象的表示了其能力属性,Count代表着计数,Down代表着计数器的递减操作,而Latch表示计数器递减后的结果动作。CountDownLatch结合起来的字面意思就是计数器递减后打开门栓,通过后面内容的描述,回过头来看大家肯定会觉得这个命名十分之形象。
好了通过它的类的名称,我们猜测了它的功能是通过计数器的递减操作来控制线程,那我们再看看官方描述是不是这个意思。
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* -- the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
*...
*/
上面注释的大致意思就是CountDownLatch是一个线程同步器,它允许一个或者多个线程阻塞等待直到其他线程中业务执行完成。CountDownLatch可以通过一个计数器进行初始化,他可以让那个等待的线程被阻塞,直到对应的计数器被置为0。当计数器置为0后,阻塞的线程被释放。另外它是一个一次性使用的同步器,计数器无法被重置。
通过JDK的官方描述我们可以明确CountDownLatch三个核心特征:
1、它是一种线程同步器,用以协调线程的执行触发时机;
2、它本质是一个计数器,是控制线程的号令枪;
3、它是一次性使用的,用完即失效。
知道了CountDownLatch是一个什么东东之后,我们再一起来看下它的使用场景是什么,我们在什么样的情况下可以使用它帮我们解决一些代码中的问题。
就像上文描述的,CountDownLatch就像是田径赛场上裁判员发射的发令枪,所有参赛的选手准备就绪后,发令枪一响,所有运动员闻声而动。那么在Java多线程场景中,CountDownLatch就是线程协调者,它的计数器在没有减为0之前。假设有这样一个业务场景,在一个监控告警平台中,需要从告警服务中查询告警信息以及从工单服务中查询工单信息,然后再分析哪些告警没有转工单。按照老系统的做法,参见如下简化后的伪代码:
List<Alarm> alarmList = alarmService.getAlarm(); List<WorkOrder> workOrderList = workOrderService.getWorkOrder(); List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
大家能看出来这段伪代码有什么需要进行优化的地方吗?我们来一起分析一下。这段代码在数据量不大的时候可能没什么影响,但是一旦告警以及工单的数据量大的时候,获取告警信息或者获取工单信息都可能出现数据查询慢的问题,那就会导致这个分析任务就会出现性能瓶颈的问题。那么我们应该怎么进行优化呢?从业务以及代码我们可以看的出来,获取告警信息以及获取工单信息,实际上并没有业务上面的耦合性,在上述代码中他们是顺序执行的,因此要进行性能优化,可以考虑将它们进行并行执行。
那么修改优化后的伪代码如下所示:
Executor executor = Executors.newFixedThreadPool(2); executor.execute(()-> { alarmList = alarmService.getAlarm(); }); executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); }); List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
我们通过使用线程池的方式,在获取告警信息以及工单信息的时候并发执行,不再像之前的执行完获取告警信息再执行获取工单信息,这样效率更高。但是这样的实现方式还是存在问题,由于在线的线程中执行操作,并不知道其实际的执行结果,这就不好判断执行数据分析的具体时机。这个时候CountDownLatch就派上用场了,利用它可以实现线程拣的等待,条件满足后再放开执行后续的逻辑。这就好比公司组织团建,约定好了早上8点半在公司大门集合,那么司机师傅肯定要等到所有参加团建的同时都到齐后才会发车。
使用CountDownLatch之后的伪代码如下所示:
Executor executor = Executors.newFixedThreadPool(2); CountDownLatch latch = new CountDownLatch(2); executor.execute(()-> { alarmList = alarmService.getAlarm(); latch.countDown(); }); executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); latch.countDown(); }); latch.await(); List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);
在使用CountDownLatch之前我们得先进行初始化,在初始化的过程中实际做了两件事情,一个是创建了一个AQS的同步队列,另外一个是将AQS中的state设置成了count,这个state是AQS的核心变量(AQS是并发包的底层实现基础,关于它的分析我们放到下一篇文章中进行)。
从代码中我们可以看的出来实际创建了Sync内部类实例,而Sync继承了AQS,同时重写了AQS加锁解锁的方法,并通过Sync的对象,调用AQS的方法,阻塞线程的运行。Sync内部类的代码如下所示,其中tryAcquireShared方法重写了AQS的模板方法,主要用来获取共享锁,在CountDownLatch内部主要通过判断获取到的state的值是否为0来决定到底有没有获取到锁。如果获取到的state为0,则表示获取锁成功,此时线程不会阻塞,反之则获取锁失败,线程会阻塞。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } //尝试加共享锁(通过state判断) protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //尝试释放共享锁(通过state判断) protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
如上文场景中介绍的代码,每个线程在执行完成自身业务后执行countDown操作,表示该线程已经准备完成。同时检查count值是否为0。如果为0则需要唤醒所有等待的线程。如下代码所示,实际上它调用的是父类AQS的releaseShared方法。
public void countDown() { sync.releaseShared(1); }
tryReleaseShared这个方法实际是进行尝试释放锁的操作,如果此次count递减为0,然后释放所有的线程。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
大致的代码执行逻辑可参见下图:
await的作用就是将当前线程阻塞住,直到count值减为0才会放开执行。它实际调用了内部类的tryAcquireSharedNanos方法,这个方法实际是Sync类的父类AQS中的方法。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
AQS提供了可以响应中断的获取公平锁的实现的方式。tryAcquireShared在上文已经进行了介绍,该方法的作用是尝试获取共享锁,如果获取失败,则线程将会被加入到AQS的同步队列中进行等待,也就是所谓的线程阻塞。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
我们还是从CyclicBarrier的字面意思来先进行理解,Cyclic是循环的意思而Barrier则表示栅栏、障碍的意思,字面的意思就是可循环的栅栏。还是老套路,在进行CyclicBarrier之前,我们先来看下JDK是怎么描述的。
/**
* A synchronization aid that allows a set of threads to all wait for
* each other to reach a common barrier point. CyclicBarriers are
* useful in programs involving a fixed sized party of threads that
* must occasionally wait for each other. The barrier is called
* <em>cyclic</em> because it can be re-used after the waiting threads
* are released.
*
* <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released.
* This <em>barrier action</em> is useful
* for updating shared-state before any of the parties continue.
*...
**/
通过JDK的描述,我们可以看得出来,CyclicBarrier也是一个线程同步协调器,用以协调一组进程的执行。当指定个数的线程到达栅栏后,可以放开栅栏,结束线程阻塞状态。这么看上去它和CountDownLatch作用差不多了,实际上还是有区别的,CyclicBarrier是可循环使用的,而CountDownLatch却是一次性的。我们来看下CyclicBarrier的核心属性。
//栅栏入口的锁 private final ReentrantLock lock = new ReentrantLock(); //线程等待条件 private final Condition trip = lock.newCondition(); //拦截的线程数量 private final int parties; //在下一个栅栏代数到来前执行的任务 private final Runnable barrierCommand; //当前的栅栏代数 private Generation generation = new Generation();
CyclicBarrier 的源码实现和 CountDownLatch 大同小异,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现的。
CyclicBarrier内部维护了parties和count变量,parties表示每次参与到一个Generation中需要被拦截的线程数量,而count是内部计数器,在初始化的时候count与parties相等,当每次调用await方法的时候计数器count就会减1,这和上文中的countDown类似。
还是以上文中的业务场景为例我们再分析一下,上文中我们通过CountDownLatch实现了查询告警信息与查询工单信息的线程协调问题,但是新的问题又出现了。因为告警信息和工单信息都是实时在产生的,而使用CountDownLatch的实现方式只能完成一次的线程协调,后续产生的告警信息以及工单信息如果还有需要查询到之后再进行数据分析的话,它就爱莫能助了。也就是说,如果需要进行持续的线程之间的互相等待完成之后再执行后续的业务操作的话,这个时候就需要使用CyclicBarrier 来实现我们的需求了。
CyclicBarrier 存在两种的构造函数,一种是构建CyclicBarrier 的时候指定每次需要进行协调的线程个数以及解除阻塞之后需要进行后续任务的执行,另一种只是设置需要协调的线程个数不设置后续执行的任务。
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
对于CyclicBarrier 来说,其最核心的等待方法实现就是dowait方法,具体代码如下所示:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //如果count计算为0,则需要唤醒所有线程并进入到下一阶段的线程协调期 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //计数器不为0,继续进行循环 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
我们可以看到在dowait方法中进行了count的递减操作,检查count的值是否为0,如果在初始化的时候定义好了要执行的任务,那么在count为0的时候就进行任务执行,任务执行完成之后调用nextGeneration进行下一次的线程协调周期,同时唤醒所有线程并重置计数器。
本文分别从使用场景以及底层实现的角度分别介绍了线程同步协调神器CountDownLatch和CyclicBarrier,虽然它们都可以起到协调线程的作用但是实际上它们还是有区别的。CountDownLatch比较适合一个线程与其他多个线程之间的同步协调场景,而CyclicBarrier则适合一组线程之间的互相等待。另外CountDownLatch是一次性产品,而CyclicBarrier的计数器是可以重复使用的,可以进行自动重置计数器。