前言
前面我们在讲并发工具类的时候,多次提到线程池,今天我们就来走进线程池的旅地,首先我们先不讲线程池框架Executors,我们今天先来介绍如何自己定义一个线程池,是不是已经迫不及待了,那么就让我们开启今天的旅途吧。
什么是线程池?
线程池可以理解为一个专门管理线程生命周期的池子,里面的线程都可以由这个池子本身来调度,使用线程池有哪些好处呢?
提高线程的可管理性:使用线程池可以对线程进行统一的分配,调优和监控
提高响应速度:当任务到达时,任务可以不需要的等到线程创建就能立即执行
线程池的实现原理
首先我们看下面这张图,对着图进行分析:
过程分析:当任务到达时,首先会判断核心线程池是否还有空闲线程,如果有则创建一个新的工作线程执行任务,如果没有空闲线程,则说明核心线程池已满,进行工作队列是否满的判断,如果没有满,则将任务存放在等待队列中,如果工作队列也满了,则再去判断线程池是否满,如果没有满,则新建一个线程来执行任务,否则采取拒绝策略;
下面我们来对核心线程池提交任务过程进行分析,这是线程池的核心角色,首先我们看下源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果工作线程数量小于核心线程数,则创建一个新的工作线程执行任务
if (workerCountOf(c) corePoolSize) {
// 创建工作线程成功,则直接返回
if (addWorker(command, true))
return;
c = ctl.get();
}
// 工作线程核心线程数或者工作线程创建失败时,将任务放入等待队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果等待队列满了,并且工作线程数量大于线程池总数量,则采取拒绝策略
else if (!addWorker(command, false))
reject(command);
}
我们会发现,当可以分配线程来执行任务时,我们总是新建一个工作线程Worker来执行任务,我们来了解下Worker特殊的地方,工作线程不仅会执行当前的任务,而且当前任务执行完毕之后,还会去等待队列中获取任务来执行,通过工作线程的源码可以得知这一点:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
上面我们讲到的工作队列,我们之前学到过队列有很多种,例如阻塞队列和非阻塞队列,有界队列和无界队列,那么在我们线程池中当使用不同的工作工作队列又会有什么区别呢?
使用无界队列时:LinkedBlockingQueue,与有界队列相比,除非系统资源被耗尽,否则无界队列的任务队列不存在任务入队列失败的情况,当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务,当线程数量达到corePoolSize值后,则线程不会继续新建,如果此时还持续有任务进来,而没有空闲的线程资源,则任务会进入队列排队等待,若任务创建和处理的速度差异很大,则无界队列会保持快速增长,直到资源耗尽内存,任务会一直堆积,直到内存满了,这种情况永远不会有有界队列中工作线程和线程池总数的比较过程;
创建线程池:自定义线程池也是通过ThreadPoolExecutor(线程池执行器)来实现,构造方法如下
public ThreadPoolExecutor(int corePoolSize, //核心线程数--线程池初始化创建的线程数量
int maximumPoolSize, // 最大线程数,线程池中能创建的最大线程数
long keepAliveTime, // 线程空闲等待时间
TimeUnit unit, // 线程空闲等待时间的单位
BlockingQueueRunnable workQueue, // 存放待执行任务的等待队列
RejectedExecutionHandler handler // 拒绝任务的处理策略) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
特别说明,拒绝策略有如下几种:
CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降
DiscardPolicy策略:不处理,直接丢弃掉
向线程池提交任务:execute方法在上面已经介绍过了,这里就不重复介绍了
关闭线程池:关闭线程池有下面2个方法
// 将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
// 调用线程中断方法
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
// 遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。
// shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,
// 并返回等待执行任务的列表,如果任务不一定要执行完,可以使用此方法
public ListRunnable shutdownNow() {
ListRunnable tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
原文始发于微信公众号(Justin的后端书架):