java thread reuse(good)

在前面介绍JUC的文章中,提到了关于线程池Execotors的创建介绍,在文章:《java之JUC系列-外部Tools》中第一部分有详细的说明,请参阅;

 

第十三章 ThreadPoolExecutor源码解析,十三张源码

ThreadPoolExecutor使用方式、工作机理以及参数的详细介绍,请参照《第十二章
ThreadPoolExecutor使用与工作机理 》

1、源代码主要掌握两个部分

  • 线程池的创建:构造器
  • 提交任务到线程池去执行:execute()

 

2、构造器

2.1、一些属性:

图片 1 /** *
runState provides the main lifecyle control, taking on values: * *
RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps
implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On
invocation of shutdownNow() * SHUTDOWN -> TERMINATED * When both
queue and pool are empty * STOP -> TERMINATED * When pool is empty
*/ volatile int runState; static final int RUNNING =
0;//接收新的任务,会处理队列中的任务 static final int SHUTDOWN =
1;//不接收新的任务,但是会处理队列中的任务 static final int STOP =
2;//不接收新的任务,也不会处理队列中的任务,而且还会中断正在执行的任务
static final int TERMINATED = 3;//STOP+中止所有线程 private final
BlockingQueue<Runnable> workQueue;//队列 /** * 对poolSize,
corePoolSize, maximumPoolSize, runState, and workers set上锁 */ private
final ReentrantLock mainLock = new ReentrantLock(); /** *
支持awaitTermination的等待条件 */ private final Condition termination =
mainLock.newCondition(); /** *
pool中的所有工作线程集合;仅仅在持有mainLock的时候才允许被访问 */
private final HashSet<Worker> workers = new
HashSet<Worker>(); private volatile long keepAliveTime; /** *
false(默认):当核心线程处于闲置状态时,也会存活 *
true:核心线程使用keepAliveTime来决定自己的存活状态 */ private volatile
boolean allowCoreThreadTimeOut; /** * Core pool
size,仅仅在持有mainLock的时候才允许被更新, *
因为是volatile允许并发读(即使是在更新的过程中) */ private volatile
int corePoolSize; /** * Maximum pool size, 其他同上 */ private
volatile int maximumPoolSize; /** * Current pool size, 其他同上 */
private volatile int poolSize; /** * 回绝处理器 */ private volatile
RejectedExecutionHandler handler; /** *
所有的线程都通过这个线程工厂的addThread方法来创建。 */ private volatile
ThreadFactory threadFactory; /** * Tracks largest attained pool size.
*/ private int largestPoolSize; /** *
已经完成的任务数.仅仅在工作线程被终结的时候这个数字才会被更新 */
private long completedTaskCount; /** *
默认的回绝处理器(回绝任务并抛出异常) */ private static final
RejectedExecutionHandler defaultHandler = new AbortPolicy(); View Code

说明:因为属性不多,这里列出了全部属性。

 

2.2、构造器:

图片 2 public
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler); } public
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
handler); } public ThreadPoolExecutor(int corePoolSize, int
maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) { /* * 检查参数 */ if (corePoolSize
< 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize
|| keepAliveTime < 0) throw new IllegalArgumentException(); if
(workQueue == null || threadFactory == null || handler == null) throw
new NullPointerException(); /* * 初始化值 */ this.corePoolSize =
corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue =
workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime);//转成纳秒
this.threadFactory = threadFactory; this.handler = handler; } View Code

说明:4个构造器(1个5参+2个6参+1个7参)

注意:默认情况下,构造器只会初始化参数,不会提前构建好线程

建议:构造器参数众多,建议使用构建器模式,关于构建器模式的实际使用范例,请参照《第二章
Google guava cache源码解析1–构建缓存器》

构造器中默认线程工厂的创建:Executors中的方法

图片 3 public
static ThreadFactory defaultThreadFactory() { return new
DefaultThreadFactory(); } /** * 默认的线程工厂 */ static class
DefaultThreadFactory implements ThreadFactory { static final
AtomicInteger poolNumber = new AtomicInteger(1);//池数量 final
ThreadGroup group;//线程组 final AtomicInteger threadNumber = new
AtomicInteger(1);//线程数量 final String namePrefix; /* *
创建默认的线程工厂 */ DefaultThreadFactory() { SecurityManager s =
System.getSecurityManager(); group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup(); namePrefix = “pool-” +
poolNumber.getAndIncrement() + “-thread-“; } /* * 创建一个新的线程 */
public Thread newThread(Runnable r) { Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),//新线程的名字 0); /* *
将后台线程设置为应用线程 */ if (t.isDaemon()) t.setDaemon(false); /*
* 将线程的优先级全部设置为NORM_PRIORITY */ if (t.getPriority() !=
Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
} View Code

说明,其中的newThread()方法会在第三部分用到。

 

3、提交任务的线程池去执行execute(Runnable command)

图片 4 public
void execute(Runnable command) { if (command == null) throw new
NullPointerException(); /** *
这一块儿就是整个工作机理的部分(代码比较精致) *
1、addIfUnderCorePoolSize *
1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING,
* 1.1)先获取锁 *
1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run()
*
说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释
*
1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合,
*
然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑
* * 2、如果poolSize>=corePoolSize或者上边的执行失败了 *
1)如果pool的状态处于RUNNING,将该任务入队(offer(command)) *
如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释
* 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize) *
如果增加线程也不成功,则回绝任务。 * */ if (poolSize >=
corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState ==
RUNNING && workQueue.offer(command)) { if (runState != RUNNING ||
poolSize == 0) ensureQueuedTaskHandled(command); } else if
(!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or
saturated } } View Code

 

3.1、addIfUnderCorePoolSize(Runnable firstTask)

图片 5 /** *
创建并且启动一个新的线程来处理任务 *
1、其第一个任务就是传入的firstTask参数 *
2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候 */
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t =
null; final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁 try { if (poolSize < corePoolSize &&
runState == RUNNING) t = addThread(firstTask);//创建新线程 } finally {
mainLock.unlock();//释放锁 } return t != null; } View Code

addThread(Runnable firstTask)

图片 6 private
Thread addThread(Runnable firstTask) { Worker w = new
Worker(firstTask);//构造一个work Thread t =
threadFactory.newThread(w);//创建线程 boolean workerStarted = false; if
(t != null) {// if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡
throw new IllegalThreadStateException(); w.thread = t;
workers.add(w);//将w工作线程加入workers线程池 int nt =
++poolSize;//当前的池数量+1 if (nt > largestPoolSize) largestPoolSize
= nt; try { t.start();//启动线程 workerStarted = true; } finally { if
(!workerStarted)//启动线程没有成功
workers.remove(w);//将w从workers集合中删除 } } return t; } View Code

newThread(Runnable r)

该方法在构建上边的默认线程工厂部分已经说过了。

 

Work内部类:

图片 7/** *
工作线程。 */ private final class Worker implements Runnable { /** *
在每一个任务的执行前后都会获取和释放runLock。 *
该锁只要是为了防止中断正在执行任务的work线程 */ private final
ReentrantLock runLock = new ReentrantLock(); /** * Initial task to
run before entering run loop. * 1、Possibly null. */ private Runnable
firstTask; /** * 每个work线程完成的任务总量 * accumulated into
completedTaskCount upon termination. */ volatile long completedTasks;
Thread thread; /** * 该work中的线程是不是确实正在执行了run() */
volatile boolean hasRun = false; Worker(Runnable firstTask) {
this.firstTask = firstTask; } /* * true:已经有线程持有了该锁 */
boolean isActive() { return runLock.isLocked(); } private void
runTask(Runnable task) { final ReentrantLock runLock = this.runLock;
runLock.lock();//获取锁runLock try { /* *
如果pool状态为STOP或TERMINATED,确保线程被打断; *
如果不是,确保线程不要被打断 */ if ((runState >= STOP ||
(Thread.interrupted() && runState >= STOP)) && hasRun)
thread.interrupt(); /* *
确保afterExecute会被执行仅仅当任务完成了(try)或抛出了异常(catch) */
boolean ran = false; beforeExecute(thread,
task);//执行任务的run()方法之前要执行的操作 try {
task.run();//执行线程的run()方法 ran = true; afterExecute(task,
null);//执行任务的run()方法之后要执行的操作 ++completedTasks; } catch
(RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } }
finally { runLock.unlock();//释放锁runLock } } /** * Main run loop *
运行当前任务task,运行结束后,尝试获取队列中的其他任务, *
如果最后通过各种方式都获取不到,就回收该线程,如果获取到了,就用该线程继续执行接下来的任务
* 最后,当获取不到任何任务去执行时,就将该线程从works线程集合中删除掉
*/ public void run() { try { hasRun = true; Runnable task = firstTask;
firstTask = null; while (task != null || (task = getTask()) != null) {
runTask(task);//运行该任务 task = null; } } finally {
workerDone(this);//将该线程从works集合中删除 } } } View Code

说明:这里列出了该内部类的全部属性和常用方法。

 

getTask()

图片 8 /** *
获取下一个worker线程将要运行的任务 * Gets the next task for a worker
thread to run. */ Runnable getTask() { for (;;) {//无限循环 try { int
state = runState; if (state > SHUTDOWN) return null; Runnable r; if
(state == SHUTDOWN) // Help drain queue r =
workQueue.poll();//处理queue中的任务 //下面的runState==RUNNING else if
(poolSize > corePoolSize || allowCoreThreadTimeOut)
//从队头获取任务,如果没有任务,等待keepAliveTime的时间 r =
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else
//从队头获取任务,如果没有任务,阻塞等待 r = workQueue.take(); if (r !=
null) return r; if (workerCanExit()) {//允许回收获取任务失败的线程 if
(runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();//中断闲置的work线程 return null; } // Else retry
} catch (InterruptedException ie) { // On interruption, re-check
runState } } } View Code

workerCanExit()

图片 9 /** *
检测一个获取任务失败的work线程是否可以退出了。 *
出现下面三种情况,work线程就会死亡。 *
1、如果pool的状态为STOP或TERMINATED * 2、队列为空 *
3、允许回收核心线程并且池中的线程数大于1和corePoolSize的最大值 */
private boolean workerCanExit() { final ReentrantLock mainLock =
this.mainLock; mainLock.lock(); boolean canExit; try { canExit =
runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock();
} return canExit; } View Code

workerDone(Worker w)

图片 10 void
workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); try { completedTaskCount += w.completedTasks;
workers.remove(w);//从workers集合中删除该线程 if (–poolSize ==
0)//如果池中的线程数为0 tryTerminate(); } finally { mainLock.unlock(); }
} View Code

 

3.2、ensureQueuedTaskHandled(Runnable command)

图片 11 /** *
在一个task入队之后重新检查state。 *
当一个task入队后,pool的state发生了变化,该方法就会被调用。 *
如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝
* 否则该方法会保证至少有一个线程来处理入队的task */ private void
ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock
= this.mainLock; mainLock.lock(); boolean reject = false; Thread t =
null; try { int state = runState; if (state != RUNNING &&
workQueue.remove(command)) reject = true; else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) t =
addThread(null); } finally { mainLock.unlock(); } if (reject)
reject(command); } View Code

 

3.3、addIfUnderMaximumPoolSize(Runnable firstTask)

图片 12 private
boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null;
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if
(poolSize < maximumPoolSize && runState == RUNNING) t =
addThread(firstTask); } finally { mainLock.unlock(); } return t != null;
} View Code

说明:该方法的其他方法与addIfUnderCorePoolSize(Runnable firstTask)一样。

 

3.4、reject(Runnable command)

图片 13 void
reject(Runnable command) { handler.rejectedExecution(command, this); }
View Code
图片 14 public
static class AbortPolicy implements RejectedExecutionHandler { public
AbortPolicy() { } /** 直接抛异常 */ public void
rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new
RejectedExecutionException(); } } View Code

 

说明:明白了上一章将的线程池机理,按着这个机理去看源代码是非常容易的事情。

总结:

  • 上一章的工作机理
  • 上一章的参数详细说明

ThreadPoolExecutor源码解析,十三张源码
ThreadPoolExecutor使用方式、工作机理以及参数的详细介绍,请参照《第十二章
ThreadPoolExecutor使…

文章中其实说明了外部的使用方式,但是没有说内部是如何实现的,为了加深对实现的理解,在使用中可以放心,我们这里将做源码解析以及反馈到原理上,Executors工具可以创建普通的线程池以及schedule调度任务的调度池,其实两者实现上还是有一些区别,但是理解了ThreadPoolExecutor,在看ScheduledThreadPoolExecutor就非常轻松了,后面的文章中也会专门介绍这块,但是需要先看这篇文章。

I have always read that creating threads is expensive. I also know that
you cannot rerun a thread.

使用Executors最常用的莫过于是使用:Executors.newFixedThreadPool(int)这个方法,因为它既可以限制数量,而且线程用完后不会一直被cache住;那么就通过它来看看源码,回过头来再看其他构造方法的区别:

I see in the doc of Executors class: Creates a thread pool that creates
new threads as needed, but will reuse previously constructed threads
when they are available.

在《java之JUC系列-外部Tools》文章中提到了构造方法,为了和本文对接,再贴下代码:

Mind the word ‘reuse’.

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

How do thread pools ‘reuse’ threads?

其实你可以自己new一个ThreadPoolExecutor,来达到自己的参数可控的程度,例如,可以将LinkedBlockingQueue换成其它的(如:SynchronousQueue),只是可读性会降低,这里只是使用了一种设计模式。

Answer:

我们现在来看看ThreadPoolExecutor的源码是怎么样的,也许你刚开始看他的源码会很痛苦,因为你不知道作者为什么是这样设计的,所以本文就我看到的思想会给你做一个介绍,此时也许你通过知道了一些作者的思想,你也许就知道应该该如何去操作了。

The thread pool consists of a number of fixed worker threads that can
take tasks from an internal task queue.
So if one task ends,
the thread does not end
but waits for the next task. If you
abort a thread, it is automatically replaced.

这里来看下构造方法中对那些属性做了赋值:

Look at
the documentation for
more details.

源码段1:

From Thread.start() Javadoc:

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
 * Causes this thread to begin execution; the Java Virtual Machine 
 * calls the <code>run</code> method of this thread.

这里你可以看到最终赋值的过程,可以先大概知道下参数的意思:

BUT then inside each Thread’s run() method Runnable shall be
dequeued and the run() method of each Runnable is going to be
called. So each thread can process several Runnable. That’s what they
refer to by “thread reuse”.

corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Thread放入到等待队列中了;

One way to do your own thread pool is to use a blocking queue on to
which you enqueue runnables and have each of your thread, once it’s done
processing the run() method of a Runnable, dequeue the
next Runnable (or block) and run its run() method, then rinse and
repeat.

maximumPoolSize:一般你用不到,当大于了这个值就会将Thread由一个丢弃处理机制来处理,但是当你发生:newFixedThreadPool的时候,corePoolSize和maximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中,看了后面的代码你就知道了。

I guess part of the confusion (and it is a bit confusing) comes from the
fact that a Thread takes a Runnable and upon calling start() the
Runnable ‘s run() method is called while the default thread
pools also take Runnable.

workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的队列属性为:workers,为一个HashSet;内部被包装了一层,后面会看到这部分代码。

keepAliveTime:默认都是0,当线程没有任务处理后,保持多长时间,cachedPoolSize是默认60s,不推荐使用。

Worker所在的线程启动后,首先执行创建其时传入的Runnable任务,执行完成后,循环调用getTask来获取新的任务,在没有任务的情况下,退出此线程。

threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可;

getTask方法实现:

handler:也就是参数maximumPoolSize达到后丢弃处理的方法,java提供了5种丢弃处理的方法,当然你也可以自己弄,主要是要实现接口:RejectedExecutionHandler中的方法:

Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
getTask就是通过WorkQueue的poll或task方法来获取下一个要执行的任务。
回到execute方法 ,execute 方法部分实现:

public
void rejectedExecution(Runnabler, ThreadPoolExecutor e)

if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated

java默认的是使用:AbortPolicy,他的作用是当出现这中情况的时候会抛出一个异常;其余的还包含:

如果当前线程池数量大于corePoolSize或addIfUnderCorePoolSize方法执行失败,则执行后续操作;如果线程池处于运行状态并且workQueue中成功加入任务,再次判断如果线程池的状态不为运行状态或当前线程池数为0,则调用ensureQueuedTaskHandled方法

1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程

ensureQueuedTaskHandled方法实现:
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command);
else if (t != null)
t.start();
}
ensureQueuedTaskHandled方法判断线程池运行,如果状态不为运行状态,从workQueue中删除,
并调用reject做拒绝处理。
reject方法实现:
void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。

再次回到execute方法,

3、DiscardPolicy:什么也不做

if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如线程池workQueue
offer失败或不处于运行状态,调用addIfUnderMaximumPoolSize,addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize实现类似,不同点在于根据最大线程数(maximumPoolSize)进行比较,如果超过最大线程数,返回false,调用reject方法,下面是addIfUnderMaximumPoolSize方法实现:

4、AbortPolicy:java默认,抛出一个异常:RejectedExecutionException。

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}

通常你得到线程池后,会调用其中的:submit方法或execute方法去操作;其实你会发现,submit方法最终会调用execute方法来进行操作,只是他提供了一个Future来托管返回值的处理而已,当你调用需要有返回值的信息时,你用它来处理是比较好的;这个Future会包装对Callable信息,并定义一个Sync对象(),当你发生读取返回值的操作的时候,会通过Sync对象进入锁,直到有返回值的数据通知,具体细节先不要看太多,继续向下:

  1. 添加任务处理流程
    当一个任务通过execute(Runnable)方法欲添加到线程池时:
    如果当前线程池中的数量小于corePoolSize,并线程池处于Running状态,创建并添加的任务。
    如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列
    workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。
    如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量小于maximumPoolSize,新提交任务会创建新线程执行任务。
    如果当前线程池中的数量大于corePoolSize,缓冲队列workQueue已满,并且线程池中的数量等于maximumPoolSize,新提交任务由Handler处理。

来看看execute最为核心的方法吧:

当线程池中的线程大于corePoolSize时,多余线程空闲时间超过keepAliveTime时,会关闭这部分线程。

源码段2:

 

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是:
1、workQueue.offer(command)
workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。
2、ensureQueuedTaskHandled(command)
这个是线程执行的关键语句。看看它的源码:

这段代码看似简单,其实有点难懂,很多人也是这里没看懂,没事,我一个if一个if说:

Java代码
图片 15

首先第一个判定空操作就不用说了,下面判定的poolSize >=
corePoolSize成立时候会进入if的区域,当然它不成立也有可能会进入,他会判定addIfUnderCorePoolSize是否返回false,如果返回false就会进去;

  1. public class
    ThreadPoolExecutor extends
    AbstractExecutorService {
  2. ……….
  3. private void
    ensureQueuedTaskHandled(Runnable command) {
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. boolean reject = false;

  7. Thread t = null;

  8. try {
  9. int state = runState;
  10. if (state != RUNNING &&
    workQueue.remove(command))
  11. reject = true;
  12. else if (state < STOP
    &&
  13. poolSize < Math.max(corePoolSize, 1) &&

  14. !workQueue.isEmpty())

  15. t = addThread(null);
  16. } finally {
  17. mainLock.unlock();
  18. }
  19. if (reject)
  20. reject(command);
  21. else if (t != null)
  22. t.start();
  23. }
  24. ……….
  25. }

我们先来看下addIfUnderCorePoolSize方法的源码是什么:

在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码:

源码段3:

Java代码
图片 16

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }
  1. public class
    ThreadPoolExecutor extends
    AbstractExecutorService {
  2. ……….
  3. private Thread addThread(Runnable firstTask) {

  4. Worker w = new Worker(firstTask);

  5. Thread t = threadFactory.newThread(w);
  6. if (t != null) {

  7. w.thread = t;

  8. workers.add(w);
  9. int nt = ++poolSize;
  10. if (nt > largestPoolSize)
  11. largestPoolSize = nt;
  12. }
  13. return t;
  14. }
  15. ……….
  16. }

可以发现,这段源码是如果发现小雨corePoolSize就会创建一个新的线程,并且调用线程的start()方法将线程运行起来:这个addThread()方法,我们先不考虑细节,因为我们还要先看到前面是怎么进去的,这里可以发信啊,只有没有创建成功Thread才会返回false,也就是当当前的poolSize
> corePoolSize的时候,或线程池已经不是在running状态的时候才会出现;

这里两个重点,很明显:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是个什么结构:

注意:这里在外部判定一次poolSize和corePoolSize只是初步判定,内部是加锁后判定的,以得到更为准确的结果,而外部初步判定如果是大于了,就没有必要进入这段有锁的代码了。

Java代码
图片 17

此时我们知道了,当前线程数量大于corePoolSize的时候,就会进入【代码段2】的第一个if语句中,回到【源码段2】,继续看if语句中的内容:

  1. public class
    ThreadPoolExecutor extends
    AbstractExecutorService {
  2. ……….
  3. private final class Worker implements
    Runnable {
  4. ……….
  5. Worker(Runnable firstTask) {
  6. this.firstTask = firstTask;
  7. }
    1. private Runnable firstTask;
  8. ……….
    1. public void run() {
  9. try {

  10. Runnable task = firstTask;
  11. firstTask = null;
  12. while (task != null ||
    (task = getTask()) != null) {

  13. runTask(task);

  14. task = null;
  15. }
  16. } finally {
  17. workerDone(this);
  18. }
  19. }
  20. }
    1. Runnable getTask() {
  21. for (;;) {
  22. try {
  23. int state = runState;
  24. if (state > SHUTDOWN)
  25. return null;

  26. Runnable r;

  27. if (state == SHUTDOWN) // Help drain queue
  28. r = workQueue.poll();
  29. else if (poolSize >
    corePoolSize || allowCoreThreadTimeOut)
  30. r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
  31. else
  32. r = workQueue.take();
  33. if (r != null)

  34. return r;

  35. if (workerCanExit()) {
  36. if (runState >= SHUTDOWN) // Wake up others
  37. interruptIdleWorkers();
  38. return null;

  39. }

  40. // Else retry
  41. } catch (InterruptedException ie) {
  42. // On interruption, re-check runState
  43. }
  44. }
  45. }
  46. }
  47. ……….
  48. }

这里标记为

Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
在看看newThread是一个什么方法:

源码段4

Java代码
图片 18

if (runState == RUNNING && workQueue.offer(command)) {
   if (runState != RUNNING || poolSize == 0)
       ensureQueuedTaskHandled(command);
   }
   else if (!addIfUnderMaximumPoolSize(command))
       reject(command); // is shutdown or saturated
  1. public class Executors {

  2. ……….

  3. static class
    DefaultThreadFactory implements ThreadFactory
    {
  4. ……….
  5. public Thread newThread(Runnable r) {
  6. Thread t = new Thread(group, r,
  7. namePrefix + threadNumber.getAndIncrement(),
  8. 0);
  9. if (t.isDaemon())
  10. t.setDaemon(false);
  11. if (t.getPriority() != Thread.NORM_PRIORITY)

  12. t.setPriority(Thread.NORM_PRIORITY);

  13. return t;
  14. }
  15. ……….
  16. }
  17. ……….
  18. }

第一个if,也就是当当前状态为running的时候,就会去执行workQueue.offer(command),这个workQueue其实就是一个BlockingQueue,offer()操作就是在队列的尾部写入一个对象,此时写入的对象为线程的对象而已;所以你可以认为只有线程池在RUNNING状态,才会在队列尾部插入数据,否则就执行else
if,其实else
if可以看出是要做一个是否大于MaximumPoolSize的判定,如果大于这个值,就会做reject的操作,关于reject的说明,我们在【源码段1】的解释中已经非常明确的说明,这里可以简单看下源码,以应征结果:

通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。

源码段5:

之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker
w = new Worker(null);Thread t = threadFactory.newThread(w);return
t;这里两个t是一致的。
从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                //在corePoolSize = maximumPoolSize下,该代码几乎不可能运行
                t = addThread(firstTask); 
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}
void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

也就是如果线程池满了,而且线程池调用了shutdown后,还在调用execute方法时,就会抛出上面说明的异常:RejectedExecutionException

 

再回头来看下【代码段4】中进入到等待队列后的操作:

 

if (runState != RUNNING || poolSize == 0)

                   ensureQueuedTaskHandled(command);

Java代码  图片 19

这段代码是要在线程池运行状态不是RUNNING或poolSize ==
0才会调用,他是干啥呢?

  1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
  2.        Thread t = null;  
  3.        final ReentrantLock mainLock = this.mainLock;  
  4.        mainLock.lock();  
  5.        try {  
  6.         //poolSize < corePoolSize 即当前工作线程的数量一定要小于你设置的线程最大数量  
  7.         //CachedThreadPool永远也不会进入该方法,因为它的corePoolSize初始为0  
  8.            if (poolSize < corePoolSize && runState == RUNNING)  
  9.                t = addThread(firstTask);  
  10.        } finally {  
  11.            mainLock.unlock();  
  12.        }  
  13.        if (t == null)  
  14.            return false;  
  15.        t.start();   //线程执行了  
  16.        return true;  
  17.    }  

他为什么会不等于RUNNING呢?外面那一层不是判定了他==
RUNNING了么,其实有时间差就是了,如果是poolSize ==
0也会执行这段代码,但是里面的判定条件是如果不是RUNNING,就做reject操作,在第一个线程进去的时候,会将第一个线程直接启动起来;很多人也是看这段代码很绕,因为不断的循环判定类似的判定条件,你主要记住他们之间有时间差,要取最新的就好了。

    看’t.start()’,这表示工作线程启动了,工作线程t启动的前提条件是’t =
addThread(firstTask);
‘返回值t必须不为null。好了,现在想看看java线程池中工作线程是怎么样的吗?请看addThread方法: 
   

此时貌似代码看完了?咦,此时有问题了:

Java代码  图片 20

1、 
等待中的线程在后来是如何跑起来的呢?线程池是不是有类似Timer一样的守护进程不断扫描线程队列和等待队列?还是利用某种锁机制,实现类似wait和notify实现的?

  1. private Thread addThread(Runnable firstTask) {  
  2.     //Worker就是典型的工作线程,所以的核心线程都在工作线程中执行  
  3.        Worker w = new Worker(firstTask);  
  4.        //采用默认的线程工厂生产出一线程。注意就是设置一些线程的默认属性,如优先级、是否为后台线程等  
  5.        Thread t = threadFactory.newThread(w);   
  6.        if (t != null) {  
  7.            w.thread = t;  
  8.            workers.add(w);  
  9.          //没生成一个工作线程 poolSize加1,但poolSize等于最大线程数corePoolSize时,则不能再生成工作线程  
  10.            int nt = ++poolSize;    
  11.            if (nt > largestPoolSize)  
  12.                largestPoolSize = nt;  
  13.        }  
  14.        return t;  
  15.    }  

2、  线程池的运行队列和等待队列是如何管理的呢?这里还没看出影子呢!

   看见没,Worker就是工作线程类,它是ThreadPoolExecutor中的一个内部类。下面,我们主要分析Worker类,如了解了Worker类,那基本就了解了java线程池的整个原理了。不用怕,Worker类的逻辑很简单,它其实就是一个线程,实现了Runnable接口的,所以,我们先从run方法入手,run方法源码如下: 

NO,NO,NO!

 

Java在实现这部分的时候,使用了怪异的手段,神马手段呢,还要再看一部分代码才晓得。

Java代码  图片 21

在前面【源码段3】中,我们看到了一个方法叫:addThread(),也许很少有人会想到关键在这里,其实关键就是在这里:

  1. public void run() {  
  2.             try {  
  3.                 Runnable task = firstTask;  
  4.                 firstTask = null;  
  5.                 /** 
  6.                  * 注意这段while循环的执行逻辑,没执行完一个核心线程后,就会去线程池 
  7.                  * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 
  8.                  */  
  9.                 while (task != null || (task = getTask()) != null) {  
  10.                     runTask(task);  //你所提交的核心线程(任务)的运行逻辑  
  11.                     task = null;  
  12.                 }  
  13.             } finally {  
  14.                 workerDone(this); // 当前工作线程退出  
  15.             }  
  16.         }  
  17.     }  

我们看看addThread()方法到底做了什么。

    从源码中可看出,我们所提交的核心线程(任务)的逻辑是在Worker中的runTask()方法中实现的。这个方法很简单,自己可以打开看看。这里要注意一点,在runTask()方法中执行核心线程时是调用核心线程的run()方法,这是一个寻常方法的调用,千万别与线程的启动(start())混合了。这里还有一个比较重要的方法,那就是上述代码中while循环中的getTask()方法,它是一个从池队列中取的核心线程(任务)的方法。具体代码如下: 

源码段6:

   

    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

Java代码  图片 22

这里创建了一个Work,其余的操作,就是讲poolSize叠加,然后将将其放入workers的运行队列等操作;

  1. Runnable getTask() {  
  2.         for (;;) {  
  3.             try {  
  4.                 int state = runState;  
  5.                 if (state > SHUTDOWN)    
  6.                     return null;  
  7.                 Runnable r;  
  8.                 if (state == SHUTDOWN)  //帮助清空队列  
  9.                     r = workQueue.poll();  
  10.                /* 
  11.                 * 对于条件1,如果可以超时,则在等待keepAliveTime时间后,则返回一null对象,这时就 
  12.                 *  销毁该工作线程,这就是CachedThreadPool为什么能回收空闲线程的原因了。 
  13.                 * 注意以下几点:1.这种功能情况一般不可能在fixedThreadPool中出现 
  14.                 *            2.在使用CachedThreadPool时,条件1一般总是成立,因为CachedThreadPool的corePoolSize 
  15.                 *              初始为0 
  16.                 */  
  17.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  //——————条件1  
  18.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);    
  19.                 else  
  20.                     r = workQueue.take();       //如果队列不存在任何元素 则一直等待。 FiexedThreadPool典型模式———-条件2  
  21.                 if (r != null)  
  22.                     return r;  
  23.                 if (workerCanExit()) {       //————————–条件3  
  24.                     if (runState >= SHUTDOWN) // Wake up others  
  25.                         interruptIdleWorkers();  
  26.                     return null;  
  27.                 }  
  28.                 // Else retry  
  29.             } catch (InterruptedException ie) {  
  30.                 // On interruption, re-check runState  
  31.             }  
  32.         }  
  33.     }  

我们主要关心Worker是干什么的,因为这个threadFactory对我们用途不大,只是做了Thread的命名处理;而Worker你会发现它的定义也是一个Runnable,外部开始在代码段中发现了调用哪个这个Worker的start()方法,也就是线程的启动方法,其实也就是调用了Worker的run()方法,那么我们重点要关心run方法是如何处理的

    从这个方法中,我们需要了解一下几点: 
   
1.CachedThreadPool获得任务逻辑是条件1,条件1的处理逻辑请看注释,CachedThreadPool执行条件1的原因是:CachedThreadPool的corePoolSize时刻为0。 

源码段7:

   
2.FixedThreadPool执行的逻辑为条件2,从’workQueue.take()’中我们就明白了为什么FixedThreadPool不会释放工作线程的原因了(除非你关闭线程池)。 

       public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

   
最后,我们了解下Worker(工作线程)终止时的处理吧,这个对理解CachedThreadPool有帮助,具体代码如下: 
   

FirstTask其实就是开始在创建work的时候,由外部传入的Runnable对象,也就是你自己的Thread,你会发现它如果发现task为空,就会调用getTask()方法再判定,直到两者为空,并且是一个while循环体。

Java代码  图片 23

那么看看getTask()方法的实现为:

  1. /** 
  2.     * 工作线程退出要处理的逻辑 
  3.     * @param w 
  4.     */  
  5.    void workerDone(Worker w) {  
  6.        final ReentrantLock mainLock = this.mainLock;  
  7.        mainLock.lock();  
  8.        try {  
  9.            completedTaskCount += w.completedTasks;   
  10.            workers.remove(w);  //从工作线程缓存中删除  
  11.            if (–poolSize == 0) //poolSize减一,这时其实又可以创建工作线程了  
  12.                tryTerminate(); //尝试终止  
  13.        } finally {  
  14.            mainLock.unlock();  
  15.        }  
  16.    }  

源码段8:

    注意workDone()方法中的tyrTerminate()方法,它是你以后理解线程池中shuDown()以及CachedThreadPool原理的关键,具体代码如下:   

     Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

   

你会发现它是从workQueue队列中,也就是等待队列中获取一个元素出来并返回!

Java代码  图片 24

回过头来根据代码段6理解下:

  1. private void tryTerminate() {  
  2.     //终止的前提条件就是线程池里已经没有工作线程(Worker)了  
  3.        if (poolSize == 0) {  
  4.            int state = runState;  
  5.            /** 
  6.             * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 
  7.             * 工作线程来执行线程队列中等待的任务 
  8.             */  
  9.            if (state < STOP && !workQueue.isEmpty()) {      
  10.                state = RUNNING; // disable termination check below  
  11.                Thread t = addThread(null);  
  12.                if (t != null)  
  13.                    t.start();  
  14.            }  
  15.            //设置池状态为终止状态  
  16.            if (state == STOP || state == SHUTDOWN) {  
  17.                runState = TERMINATED;  
  18.                termination.signalAll();   
  19.                terminated();   
  20.            }  
  21.        }  
  22.    }  

当前线程运行完后,在到workQueue中去获取一个task出来,继续运行,这样就保证了线程池中有一定的线程一直在运行;此时若跳出了while循环,只有workQueue队列为空才会出现或出现了类似于shutdown的操作,自然运行队列会减少1,当再有新的线程进来的时候,就又开始向worker里面放数据了,这样以此类推,实现了线程池的功能。

这里可以看下run方法的finally中调用的workerDone方法为:

 

源码段9:

前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理。而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecutor在任务的提交、执行,线程重用和线程数维护等方面做下分析。

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

0.    ThreadPoolExecutor类的声明属性变量分析

注意这里将workers.remove(w)掉,并且调用了—poolSize来做操作。

1
public class ThreadPoolExecutor extends AbstractExecutorService

至于tryTerminate是做了更多关于回收方面的操作。

从这个类声明中我们可以看到java.util.ThreadPoolExecutor是继承于AbstractExecutorService的,而之前的文章我也提到过,AbstractExecutorService已经实现了一些任务提交处理的方法,如submit()方法都是在这个抽象类中实现的。但submit()方法,最后也是会调用ThreadPoolExecutor的execute()方法。

最后我们还要看一段代码就是在【源码段6】中出现的代码调用为:runTask(task);这个方法也是运行的关键。

打开SunJDK中的ThreadPoolExecutor类源码,除了上篇文章提到的一些和构造方法中参数对应的属性之外,让我们看看还有什么:

源码段10:

  • mainLock 对整个ThreadPoolExecutor对象的锁
  • workers  存储工作线程对应Worker对象的HashSet
  • termination
    线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关
  • largestPoolSize 线程池跑过的最大线程数
  • completedTaskCount 完成任务数
  • ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装
     private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();

                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

稍微需要说一下最后一个,
ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

你可以看到,这里面的task为传入的task信息,调用的不是start方法,而是run方法,因为run方法直接调用不会启动新的线程,也是因为这样,导致了你无法获取到你自己的线程的状态,因为线程池是直接调用的run方法,而不是start方法来运行。

1.    执行器状态

这里有个beforeExecuteafterExecute方法,分别代表在执行前和执行后,你可以做一段操作,在这个类中,这两个方法都是【空body】的,因为普通线程池无需做更多的操作。

ExecutorService中已经指定了这个接口对应的类要实现的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了状态的含义,并包含其于ctl属性中。

如果你要实现类似暂停等待通知的或其他的操作,可以自己extends后进行重写构造;

ThreadPoolExecutor对象有五种状态,如下:

本文没有介绍关于ScheduledThreadPoolExecutor调用的细节,下一篇文章会详细说明,因为大部分代码和本文一致,区别在于一些细节,在介绍:ScheduledThreadPoolExecutor的时候,会明确的介绍它与TimerTimerTask的巨大区别,区别不在于使用,而是在于本身内在的处理细节。

  • RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态
  • SHUTDOWN
    通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成
  • STOP
    通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法
  • TERMINATED
    terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

2.    Worker内部类

它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。

1
2
3
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。

1
2
3
final Thread thread;
Runnable firstTask;
volatile long completedTasks;

这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>=
0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。

1
2
3
4
5
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
  1. 提交任务

上篇文章已经提到了,提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize,
而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。

这段源码逻辑如下,不细说了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
 
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  1. addWorker()的实现

在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
 
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

代码较长,我们可以分两大部分看:

第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。

第二段从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。

  1. 任务的执行runWorker()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这段代码实际上就是执行提交给线程池执行的Runnable任务的实际内容。其中,值得注意的有以下几点:

  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理
  1. Worker线程的复用和任务的获取getTask()

在上一段代码中,也就是runWorker()方法,任务的执行过程是嵌套在while循环语句块中的。每当一个任务执行完毕,会从头开始做下一次循环执行,实现了空闲线程的复用。而要执行的任务则是来自于getTask()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
 
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
 
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
 
            boolean timed;      // Are workers subject to culling?
 
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                     break;
                if (compareAndDecrementWorkerCount(c))
                     return null;
                c = ctl.get();
                // Re-read ctl
                if (runStateOf(c) != rs)
                     continue retry;
                // else CAS failed due to workerCount change; retry inner loop
             }
             try {
                 Runnable r = timed ?
                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                     workQueue.take();
                 if (r != null)
                     return r;
                 timedOut = true;
             } catch (InterruptedException retry) {
                 timedOut = false;
             }
         }
     }

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为
true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return
null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

回头看看是否计时是如何确定的。

1
2
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。

另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。

  1. 线程池线程数的维护和线程的退出处理

刚刚也提到了,我们再看下processWorkerExit()方法。这个方法最主要就是从workers的Set中remove掉一个多余的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void processWorkerExit(Worker w, boolean completedAbruptly) {
         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
             decrementWorkerCount();
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             completedTaskCount += w.completedTasks;
             workers.remove(w);
         } finally {
             mainLock.unlock();
         }
         tryTerminate();
         int c = ctl.get();
         if (runStateLessThan(c, STOP)) {
             if (!completedAbruptly) {
                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1

之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。

执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。

最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。

以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

 

 

You can leave a response, or trackback from your own site.

Leave a Reply

网站地图xml地图