线程池探究

发表于 2020-07-07  32 次阅读


ThreadPoolExecutor

ThreadPoolExecutor是线程池核心类,类图如下:

f8c8b15a525b1e5f714ce962b1df7211.png

构造函数

4个构造函数:

public ThreadPoolExecutor( int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor( int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory)
public ThreadPoolExecutor( int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     RejectedExecutionHandler handler)
public ThreadPoolExecutor( int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory,
                                     RejectedExecutionHandler handler)
参数数据类型功能说明
corePoolSizeint线程池启动后,在池中保持的线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的;例如corePoolSize为5,任务为3,则只会创建3个线程而不是5个
maximumPoolSizeint线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理
keepAliveTimelong非核心线程的最大生命周期
unitTimeUnitkeepAliveTime的时间单位
workQueueBlockingQueue<Runnable>任务缓冲队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。这个任务队列是一个阻塞式的单端队列。
threadFactoryThreadFactory定义如何创建一个线程,可以设置线程的名称,并且可以确定是否是守护线程等。
handlerRejectedExecutionHandler拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。

线程生命周期逻辑梳理

  1. 线程池创建完成时,里面没有一个线程。
  2. 调用execute()方法添加任务时,线程做如下判断:
    * 线程池中有空闲线程,使用某一空闲线程执行任务;
    * 线程池中没有空闲线程,且当前线程数<corePoolSize,创建新的核心线程执行当前任务;( 创建核心线程数阶段 )
    * 线程池中没有空闲线程,且 corePoolSize<=(待执行任务+正在执行任务)<corePoolSize+任务缓冲队列长度,任务会被放入缓冲队列中;( 核心线程数已满,任务存放进缓冲队列阶段 )
    * 线程池中没有空闲线程,且 corePoolSize+任务缓冲队列长度<=(待执行任务+正在执行任务)<maximumPoolSize+任务缓冲队列长度,创建非核心线程执行当前任务;( 缓冲队列已满,创建非核心线程阶段 )
    * 线程池中没有空闲线程,且 当前线程数=maximumPoolSize,缓冲队列已满,调用拒绝任务处理器处理当前任务;( 线程数已满,缓冲队列已满,调用拒绝任务处理器阶段 )
  3. 某一线程的当前任务执行完毕时,有如下逻辑判断:
    * 缓冲队列中有任务,获取队首任务并执行;
    * 缓冲队列为空,且一定时间内(keepAliveTime)未执行任务,若当前线程数>corePoolSize则销毁当前线程。( 线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小 )

举个栗子

线程池参数:
corePoolSize = 3
maximumPoolSize = 6
缓冲队列长队 = 10
拒绝策略选择默认

当加入20个任务时,执行顺序如下:

  1. 任务1-3直接创建核心线程执行。
  2. 任务4-13被放入缓冲队列中。
  3. 任务14-16创建非核心线程执行。
  4. 任务17-20被拒绝执行,抛出异常。

workQueue 常见缓冲队列

缓冲队列的长度决定了能够缓冲任务的最大值,常见的缓冲队列有三种:

  • 直接提交
  • 有界队列
  • 无界队列

直接提交 - SynchronousQueue

直接提交,实际上并没有缓冲作用 同时刻最大可执行任务数 maximumPoolSize

SynchronousQueue实际上是无界的,但是由于其特性:在某次添加元素后必须等待其他线程取走后才能继续添加 导致只有当上一个任务被执行后,当前任务才能添加进去。

e289484bd049102aeb4f724a75a9bc1e.png

有界队列 - ArrayBlockingQueue

最大可缓存数=队列的长度 
同时刻最大可执行任务数: maximumPoolSize + ArrayBlockingQueue队列长度
ArrayBlockingQueue 构造函数必传初始化长度。

f47045dbe3bf915f521223b7d9284380.png

无界队列 - LinkedBlockingQueue

maximumPoolSize参数无效,理论上可以缓冲无限的任务。
同时刻可执行最大任务数 = corePoolSize;
最多也只创建 corePoolSize个线程
注意:这里的无界是一个相对的概念;LinkedBlockingQueue不传初始化长度,默认长度 Integer.MAX_VALUE,当设置初始化长度为10时,是有界的队列

2cda8b2852a92d9c448e261969b73d54.png


ThreadFactory

ThreadFactory 顾名思义就是提供一个创建线程的工厂类,告诉jvm按照什么标准创建线程池中的线程。

ThreadFactory 是一个接口,通过实现该接口可以改变线程的名称、线程组、优先级、守护进程状态等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

public interface ThreadFactory {  
    Thread newThread(Runnable r);  
}

默认线程工厂类 - DefaultThreadFactory

默认ThreadFactory通过Executors.defaultThreadFactory()方法获取;

DefaultThreadFactory源码如下:

static class DefaultThreadFactory implements ThreadFactory {  
    private static final AtomicInteger poolNumber = new AtomicInteger(1);  
    private final ThreadGroup group;  
    private final AtomicInteger threadNumber = new AtomicInteger(1);  
    private final String namePrefix;  // 线程名称前缀

    DefaultThreadFactory() {  
        SecurityManager s = System.getSecurityManager();  
        group = (s != null) ? s.getThreadGroup() :  Thread.currentThread().getThreadGroup();  
        namePrefix = "pool-" +  poolNumber.getAndIncrement() +  "-thread-";  
    }  
    // 为线程池创建新的任务执行线程  
    public Thread newThread(Runnable r) {  
        // 线程对应的任务是Runnable对象r  
        Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(), 0);  
        // 设为非守护线程  
        if (t.isDaemon())  
            t.setDaemon(false);  
        // 将优先级设为Thread.NORM_PRIORITY  
        if (t.getPriority() != Thread.NORM_PRIORITY)  
            t.setPriority(Thread.NORM_PRIORITY);  
        return t;  
    }  
}  

通过第三方工具Jar,自定义ThreadFactory

ThreadFactory的实现难度较大,不推荐大家自己实现,如果是学习研究可以自行试着实现;

maven依赖:

<dependency>&nbsp;&nbsp;&nbsp; 
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.5</version>
</dependency>
ThreadFactory customizeFactory = new BasicThreadFactory.Builder()
                                        .daemon(true)  //设置是否为守护线程&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
                                        .namingPattern("Test-Task-%d")  // 正则表达式规定线程名&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
                                        .priority(NORM_PRIORITY)  //设置线程优先级&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
                                        .build();

RejectedExecutionHandler

当线程池线程数达到最大线程数且缓冲队列已满的情况下,execute 方法将调用RejectedExecutionHandler.rejectedExecution()方法处理任务;

RejectedExecutionHandler源码:

public interface RejectedExecutionHandler {&nbsp;&nbsp;&nbsp;
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

JDK提供四种预定义的处理程序策略:   

  • AbortPolicy   
  • CallerRunsPolicy   
  • DiscardPolicy
  • DiscardOldestPolicy

默认处理策略 - AbortPolicy

JDK中的默认处理策略,该策略直接抛出运行时异常RejectedExecutionException

AbortPolicy源码如下:

fba752e244532216969c0045eed46785.png
代码样例
53a88133a147fc524554f26610b61037.png

CallerRunsPolicy

CallerRunsPolicy策略:在调用execute()的线程中运行任务

CallerRunsPolicy源码如下:

b102f48d05ca22f12b781f0020e6e623.png
代码样例
026d9a77961f5e92cf2857ce736d8515.png

DiscardPolicy

DiscardPolicy策略:直接无视当前待处理任务,舍弃任务;

DiscardPolicy源码:

32507856795b84226e691cdc2033cc79.png
代码样例
e33c057212ef471bc1cfacd5e510055c.png

DiscardOldestPolicy

DiscardOldestPolicy策略:抛弃队列头的任务,把当前任务加到队列尾;

DiscardOldestPolicy源码:

663a402be3a26d1d1c749c41865dcc13.png
代码样例
f0d110d53789ff4b96c5d134f1daffdc.png

自定义实现RejectedExecutionHandler

class RejectedHander implements RejectedExecutionHandler {&nbsp;&nbsp;&nbsp;
    @Override&nbsp;&nbsp;&nbsp; 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("rejected " + (( NameTask ) r).getName());&nbsp;&nbsp;&nbsp; 
    }
}

Executor - 任务提交接口

Executor框架同java.util.concurrent.Executor 接口在Java 5中被引入。Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架。Executor存在的目的是提供一种将“任务提交”与”任务如何运行”分离开来的机制。

源码如下:

public interface Executor {  
    void execute(Runnable command);  
} 

ExecutorService - 任务周期管理接口

ExecutorService在Executor接口的基础上,添加了一些用于生命周期管理的方法,用来解决线程的生命周期问题。

部分源码如下:

public interface ExecutorService extends Executor {  
    void shutdown();  
    List<Runnable> shutdownNow();  
    boolean isShutdown();  
    boolean isTerminated();  
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
    // 省略部分方法  
}

Executors - 工具类

Executors为创建Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些方法,类似于集合中的Collections类的功能。

Executors提供4预设线程池:

  • newCachedThreadPool
  • newFixedThreadPool
  • newSingleThreadExecutor
  • newScheduledThreadPool

newCachedThreadPool

源码:

fb1252f0a67dc38ea74dfb26262e3210.png

它是一种线程数量不定的线程池,并且只有非核心线程

该线程池比较适合没有固定大小并且比较快速就能完成的小任务,它将为每个任务创建一个线程。

那这样子它与直接创建线程对象(new Thread())有什么区别呢?看到它的第三个参数60L和第四个参数TimeUnit.SECONDS了吗?好处就在于60秒内能够重用已创建的线程。超过60s的闲置线程就会被回收。Integer.MAX_VALUE 是一个非常大的数,实际上就相当于最大线程数可以任意大。

newFixedThreadPool

源码:

791656549b922a38e2693dc68114f3a0.png

它是一种线程数量固定的线程池,如果提交的任务数量大于限制的最大线程数,那么这些任务将排队,当有一个线程的任务结束之后,将会根据调度策略继续等待执行下一个任务。

使用无界队列,理论上是可以处理无限任务,不会触发拒绝策略。

newSingleThreadExecutor

线程数量为1的固定线程的线程池,如果提交了多个任务,那么这些任务将会排队,每个任务都会在下一个任务开始之前运行结束,所有的任务将会使用相同的线程。

源码:

004c2fa29a7b7dc5ad29549974b944cb.png

newScheduledThreadPool

核心线程数是固定的,非核心线程数是没有限制的,并且当非核心线程闲置时,会被立即回收主要用于执行定时任务和具有固定周期的重复任务

源码:

c6696532671565e7a00c9973b3d2bb57.png
9f0317d26c7d3c41bea5f60d5490ba6c.png


从业时长3年半的佛系码农,并不会唱跳、rap和篮球