1. 简介
在引入线程池之前,我们先来了解几个事情:
- 线程的创建和销毁是有代价的,如线程创建需要时间和相关计算资源。如果在Web服务器上为每个来到的请求都创建一个线程,而大多数请求都是轻量级的处理过程。那么创建线程的代价与请求处理的代价相比就非常大了,导致影响整体性能。
- 当线程数量达到能让CPU忙绿起来的时候,此时再创建线程,线程也基本处于闲置状态,这时候多出来的线程除了占用内存外,还可能因为与其他线程争用CPU资源导致出现其他性能开销.
- 在可创建线程的数量上存在一个限制,如果超过这个限制,可能会抛出
OutOfMemoryError
异常。
这时候如果能出现一个东西能够对线程的生命周期进行管理,对现有的线程重复利用,并且能够以一种简单的方式将任务的提交与执行相解耦。没错,这就是线程池(Thread Pool),在要了解Java中的线程池,首先必须了解ThreadPoolExecutor
这个类。
2. ThreadPoolExecutor详解
类继承图
构造函数
/线程池配置信息,volatile修饰保证变量在多线程下的可见性private volatile int corePoolSize;private volatile int maximumPoolSize;private volatile long keepAliveTime;private final BlockingQueueworkQueue;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }复制代码
从上面的JDK中ThreadPoolExecutor
类的构造函数源码看出该构造函数一共有7个参数,下面介绍七个参数的含义:
参数 | 含义 |
---|---|
corePoolSize | 基本大小,即线程池中的核心线程数 |
maximumPoolSize | 最大大小,即线程池中允许的最大线程数 |
keepAliveTime | 存活时间,当线程的没执行任务时,空闲的时间超过了这个时间就会被标记为可回收,直到线程池的大小超过基本大小,被标记的线程就会被终止 |
unit | keepAliveTime 的单位,有DAYS 、HOURS 、MINUTES 、SECONDS 、MILLISECONDS 、MICROSECONDS 、NANOSECONDS 7个单位可选 |
workQueue | 工作队列,一个用来保存等待被执行的任务的阻塞队列 |
threadFactory | 线程工厂。线程池在创建线程时通过调用线程工厂的Thread newThread(Runnable r) 来创建线程 |
handler | 饱和策略。当阻塞队列已满、线程池当前的线程数已达到最大值且没有线程处于空闲状态时,此时对于提交过来的任务将执行饱和策略。(如果某个任务提交到一个已关闭的Executor时,也会执行饱和策略) |
ThreadPoolExecutor
类中有四个重载的构造函数,每个构造函数都必须指定上表中的前5个参数,最后两个参数可以随意指定,不指定的话构造函数会使用默认的线程工厂和饱和策略:
线程工厂(ThreadFactory)
线程池创建线程都是通过的ThreadFactory
的Thread newThread(Runnable r)
方法来创建的。下面是Executors
类里的默认线程工厂方法的源码。
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }复制代码
从上面可以看出默认线程工厂创建出的是一个非守护、优先级为Thread.NORM_PRIORITY
的线程。如果想要自己定制线程工厂满足需求,只需实现ThreadFactory
接口的Thread newThread(Runnable r)
方法。
饱和策略(RejectedExecutionHandler)
JDK中的ThreadPoolExecutor
类提供了4种不同的RejectedExecutionHandler
实现:
AbortPolicy
默认的饱和策略,该策略抛出未检查(运行时异常)的RejectedExecutionException
。DiscardPolicy
不执行任何操作,直接抛弃任务CallerRunsPolicy
在调用者线程中执行该任务DiscardOldestPolicy
丢弃阻塞队列中的第一个任务, 然后重新将该任务交给线程池执行
同样的,可以通过实现RejectedExecutionHandler
接口自定义饱和策略。
线程池状态和线程数量
/代表线程池当前状态和线程数量的原子变量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; /COUNT_BITS为29private static final int CAPACITY = (1 << COUNT_BITS) - 1; /CAPACITY为能表示的最大线程数。/线程池状态private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;/对线程池状态和线程数量进行打包和拆包的函数:private static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }/判断线程池状态的三个函数private static boolean runStateLessThan(int c, int s) { return c < s;}private static boolean runStateAtLeast(int c, int s) { return c >= s;}private static boolean isRunning(int c) { return c < SHUTDOWN;}/线程数量增1,成功返回true,失败返回falseprivate boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }/线程数量减1,成功返回true,失败返回falseprivate boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1);}/线程数量减1,失败则重试直到成功private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get()));}复制代码
AtomicInteger
类型的变量ctl
用高3位来表示当前线程池状态,低29位来表示当前的线程数。
Java线程池有5种不同的状态,分别为运行(RUNNING
)、关闭(SHUTDOWN
)、停止(STOP
)、整理(TIDYING
)、结束(TERMINATED
)。 在ThreadPoolExecutor
里由5个整型常量表示,每个整型常量的都由高3位表示状态:
RUNNING
高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务SHUTDOWN
高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务。调用void shutdown()
方法实现STOP
高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务。调用List<Runnable> shutdownNow()
实现。TIDYING
高3位为010,当线程池关闭后阻塞队列的任务已完成或线程池停止,然后workerCount
(当前线程数量)为0,线程池进入该状态后会调用terminated()
方法进入TERMINATED
状态。TERMINATED
高3位为011
启动线程池
当创建完一个ThreadPoolExecutor
对象后,线程池里并没有线程。一般都是调用void execute(Runnable command)
执行任务时才创建线程并启动,不过可以通过调用如下方法预先创建核心线程并启动(在addWorker方法里启动):
public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }复制代码
执行过程
如上图所示,当调用void execute(Runnable command)
这个方法执行任务时: - 判断当前线程池线程数量是否小于核心线程池大小,是则创建线程并启动,否则到第2步
- 判断任务队列是否已满,未满则将任务加入阻塞队列,已满则到第3步
- 判断当前线程池线程数量是否小于最大线程池大小,是则创建线程并启动,否则执行饱和策略
public void execute(Runnable command) { /任务为空,抛出空指针异常 if (command == null) throw new NullPointerException(); int c = ctl.get(); /判断当前线程数量是否小于核心线程数 if (workerCountOf(c) < corePoolSize) { /是则添加一个核心线程(true表示核心线程)到线程池,并且启动线程执行任务(addWorker方法里会启动) if (addWorker(command, true)) return; /添加成功则返回 c = ctl.get(); } /线程池处于运行状态则向阻塞队列添加该任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /判断线程池是否处于运行状态,不是就移除刚才添加的任务 if (! isRunning(recheck) && remove(command)) /移除成功就执行饱和策略,这样整个方法就结束了 reject(command); /否则若处于运行状态或移除失败,这时无论处于哪种情况任务都在阻塞队列里,判断当前线程数量是否为0 else if (workerCountOf(recheck) == 0) 若是则添加一个线程并启动 addWorker(null, false); } else if (!addWorker(command, false)) reject(command);} 复制代码
addWorker方法
boolean addWorker(Runnable firstTask, boolean core)
方法的作用就是创建Worker
对象并启动这个对象里的线程(Worker
里一个Thread
类型的字段)。
private final ReentrantLock mainLock = new ReentrantLock();private final HashSetworkers = new HashSet ();private int largestPoolSize;private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /如果线程池不处于运行状态,理论上不应该添加一个执行该任务的线程,但如果满足下面三个条件的话就可以通过: 1. 线程池状态是关闭 2. 要执行的任务为空 3. 阻塞队列不为空 因为线程池关闭后不允许提交任务,但关闭后会执行完阻塞队列的任务,所以允许添加一个firstTask为空的线程 来帮助执行完阻塞队列里的任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); /若当前线程池的线程数量达到了线程池所允许的最大线程数或所指定要添加线程类型的线程数量则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /到这里前面的限制条件都通过,现在尝试将线程数量增一,成功则退出最外层的循环 if (compareAndIncrementWorkerCount(c)) break retry; /失败则重新获取线程池状态,状态改变则从最外层循环开始执行,不变则从内循环开始执行 c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { /构造一个Worker对象,每个Worker对象绑定一个线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); /若线程池处于运行状态或处于关闭且firstTask为null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { 线程提前启动,则抛出异常 if (t.isAlive()) throw new IllegalThreadStateException(); /将w加到Worker的集合里 workers.add(w); 获取Worker集合大小,若大小比largestPoolSize大小大,则更新一下 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; /添加成功 workerAdded = true; } } finally { mainLock.unlock(); } /若添加成功则启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { /若启动失败(t线程为空或添加过程中抛出异常)则执行addWorkerFailed方法 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}复制代码
Worker类
线程池维护的线程其实是一组Worker对象,Worker封装了线程也继承了AbstractQueuedSynchronizer
类并实现了Runnable
接口,重写了void run()
方法。至于为什么要继承AbstractQueuedSynchronizer
类,请看下面的runWorker
方法讲解。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; /绑定这个对象线程已执行完成的任务数 volatile long completedTasks; Worker(Runnable firstTask) { /阻止中断,在任务获取前不允许中断 setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /线程启动时执行的方法 public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } /获取锁,不可重入 public void lock() { acquire(1); } /尝试获取锁 public boolean tryLock() { return tryAcquire(1); } /释放锁 public void unlock() { release(1); } /判断锁是否被独占 public boolean isLocked() { return isHeldExclusively(); } /中断已开始执行的线程,这个就是为什么要设置setState(-1)的一个原因了,这个方法会被`shutdownNow()`方法调用。 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}复制代码
runWorker方法
上面说到为什么Worker
类要继承AbstractQueuedSynchronizer
,其实是要用锁的状态来区分空闲线程和非空闲线程,在执行runWorker
方法中:
- 获取任务时没有加锁(空闲状态,可中断线程)
- 要执行任务时才加锁(不允许中断线程)
在调用void tryTerminate()
和void shutdown()
这两个方法时,会中断空闲线程,所以没有在执行任务的线程就可能被中断。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); /允许中断,与Worker构造函数的setState(-1)是一对的 boolean completedAbruptly = true; try { /获取到任务才进入循环 while (task != null || (task = getTask()) != null) { /加锁,表示非空闲状态 w.lock(); /1. 如果线程池状态大于等于STOP并且本线程未中断,则应该执行中断方法 2. 或者执行Thread.interrupted()方法判断本线程是否中断并且清除中断状态, 如果发现线程池状态大于等于STOP则执行中断方法。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { /ThreadPoolExecutor中的beforeExecute(wt, task)方法一个空方法,用来留给继承ThreadPoolExecutor的类 来重写该方法并在任务执行前执行 beforeExecute(wt, task); Throwable thrown = null; try { /执行获取到的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { /ThreadPoolExecutor中的afterExecute(task,thrown)方法也是一个空方法,用来留给继承 ThreadPoolExecutor的类来重写该方法并在任务执行后执行 afterExecute(task, thrown); } } finally { task = null; /该线程执行的任务加1,即使抛出异常 w.completedTasks++; /释放锁,表示回到空闲状态 w.unlock(); } } /执行到这一步表示是由于获取不到任务而正常退出的,所以completedAbruptly为false completedAbruptly = false; } finally { /无论怎样退出都要执行 processWorkerExit(w, completedAbruptly); }}复制代码
getTask方法
private Runnable getTask() { /表示获取任务是否已超时 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); /1. 若线程池状态大于等于停止状态,此时线程池不再处理队列的任务,并且会回收所有线程(不管空不空闲), 所以此时应该把线程池线程数量减1,并且获取的任务为空 /2. 处于关闭状态且任务队列为空,表示任务队列为空且不会有任务提交,所以线程数减1,并且获取的任务为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /是否启用超时机制。当允许核心线程超时或当前线程数超过核心线程则启用 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /如果线程数量超过线程池所允许的最大线程数或者启用超时机制情况下获取任务超时,理论上应该回收线程。 但是如果该线程是线程池中的最后一个线程且任务队列不为空就可以不回收,继续运行,要是还有其他线程或者任务队列为空则回收该线程。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { /尝试将线程数量减1,成功返回null,失败继续从循环开始处开始。这里为什么不是用decrementWorkerCount() 这种不会失败的方法减1而采用这种方式。是因为 wc > 1,如果线程池不只有一个线程它们互相发现不只一个线程, 且它们同时执行不会失败的将线程数量减一的方法,到时线程池线程数量可能就为0了,哪么队列中的任务就没线程执行了。 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /1. 如果启用超时机制就执行poll()方法,在keepAliveTime纳秒内还没获取就返回null。 2. 如果未启用超时机制就执行take()方法,队列没任务就一直阻塞直到有任务。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; /到这里就是因为超时获取不到任务 timedOut = true; } catch (InterruptedException retry) { /在执行take()过程中被中断并不算超时 timedOut = false; } }}复制代码
processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) { /由于不是获取不到任务而正常退出的,得在这里将线程数减1,正常退出的在getTask()方法有这个减1操作 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; /加锁,因为HashSet和completedTaskCount不是线程安全的 mainLock.lock(); try { /将线程执行的任务数统一加到线程池维护的completedTaskCount字段 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } /尝试将线程池设置为结束状态 tryTerminate(); int c = ctl.get(); /满足当前线程池状态小于STOP(运行或关闭状态)才继续 if (runStateLessThan(c, STOP)) { 若线程是异常退出runWorker方法就直接添加一个没有带初始任务的非核心线程 if (!completedAbruptly) { /这三行代码找出当前线程池所至少存在的线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; /如果当前线程数已经大于等于min,就直接返回,否则添加一个没有带初始任务的非核心线程 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}复制代码
下图是向线程池提交任务后,线程池的正常执行过程:
tryTerminate方法
terminate
(结束)是线程池的最后一个状态,只能由关闭或停止状态转变为结束状态。 final void tryTerminate() { for (;;) { int c = ctl.get(); /如果满足下面任意一个条件就没办法到达结束状态 1. 线程池处于运行状态 2. 线程池状态是TIDYING或已经是结束状态 3. 线程池处于关闭状态且任务队列不为空 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; /当前线程数量不为0也无法到达结束状态 if (workerCountOf(c) != 0) { /中断一个空闲线程 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /尝试将线程池状态设置为TIDYING,失败重循环开始处开始 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { /terminated()是一个空方法,留给继承ThreadPoolExecutor的类覆盖 terminated(); } finally { /尝试将线程池状态设置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } }}复制代码
关闭操作
我们可以通过调用void shutdown()
方法关闭线程池,关闭后线程池后不允许接受新任务
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { / 安全策略判断 checkShutdownAccess(); /设置线程池状态为SHUTDOWN状态 advanceRunState(SHUTDOWN); /中断所有空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } /尝试结束线程池 tryTerminate();}复制代码
停止操作
我们可以在运行和关闭状态下通过调用void shutdownNow()
方法停止线程池,停止后线程池后不允许接受新任务,也不会执行阻塞队列里的任务,还会中断当前所有的线程。
public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { / 安全策略判断 checkShutdownAccess(); /设置线程池状态为STOP状态 advanceRunState(STOP); /中断所有线程,不管是空闲还是非空闲 interruptWorkers(); /取出阻塞队列的所有任务 tasks = drainQueue(); } finally { mainLock.unlock(); } /尝试结束线程池 tryTerminate(); return tasks;}复制代码
3. 线程池的配置
Executors
提供了四种静态工厂方法来创建四种不同配置的线程池:
-
newFixedThreadPool(int nThreads)
接受一个int类型的nThreads变量,创建一个核心线程数和最大线程数都为
nThreads
的线程池(即最大线程数为nThreads),且使用一个无界的阻塞队列LinkedBlockingQueue
。如果不设置核心线程超时的话,创建的线程是不会超时的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}复制代码
-
newSingleThreadExecutor()
创建一个核心线程数和最大线程数都为1的线程池(即最大线程数为1),且使用一个无界的阻塞队列
LinkedBlockingQueue
,不设置核心线程超时的话,创建的线程也是不会超时的。唯一线程可以保证任务的顺序执行,如果这个唯一的线程执行过程中因为异常而结束的话,在processWorkerExit
方法最后会判断是否因异常而结束而创建一个新线程继续运行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));}复制代码
-
newCachedThreadPool()
创建一个核心线程数为0,最大线程数为
Integer.MAX_VALUE
的线程池,超时时间为60秒,所以线程空闲时间超过60秒就会被回收。使用了一个同步队列作为阻塞队列,同步队列不存储元素,且在一端进行插入,另一端要有移除操作插入才会成功,否则插入操作会阻塞等待。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());}复制代码
-
newScheduledThreadPool()
创建一个核心线程数为
corePoolSize
的线程池,用于指定的时间内周期性的执行所的任务。ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}复制代码