WangJY's Blog
深入理解java多线程--线程池
更新时间:2020-03-15 分类:java 发布:imrookie 阅读(229) 评论(0) 字号:  

版权声明:    
作者:WangJY    
出处:http://www.imrookie.cn
   
本文版权归作者所有,转载请指明出处,否则保留追究法律责任的权利。

ThreadPoolExecutor类是java线程池的核心类,位于java.util.concurrent包;

前排提示:文章最后附带测试代码(附带个人理解注释)。

ThreadPoolExecutor的核心组成

  1. 阻塞队列BlockingQueue,用来存放待执行的任务;

  2. 内部类Worker,实际的执行任务的线程(实现的Runable接口,通过ThreadFactory包装为线程对象);

  3. 线程工厂ThreadFactory,在ThreadPoolExecutor中用来包装Woker,效果等同于new Thread(worker);

  4. 拒绝策略执行器RejectedExecutionHandler;

  5. 核心线程数corePoolSize,即线程池允许的核心工作线程的数量,注意和maximumPoolSize有区别;

  6. 最大线程数maximumPoolSize;

  7. 线程存活时间keepAliveTime,表示线程没有任务执行时能保持多长时间才终止,默认对超出核心线程数的线程生效,如果allowCoreThreadTimeOut设置为true,则对所有线程生效;

ThreadPoolExecutor构造函数(共4个)

1.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列;源码如下(jdk1.8):

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

2.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列、线程工厂;源码如下(jdk1.8)

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

3.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列、拒绝策略处理器;

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

4.参数为:核心线程数、最大线程数、存活时间、存活时间的时间单元、存放任务的阻塞队列、线程工厂、拒绝策略处理器;

其他3个构造函数均是通过调用该构造函数来实现。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

如何执行任务

使用方法:调用线程池的execute(Runnable command)方法;

将我们的任务通过execute方法传入执行。

来看下源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

从源码中我们可以看到,任务不允许为null(会直接报NPE异常)

任务交给线程池后的处理策略

1.如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;

2.如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

3.如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

4.如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

ThreadPoolExecutor有个内部类Worker

worker的核心功能为执行任务,执行完成后循环向任务缓存队列取任务来执行.

ThreadPoolExecutor提供了任务执行前后的监听方法

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

可以通过继承ThreadPoolExecutor类重写这2个方法来监控任务的执行.

在中断worker时使用锁.tryLock()来判断当前worker是否空闲.

线程池常用方法:

prestartCoreThread()初始化1个核心线程
prestartAllCoreThreads()初始化所有的核心线程

任务拒绝策略

当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务(前提是线程池没有关闭),即在调用execute方法的线程中执行 任务.run(),源码如下:

image.png

线程池的关闭

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池动态改变大小

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小

  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

当上述参数从小变大时,ThreadPoolExecutor进行线程池容量赋值,还可能立即创建新的线程来执行任务。

线程池的使用

    java doc并不建议我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法创建线程池;

1.Executors.newCachedThreadPool()

    创建一个大小为Integer.MAX_VALUE的线程池(corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE),任务进来立马创建新的线程去执行,线程空闲60秒则销毁(keepAliveTime=60,单位为:秒)。阻塞队列使用的SynchronousQueue。

image.png

2.Executors.newSingleThreadExecutor()

    创建一个大小为1的线程池(corePoolSize=1,maximumPoolSize=1),阻塞队列使用的LinkedBlockingQueue。

image.png

3.Executors.newFixedThreadPool(int i); 

    创建一个固定大小的线程池(corePoolSize=maximumPoolSize=i),阻塞队列使用的LinkedBlockingQueue。

image.png

除了这3种,还有其他创建线程池的方法。


线程池大小设置

    一般来说,CPU密集型(计算密集型)任务推荐线程数不要太多,可以设置为:cpu核数+1,减少cpu上下文切换;

    IO密集型任务因为更多时间消耗在IO上,cpu占用有限,推荐多设置线程数,可以设置为:cpu核数 * 2 + 1

    网上有这么一个公式,可以供参考:

最佳线程数目 = (线程等待时间(例如io操作或网络操作花费的时间)与线程CPU时间之比 + 1)* CPU数目

个人理解

    线程池其实是使用它准备好的线程来执行我们提供的任务的run()方法,并不是以new Thread(任务)方式来跑的;它负责管理它创建的线程(其实就是new Thread(Worker worker)),在它创建的线程里通过调用任务.run()来完成我们的任务.


附:测试代码

1.任务类:MyTask.java

/**
 * 任务类
 * Created by rookie on 2018/7/6.
 */
public class MyTask implements Runnable {

    //实际生产中,任务往往是一个复杂的类(有很多的属性和方法),在这里我们通过一个字符串来模拟
    private String text;

    //构造函数
    public MyTask(String text){
        this.text = text;
    }

    //在ThreadPoolExecutor中,任务并不是通过线程的start方法来直接执行的
    //线程池中的线程在任务队列中取到任务后,通过 任务.run()来执行本任务
    public void run() {
        //这里写我们需要执行的任务,可以是调用其他service,也可以是数据库操作,也可以是别的任何你需要的操作
        System.out.println("该任务的执行线程为: "+Thread.currentThread()+" , 任务执行: " + text);
    }
}

2.线程工厂类:MyThreadFactory.java

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂
 * Created by rookie on 2018/7/6.
 */
public class MyThreadFactory implements ThreadFactory {

    //记录线程创建次数,使用原子int,避免多线程环境下的线程安全问题
    AtomicInteger atomicInteger = new AtomicInteger(0);

    //在ThreadPoolExecutor中,会将Worker传进来,通过此方法将Worker包装为thread
    //调用此方法创建线程效果相当于 new Thread(worker);
    //主要还是为了提供一个自定义线程的机会,例如自定义线程名等
    public Thread newThread(Runnable r) {
        //可以是简单的返回,如下:
        //return new Thread(r);

        //也可以做一些必要的操作(根据自己的需求),例如:我需要自定义每个线程的名字和优先级
        Thread thread = new Thread(r);
        thread.setName("imrookie-" + atomicInteger.incrementAndGet());//设置自己的线程名
        thread.setPriority(1);//根据实际需要设置线程优先级
        System.out.println("MyThreadFactory 创建一个线程...");
        return thread;
    }
}

3.测试入口:ThreadPoolTest.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 线程池测试类
 * Created by rookie on 2018/7/6.
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        //创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,
                15,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100),//当队列满了并且最大线程数满了,在控制台可以看到拒绝策略的结果(默认抛异常)
                //new LinkedBlockingDeque<Runnable>(),//由于LinkedBlockingDeque可以无限添加,所以线程数最多等于10(corePoolSize)
                new MyThreadFactory() );
        System.out.println("线程池创建完成!");
        for(int i=0;i<1000;i++){
            final int finalI = i;
            //模拟多线程环境下执行任务,使用了lambda表达式
            new Thread(() -> {
                //通过线程池的execute方法执行任务
                threadPoolExecutor.execute(new MyTask("任务--"+ finalI));
            }).start();
        }
    }
}


评论
0 人参与, 0 条评论
·联系方式

Q Q:243144837

Mail:243144837@qq.com

码云:https://gitee.com/I_M_ROOKIE

·公告

大家好,我是公告!

©2018 WangJY | 元码-轻云低代码 | 晋ICP备18002524号-1

晋公网安备 14050002000652号