1.ThreadPoolExecutor的初始化参数介绍
corePoolSize:线程池的基本大小,核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。
maximumPoolSize:线程池最大大小,当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的队列属性为:workers。
keepAliveTime:默认都是0,当线程没有任务处理后,保持多长时间,cachedPoolSize是默认60s,不推荐使用。
threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可。
handler:也就是参数maximumPoolSize达到后丢弃处理的方法,java提供了5种丢弃处理的方法;java默认的是使用AbortPolicy,他的作用是当出现这中情况的时候会抛出一个异常。
2.ThreadPoolExecutor线程建立流程
线程池按以下行为执行任务:
(1)当线程数小于核心线程数时,创建线程。
(2)当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
(3)当线程数大于等于核心线程数,且任务队列已满
- 若线程数小于最大线程数,创建线程
- 若线程数等于最大线程数,抛出异常,拒绝任务
3.ThreadPoolExecutor源码—execute()方法
execute执行时:
1.调用workCountOf()方法获取当前线程池中线程数量,判断当前线程数量是否超过了核心线程数,若未超过,则直接添加一个核心线程,并用此线程完成当前提交的任务。
2.若当前线程数已经超过核心线程数且ctl状态等于RUNNING且工作成功进入工作等待队列,则我们进一步复查ctl,若ctl状态依旧为RUNNING,且调用workCountOf()方法检查发现此时的工作线程数为0时,将添加一个工作线程;若此时已经不为RUNNING,则尝试移除任务,并调用拒绝任务方法:reject(command)。(这里之所以需要复查ctl状态是由于在执行workQueue.offer(command)方法时,ctl状态随时可能由于调用shutDown方法或者shutDownNow方法而发生变化)。
3.如果上述两种情况都不吻合,即此时已经有超过核心线程数的的线程在工作,且任务队列也已堆满,则尝试增加一个工作线程(如果此时线程数达到限定最大线程数,则会失败),若失败则调用拒绝任务方法reject(command).
在execute的方法中添加工作线程的所调用的法为addWorker(Runnable runnable,Boolean core).该方法接受两个参数,runnable对象为这个新建线程的第一个工作任务,可以为空。core指代新建的任务是否是核心线程。
4.ThreadPoolExecutor源码— addWorker()方法
return false;
表示ctl状态为RUNNING状态或者为SHUTDOWN状态且此时任务队列仍有任务未执行完时,可以继续调用addWorker添加工作线程,但不能新建任务,即firstTask参数必须为null.否则这里将返回false,即新建工作线程失败。
// else CAS failed due to workerCount change; retry inner loop
这部分对当前线程数量做了判断,依据core参数,若core参数为true,即添加的为核心线程,则当前工作的线程数量不应当超过corePoolSize,否则返回false。若core参数为false,即添加的为普通线程,则当前工作的线程数量不应当超过maximumPoolSize,否则返回false。通过上述校验,则调用compareAndIncrementWorkerCount(c)尝试增加当前ctl计数,若成功则跳出外曾循环。若失败则重复检查当前线程池ctl状态(之所以检查ctl是因为造成失败的原因为ctl发生变化,这有两种可能,一种是作为下29位的工作线程计数发生变化,一种是作为上3位的状态标志位发生变化,这里检查的就是上3位的变化),若是ctl状态发生变化,则重新尝试外层循环(这是有可能外层循环会由于ctl状态的变化而直接return false)。若是ctl计数发生变化,则重新尝试内层循环。
5.ThreadPoolExecutor源码—runWorker()方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } |
1.先说明一个变量completedAbruptly,这个布尔值被用于指代runWorker停止的原因,当completedAbruptly为true时代表worker停止是因为worker执行的外部业务逻辑代码抛出异常引起的。当completedAbruptly为false时代表worker停止是线程池内部工作机制下的正常退出。
2.获取当前执行线程,获取当前worker的第一个任务置于task变量中,之后执行w.firstTask = null。若不不将worker对象的firstTask置为空,则firstTask引用指向的task对象将不会被gc回收。将worker对象解锁,从而允许其在尚未获得task之前被中断。进入while循环,这里将调用getTask()方法获取任务task对象(可能造成线程等待),若因为各种原因(下面说明getTask()方法时会说到)返回null(即获取不到任务)则停止while循环,将completedAbruptly置为false。
3.进入循环内部,代表已经获取到可执行的task对象,则锁定worker对象(保证不被shutDown方法中断),接着做条件判断,若ctl状态超过STOP态,或者当前线程已经因为线程池内部状态变化而被中断(如何判断中断是因为线程池内部状态变化而中断的?重复检查ctl状态即可,若线程的中断信号是在外部逻辑代码中设置的(不使用线程池的shutDownNow方法,直接使用的thread.interrupt()方法),则ctl状态不会为STOP,这样就可判定为不是因为线程池内部状态变化而引起的中断),则设置该工作线程为中断状态(wt.interrupt())。否则执行beforeExecute(wt,task)钩子方法,用户可以重写该方法而达到一些自定义需求(例如统计)。接着执行获得task的run方法,至此该worker成功获得并执行了一个任务。执行期间抛出的异常都有处理,不再多说。最终将置task为空,增加worker的完成任务数(completedTasks),随后解锁worker。
4.在最外层的finally块中,执行了processWorkerExit(w,completedAbruptly)方法,该方法会处理处理统计信息,将worker的completedTask累加入线程池的completedTaskCount,并从线程池的workers中移除该worker,之后尝试终止线程池(因为这个worker可能是当前线程池中最后一个worker,tryTerminate方法应该在所有可能终止当前线程的地方被调用)。最后根据completedAbruptly的值选择策略,若为true,则直接调用addWorker增加一个新worker用以替代当前移除的worker;若为false则判断当前线程池大小是否小于允许的最小线程池大小(可能是corePoolSize也可能小于),若小于则调用addWorker增加一个新worker,否则什么也不做。
6.ThreadPoolExecutor源码—getTask()方法
1.同样首先说明一个变量,timeOut,这个布尔值指代当前getTask方法是否超时未能获取到task对象。
2.外层for循环首先校验ctl状态,若ctl状态大于SHUTDOWN或者等于SHUTDOWN且任务队列workQueue为空,则返回null。否则进入内层循环,若当前线程数量小于maximumPoolSize且并未超时或者当前线程池不存在超时限制(由timed变量指代),则跳出循环,否则则尝试减小ctl计数,并返回null。若尝试减小ctl计数失败(由于ctl值变化引起,上面addWorker中第2点已经提到过),则先检查ctl状态,若失败是由ctl状态变化引起,则尝试外层循环,若失败是由于ctl计数变化而引起则尝试内层循环。
3.在第二点中,跳出循环之后,将根据timed的值做不同策略。若timed为true,即线程池存在超时限制,则执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),这个方法将只在keepAliveTime时间内等待获取任务,一旦超过则返回null;若timed为false,则执行workQueue.take()方法,该方法将无限时等待任务,直至获取任务或者出现中断异常。在这两个方法之一返回之后,若返回task为null,则设置timeOut为true,尝试外层循环;若task不为null,则返回成功获取的task对象。
4.如果在getTask期间出现中断异常,则设置timeOut为false,并且重试外层循环,即InterruptedException并不会对getTask方法造成任何影响,真正能影响getTask方法的是ctl状态的转变。
7.ThreadPoolExecutor源码—shutdown()方法
1.tryTerminate()方法尝试终止线程池活动,满足终止条件的因素有两个:首先,ctl状态为STOP,或者为SHUTDOWN且任务队列为空(STOP状态之所以一直都不用判断workQueue是因为上面讲到的,shutdownNow()方法调用了drainQueue()方法清空了workQueue所以其必然为空,这里解释一下);其次,ctl计数为0。第二个条件的满足是由一连串连锁反应保证的,shutdownNow()方法置ctl状态为STOP,使得所有worker调用getTask()方法满足rs>=SHUTDOWN条件从而调用decrementWorkerCount()方法,这将最终导致ctl计数为0,同时所有work都将从getTask()方法获得null,最终导致runWorker()方法调用processWorkerExit()方法,将workers数组真正清空。shutdown()方法稍微复杂,它置ctl状态为SHUTDOWN,但是线程池仍将继续运行,直至所有workers将工作队列中的任务全部完成,之后的逻辑和stop状态下的完全一样,不再多说。
2.在保证了上述两个条件之后,tryTerminate()方法获取住所mainLock,置ctl状态为TIDYING,之后执行terminated钩子方法,最后置ctl状态为TERMINATED。