版权声明:
作者:WangJY
出处:http://www.imrookie.cn
本文版权归作者所有,转载请指明出处,否则保留追究法律责任的权利。
ThreadPoolExecutor类是java线程池的核心类,位于java.util.concurrent包;
前排提示:文章最后附带测试代码(附带个人理解注释)。
ThreadPoolExecutor的核心组成
阻塞队列BlockingQueue,用来存放待执行的任务;
内部类Worker,实际的执行任务的线程(实现的Runable接口,通过ThreadFactory包装为线程对象);
线程工厂ThreadFactory,在ThreadPoolExecutor中用来包装Woker,效果等同于new Thread(worker);
拒绝策略执行器RejectedExecutionHandler;
核心线程数corePoolSize,即线程池允许的核心工作线程的数量,注意和maximumPoolSize有区别;
最大线程数maximumPoolSize;
线程存活时间keepAliveTime,表示线程没有任务执行时能保持多长时间才终止,默认对超出核心线程数的线程生效,如果allowCoreThreadTimeOut设置为true,则对所有线程生效;
ThreadPoolExecutor构造函数(共4个)
1.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列;源码如下(jdk1.8):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
2.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列、线程工厂;源码如下(jdk1.8):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
3.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列、拒绝策略处理器;
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
4.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列、线程工厂、拒绝策略处理器;
其他3个构造函数均是通过调用该构造函数来实现。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
如何执行任务
使用方法:调用线程池的execute(Runnable command)方法;
将我们的任务通过execute方法传入执行。
来看下源码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
从源码中我们可以看到,任务不允许为null(会直接报NPE异常)
任务交给线程池后的处理策略
1.如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
2.如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
3.如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
4.如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
ThreadPoolExecutor有个内部类Worker
worker的核心功能为执行任务,执行完成后循环向任务缓存队列取任务来执行.
ThreadPoolExecutor提供了任务执行前后的监听方法
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
可以通过继承ThreadPoolExecutor类重写这2个方法来监控任务的执行.
在中断worker时使用锁.tryLock()来判断当前worker是否空闲.
线程池常用方法:
prestartCoreThread() | 初始化1个核心线程 |
prestartAllCoreThreads() | 初始化所有的核心线程 |
任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务(前提是线程池没有关闭),即在调用execute方法的线程中执行 任务.run(),源码如下:
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
线程池动态改变大小
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程池容量赋值,还可能立即创建新的线程来执行任务。
线程池的使用
java doc并不建议我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法创建线程池;
1.Executors.newCachedThreadPool()
创建一个大小为Integer.MAX_VALUE的线程池(corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE),任务进来立马创建新的线程去执行,线程空闲60秒则销毁(keepAliveTime=60,单位为:秒)。阻塞队列使用的SynchronousQueue。
2.Executors.newSingleThreadExecutor()
创建一个大小为1的线程池(corePoolSize=1,maximumPoolSize=1),阻塞队列使用的LinkedBlockingQueue。
3.Executors.newFixedThreadPool(int i);
创建一个固定大小的线程池(corePoolSize=maximumPoolSize=i),阻塞队列使用的LinkedBlockingQueue。
除了这3种,还有其他创建线程池的方法。
线程池大小设置
一般来说,CPU密集型(计算密集型)任务推荐线程数不要太多,可以设置为:cpu核数+1,减少cpu上下文切换;
IO密集型任务因为更多时间消耗在IO上,cpu占用有限,推荐多设置线程数,可以设置为:cpu核数 * 2 + 1;
网上有这么一个公式,可以供参考:
最佳线程数目 = (线程等待时间(例如io操作或网络操作花费的时间)与线程CPU时间之比 + 1)* CPU数目
个人理解
线程池其实是使用它准备好的线程来执行我们提供的任务的run()方法,并不是以new Thread(任务)方式来跑的;它负责管理它创建的线程(其实就是new Thread(Worker worker)),在它创建的线程里通过调用任务.run()来完成我们的任务.
附:测试代码
1.任务类:MyTask.java
/** * 任务类 * Created by rookie on 2018/7/6. */ public class MyTask implements Runnable { //实际生产中,任务往往是一个复杂的类(有很多的属性和方法),在这里我们通过一个字符串来模拟 private String text; //构造函数 public MyTask(String text){ this.text = text; } //在ThreadPoolExecutor中,任务并不是通过线程的start方法来直接执行的 //线程池中的线程在任务队列中取到任务后,通过 任务.run()来执行本任务 public void run() { //这里写我们需要执行的任务,可以是调用其他service,也可以是数据库操作,也可以是别的任何你需要的操作 System.out.println("该任务的执行线程为: "+Thread.currentThread()+" , 任务执行: " + text); } }
2.线程工厂类:MyThreadFactory.java
import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 线程工厂 * Created by rookie on 2018/7/6. */ public class MyThreadFactory implements ThreadFactory { //记录线程创建次数,使用原子int,避免多线程环境下的线程安全问题 AtomicInteger atomicInteger = new AtomicInteger(0); //在ThreadPoolExecutor中,会将Worker传进来,通过此方法将Worker包装为thread //调用此方法创建线程效果相当于 new Thread(worker); //主要还是为了提供一个自定义线程的机会,例如自定义线程名等 public Thread newThread(Runnable r) { //可以是简单的返回,如下: //return new Thread(r); //也可以做一些必要的操作(根据自己的需求),例如:我需要自定义每个线程的名字和优先级 Thread thread = new Thread(r); thread.setName("imrookie-" + atomicInteger.incrementAndGet());//设置自己的线程名 thread.setPriority(1);//根据实际需要设置线程优先级 System.out.println("MyThreadFactory 创建一个线程..."); return thread; } }
3.测试入口:ThreadPoolTest.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 线程池测试类 * Created by rookie on 2018/7/6. */ public class ThreadPoolTest { public static void main(String[] args) { //创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 15, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100),//当队列满了并且最大线程数满了,在控制台可以看到拒绝策略的结果(默认抛异常) //new LinkedBlockingDeque<Runnable>(),//由于LinkedBlockingDeque可以无限添加,所以线程数最多等于10(corePoolSize) new MyThreadFactory() ); System.out.println("线程池创建完成!"); for(int i=0;i<1000;i++){ final int finalI = i; //模拟多线程环境下执行任务,使用了lambda表达式 new Thread(() -> { //通过线程池的execute方法执行任务 threadPoolExecutor.execute(new MyTask("任务--"+ finalI)); }).start(); } } }