Java 线程池总结

线程池概念

说的简单明了一点,就是管理线程的一个池子,是一种基于池化思想管理线程的工具。

为解决资源分配的问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

为什么需要线程池

Java 线程使用的是一对一线程模型来实现的,线程每次创建和销毁都会有一定的性能消耗,线程池的作用就是把创建好的线程统一管理起来,能避免重复创建和销毁带来的开销,从而达到减低资源损耗,提升程序响应速度的好处。

线程池体系结构图

Java 中的线程池核心实现类是 ThreadPoolExecutor ,顶层接口是 Executor ,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象执行命令,将任务的运行逻辑提交到执行器( Executor )中,由 Executor 框架完成线程的调配和任务的执行部分。

ExecutorService 接口增加了一些能力:

(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;

(2)提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分, ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

ThreadPoolExecutor 线程池

线程池的运行状态

ThreadPoolExecutor 的源码中可以看出,ThreadPoolExecutor线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。

AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
int COUNT_BITS = Integer.SIZE - 3;       // 29  https://jinglingwang.cn
int CAPACITY   = (1 << COUNT_BITS) - 1;  // 二进制表示是:29个1 (1 1111 1111 1111 1111 1111 1111 1111)

它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。

运行状态和线程数量计算方法如以下:

private static int runStateOf(int c)     { return c & ~CAPACITY; }  // 返回运行状态
private static int workerCountOf(int c)  { return c & CAPACITY; }   // 返回线程数量,workerCount是已被允许启动且未被允许停止的数量
private static int ctlOf(int rs, int wc) { return rs | wc; }        // 通过运行状态和线程数量生成ctl

线程池一共有5个运行状态,源码如下:

// 用高3位来表示运行状态
// -1的二进制位全是1,然后左移29位,最后结果是0-29位是0,30位以上都是1,二进制:1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 0 左移29位还是0  https://jinglingwang.cn
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 1左移29位,结果就是第30位是1,二进制:0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
// 2的二进制是10,左移29位,结果就是第31位是1,二进制:0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 3的二进制是11,左移29位,结果就是第30和31位是1,二进制:0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

他们的含义分别是:

  1. RUNNING
    表示可以接收新的任务,并且可以处理阻塞队列中的任务
  2. SHUTDOWN
    关闭状态,不能接收新提交的任务,但可以继续处理阻塞队列中的已存在的任务
  3. STOP
    不再接收新的任务,也不处理队列中的任务,并且会中断正在处理任务的线程
  4. TIDYING
    所有任务已经中止,且工作线程数量(workCount)为0。最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法
  5. TERMINATED
    中止状态,已经执行完terminated()钩子方法;by: https://jinglingwang.cn

线程池的生命周期运转图:

线程池的核心参数

corePoolSize

  • 核心线程数;运行中的线程数小于corePoolSize时, 就会创建一个核心线程, 否则丢队列中

maximumPoolSize

  • 最大线程数;队列满了之后, 创建线程发现运行中的线程数等于corePoolSize时, 会创建一个非核心线程, 加上corePoolSize后不能大于maximumPoolSize

keepAliveTime

allowCoreThreadTimeOut

timeUnit

  • 线程保持空闲时间单位

workQueue

  • 任务队列, 且必须是实现了BlockingQueue的阻塞队列

threadFactory

  • 线程工厂
  • 默认使用的是Executors工具类中的Executors.defaultThreadFactory()类,这个类有个缺点,创建的线程的名称是自动生成的,无法自定义以区分不同的线程池,且它们都是非守护线程。
  • 那怎么自定义一个线程工厂呢?其实也很简单,自己实现一个ThreadFactory,然后把名称和是否是守护进程当作构造方法的参数传进来就可以了。

handler

  • 拒绝策略;常用的策略有:
    1. AbortPolicy:抛出RejectedExecutionException异常(默认的策略)
    2. DiscardPolicy:Does nothing, 什么也不做
    3. DiscardOldestPolicy:从队列中poll一个任务, 然后再执行(要求线程池未关闭)
    4. CallerRunsPolicy:直接执行(要求线程池未关闭)

线程池运行机制

线程池的任务调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

RUNNING

execute执行流程图:

线程池的阻塞队列

阻塞队列是线程池的核心。线程池中是以生产者消费者模式,然后通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。

这两个附加的操作是:

  1. 在队列为空时,获取元素的线程会等待队列变为非空。
  2. 当队列为满时,存储元素的线程会等待队列变为非满可用。

不同的阻塞队列可以实现不一样的任务存取策略。

常见的阻塞队列有:

  1. ArrayBlockingQueue
    基于数组实现的有界阻塞队列;按照FIFO的原则对元素进行排序;支持公平锁和非公平锁;
  2. LinkedBlockingQueue
    基于链表实现的有界阻塞队列;按照FIFO的原则对元素进行排序;默认长度是Integer.MAX_VALUE,使用需谨慎;
  3. LinkedBlockingDueue
    基于链表实现的双向阻塞队列;队头和队尾都可以添加和移除元素;默认长度是Integer.MAX_VALUE,使用需谨慎;
  4. LinkedTransferQueue
    基于链表实现的无界阻塞队列;与其他队列相比多了transfer方法。
  5. PriorityBlockingQueue
    基于数组实现的无界阻塞队列;支持线程优先级排序,默认自然排序并支持自定义实现compareTo()方法指定排序规则;不能保证同优先级元素的顺序;
  6. DelayQueue
    基于PriorityQueue实现的无界阻塞队列;可以实现延迟获取,指定时间后才可以从队列中获取到元素;
  7. SynchronousQueue
    一个不存储元素的阻塞队列;每一次put操作必须等待take操作,否则不能再put元素;支持公平锁和非公平锁。isEmpty()始终返回true;

Executors 线程池

Executors 是JDK1.5开始提供的一个线程池工具类,支持直接调用方法创建6种不同的线程池,他们分别是:

  1. newCachedThreadPool()
    缓存线程以便重复使用,如果限制 60 秒没被使用,则会被移除缓存;用的是 SynchronousQueue 队列;队列长度为 Integer.MAX_VALUE;
  2. newFixedThreadPool(int nThreads)
    创建一个数量固定的线程池;用的是 LinkedBlockingQueue 队列;队列长度为 Integer.MAX_VALUE;
  3. newSingleThreadExecutor()
    创建一个单线程线程池。用的是 LinkedBlockingQueue 队列;队列长度为 Integer.MAX_VALUE;
  4. newScheduledThreadPool(int corePoolSize)
    创建一个数量固定的线程池,支持执行定时性或周期性任务;用的是 DelayedWorkQueue 队列;
  5. newSingleThreadScheduledExecutor()
    单线程的 newScheduledThreadPool;
  6. newWorkStealingPool()
    Java 8 新增创建线程池的方法,此线程池会并行处理任务,不能保证执行顺序。用的是 ForkJoinPool

不建议通过Executors 线程池去创建线程池,尤其是FixedThreadPool、SingleThreadPool和CachedThreadPool,因为他们允许的队列长度都是Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。