Java并发包线程池ThreadPoolExecutor的实现
线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁都是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。二是线程池提供了一种资源限制和管理手段,比如可以限制线程的个数,动态新增线程等。每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。
我们首先来看一下类图
Excecutor是一个工具类,里面提供了许多静态方法,这些方法根据用户选择返回不同的线程池实例。ThreadPool继承了AbstractExecutorService。
下面我们看一下源码,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
成员变量ctl是一个Integer的原子变量,用来记录当前线程池状态和线程池中的线程个数,有点类似于ReentrantReadWriteLock使用一个变量来保存两种信息。
线程池一共有五种状态:
- RUNNING:能接受新任务,并且处理阻塞队列里面的任务
- SHUTDOWN:拒绝接受新任务但是处理阻塞队列里的任务
- STOP:拒绝新任务并且抛弃阻塞队列里的任务
- TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将要调用terminated方法。
- TERMINATED:终止状态。terminated方法调用完成以后的状态。
线程池状态转换如下:
- RUNNING->SHUTDOWN:显示调用shutdown方法,或者隐式调用finalize()方法里的shutdown()方法。
- RUNNINGSHUTDOWN->STOP:显示调用shutdown方法
- SHUTDOWN->TIDYING:当线程池和任务队列都为空时
- STOP->TIDYING:当线程池为空时
- TIDYING->TERMINATED:Terminated() hook方法执行完毕时
线程池的使用
合理利用线程池能够带来三个好处:
- 降低资源消耗。减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
在java.util.concurrent.Executors线程工厂类里面提供了一些静态工厂,生成一些常用的线程池。官方建议使用Executors工程类来创建线程池对象。
Executors类中有个创建线程池的方法如下:
- public static ExecutorService newFixedThreadPool(int nThreads):返回线程池对象。(创建的是有界线程池,也就是池中的线程个数可以指定最大数量)
获取到了一个线程池ExecutorService 对象,那么怎么使用呢,在这里定义了一个使用线程池对象的方法如下:
- public Future<?> submit(Runnable task):获取线程池中的某一个线程对象,并执行
Future接口:用来记录线程任务执行完毕后产生的结果。
使用线程池中线程对象的步骤:
- 创建线程池对象。
- 创建Runnable接口子类对象。(task)
- 提交Runnable接口子类对象。(take task)
- 关闭线程池(一般不做)。
Runnable实现类代码:
public class MyRunnable implements Runnable { @Override public void run() { System.out.println("我要一个教练"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("教练来了: " + Thread.currentThread().getName()); System.out.println("教我游泳,交完后,教练回到了游泳池"); } } public class ThreadPoolDemo { public static void main(String[] args) { // 创建线程池对象 ExecutorService service = Executors.newFixedThreadPool(2);//包含2个线程对象 // 创建Runnable实例对象 MyRunnable r = new MyRunnable(); //自己创建线程对象的方式 // Thread t = new Thread(r); // t.start(); ---> 调用MyRunnable中的run() // 从线程池中获取线程对象,然后调用MyRunnable中的run() service.submit(r); // 再获取个线程对象,调用MyRunnable中的run() service.submit(r); service.submit(r); // 注意:submit方法调用结束后,程序并不终止,是因为线程池控制了线程的关闭。 // 将使用完的线程又归还到了线程池中 // 关闭线程池 //service.shutdown(); } }
Callable测试代码:
<T> Future<T> submit(Callable<T> task) : 获取线程池中的某一个线程对象,并执行.
Future : 表示计算的结果.
V get() : 获取计算完成的结果。
public class ThreadPoolDemo2 { public static void main(String[] args) throws Exception { // 创建线程池对象 ExecutorService service = Executors.newFixedThreadPool(2);//包含2个线程对象 // 创建Runnable实例对象 Callable<Double> c = new Callable<Double>() { @Override public Double call() throws Exception { return Math.random(); } }; // 从线程池中获取线程对象,然后调用Callable中的call() Future<Double> f1 = service.submit(c); // Futur 调用get() 获取运算结果 System.out.println(f1.get()); Future<Double> f2 = service.submit(c); System.out.println(f2.get()); Future<Double> f3 = service.submit(c); System.out.println(f3.get()); } }
线程池的练习
public class Demo04 { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(3); SumCallable sc = new SumCallable(100); Future<Integer> fu = pool.submit(sc); Integer integer = fu.get(); System.out.println("结果: " + integer); SumCallable sc2 = new SumCallable(200); Future<Integer> fu2 = pool.submit(sc2); Integer integer2 = fu2.get(); System.out.println("结果: " + integer2); pool.shutdown(); } }
public class SumCallable implements Callable<Integer> { private int n; public SumCallable(int n) { this.n = n; } @Override public Integer call() throws Exception { // 求1-n的和? int sum = 0; for (int i = 1; i <= n; i++) { sum += i; } return sum; } }
上一篇:springboot 使用Spring Boot Actuator监控应用小结
栏 目:JAVA代码
下一篇:SpringBoot整合Xxl-job实现定时任务的全过程
本文标题:Java并发包线程池ThreadPoolExecutor的实现
本文地址:http://www.codeinn.net/misctech/210153.html