189 8069 5689

CountDownLatch的实现原理是什么

这篇文章主要讲解了“CountDownLatch的实现原理是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“CountDownLatch的实现原理是什么”吧!

创新互联于2013年创立,是专业互联网技术服务公司,拥有项目网站建设、网站制作网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元青神做网站,已为上家服务,为青神各地企业和个人服务,联系电话:13518219792

 前言

CountDownLatch是多线程中一个比较重要的概念,它可以使得一个或多个线程等待其他线程执行完毕之后再执行。它内部有一个计数器和一个阻塞队列,每当一个线程调用countDown()方法后,计数器的值减少1。当计数器的值不为0时,调用await()方法的线程将会被加入到阻塞队列,一直阻塞到计数器的值为0。

常用方法

public class CountDownLatch {      //构造一个值为count的计数器     public CountDownLatch(int count);      //阻塞当前线程直到计数器为0     public void await() throws InterruptedException;      //在单位为unit的timeout时间之内阻塞当前线程     public boolean await(long timeout, TimeUnit unit);      //将计数器的值减1,当计数器的值为0时,阻塞队列内的线程才可以运行     public void countDown();        }

下面给一个简单的示例:

package com.yang.testCountDownLatch;  import java.util.concurrent.CountDownLatch;  public class Main {     private static final int NUM = 3;      public static void main(String[] args) throws InterruptedException {         CountDownLatch latch = new CountDownLatch(NUM);         for (int i = 0; i < NUM; i++) {             new Thread(() -> {                 try {                     Thread.sleep(2000);                     System.out.println(Thread.currentThread().getName() + "运行完毕");                 } catch (InterruptedException e) {                     e.printStackTrace();                 } finally {                     latch.countDown();                 }             }).start();         }         latch.await();         System.out.println("主线程运行完毕");     } }

输出如下:

CountDownLatch的实现原理是什么

看得出来,主线程会等到3个子线程执行完毕才会执行。

原理解析

类图

CountDownLatch的实现原理是什么

可以看得出来,CountDownLatch里面有一个继承AQS的内部类Sync,其实是AQS来支持CountDownLatch的各项操作的。

CountDownLatch(int count)

new CountDownLatch(int count)用来创建一个AQS同步队列,并将计数器的值赋给了AQS的state。

public CountDownLatch(int count) {     if (count < 0) throw new IllegalArgumentException("count < 0");     this.sync = new Sync(count); }  private static final class Sync extends AbstractQueuedSynchronizer {          Sync(int count) {         setState(count);     }  }

countDown()

countDown()方法会对计数器进行减1的操作,当计数器值为0时,将会唤醒在阻塞队列中等待的所有线程。其内部调用了Sync的releaseShared(1)方法

public void countDown() {      sync.releaseShared(1);  }   public final boolean releaseShared(int arg) {      if (tryReleaseShared(arg)) {          //此时计数器的值为0,唤醒所有被阻塞的线程          doReleaseShared();          return true;      }      return false;  }

tryReleaseShared(arg)内部使用了自旋+CAS操将计数器的值减1,当减为0时,方法返回true,将会调用doReleaseShared()方法。对CAS机制不了解的同学,可以先参考我的另外一篇文章浅探CAS实现原理

protected boolean tryReleaseShared(int releases) {       //自旋       for (;;) {           int c = getState();           if (c == 0)               //此时计数器的值已经为0了,其他线程早就执行完毕了,当前线程也已经再执行了,不需要再次唤醒了               return false;           int nextc = c-1;           //使用CAS机制,将state的值变为state-1           if (compareAndSetState(c, nextc))               return nextc == 0;       }   }

doReleaseShared()是AQS中的方法,该方法会唤醒队列中所有被阻塞的线程。

private void doReleaseShared() {      for (;;) {          Node h = head;          if (h != null && h != tail) {              int ws = h.waitStatus;              if (ws == Node.SIGNAL) {                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                      continue;            // loop to recheck cases                  unparkSuccessor(h);              }              else if (ws == 0 &&                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                  continue;                // loop on failed CAS          }          if (h == head)                   // loop if head changed              break;      }  }

这段方法比较难理解,会另外篇幅介绍。这里只要认为该段方法会唤醒所有因调用await()方法而阻塞的线程。

await()

当计数器的值不为0时,该方法会将当前线程加入到阻塞队列中,并把当前线程挂起。

public void await() throws InterruptedException {     sync.acquireSharedInterruptibly(1); }

同样是委托内部类Sync,调用其

acquireSharedInterruptibly()方法

public final void acquireSharedInterruptibly(int arg)           throws InterruptedException {       if (Thread.interrupted())           throw new InterruptedException();       if (tryAcquireShared(arg) < 0)           doAcquireSharedInterruptibly(arg);   }

接着看Sync内的tryAcquireShared()方法,如果当前计数器的值为0,则返回1,最终将导致await()不会将线程阻塞。如果当前计数器的值不为0,则返回-1。

protected int tryAcquireShared(int acquires) {         return (getState() == 0) ? 1 : -1;     }

tryAcquireShared方法返回一个负值时,将会调用AQS中的

doAcquireSharedInterruptibly()方法,将调用await()方法的线程加入到阻塞队列中,并将此线程挂起。

private void doAcquireSharedInterruptibly(int arg)       throws InterruptedException {       //将当前线程构造成一个共享模式的节点,并加入到阻塞队列中       final Node node = addWaiter(Node.SHARED);       boolean failed = true;       try {           for (;;) {               final Node p = node.predecessor();               if (p == head) {                           int r = tryAcquireShared(arg);                   if (r >= 0) {                       setHeadAndPropagate(node, r);                       p.next = null; // help GC                       failed = false;                       return;                   }               }               if (shouldParkAfterFailedAcquire(p, node) &&                   parkAndCheckInterrupt())                   throw new InterruptedException();           }       } finally {           if (failed)               cancelAcquire(node);       }   }

同样,以上的代码位于AQS中,在没有了解AQS结构的情况下去理解上述代码,有些困难,关于AQS源码,会另开篇幅介绍。

使用场景

CountDownLatch的使用场景很广泛,一般用于分头做某些事,再汇总的情景。例如:

数据报表:当前的微服务架构十分流行,大多数项目都会被拆成若干的子服务,那么报表服务在进行统计时,需要向各个服务抽取数据。此时可以创建与服务数相同的线程数,交由线程池处理,每个线程去对应服务中抽取数据,注意需要在finally语句块中进行countDown()操作。主线程调用await()阻塞,直到所有数据抽取成功,最后主线程再进行对数据的过滤组装等,形成直观的报表。

风险评估:客户端的一个同步请求查询用户的风险等级,服务端收到请求后会请求多个子系统获取数据,然后使用风险评估规则模型进行风险评估。如果使用单线程去完成这些操作,这个同步请求超时的可能性会很大,因为服务端请求多个子系统是依次排队的,请求子系统获取数据的时间是线性累加的。此时可以使用CountDownLatch,让多个线程并发请求多个子系统,当获取到多个子系统数据之后,再进行风险评估,这样请求子系统获取数据的时间就等于最耗时的那个请求的时间,可以大大减少处理时间。

感谢各位的阅读,以上就是“CountDownLatch的实现原理是什么”的内容了,经过本文的学习后,相信大家对CountDownLatch的实现原理是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


网页名称:CountDownLatch的实现原理是什么
当前URL:http://cdxtjz.com/article/pjigii.html

其他资讯