博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析 线程利用(转)...
阅读量:6080 次
发布时间:2019-06-20

本文共 8818 字,大约阅读时间需要 29 分钟。

前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理。而这篇文章,我们将接着前面的介绍,从源码实现上对ThreadPoolExecutor在任务的提交、执行,线程重用和线程数维护等方面做下分析。

0.    ThreadPoolExecutor类的声明属性变量分析

1
public
class
ThreadPoolExecutor
extends
AbstractExecutorService

从这个类声明中我们可以看到java.util.ThreadPoolExecutor是继承于AbstractExecutorService的,而之前的文章我也提到过,AbstractExecutorService已经实现了一些任务提交处理的方法,如submit()方法都是在这个抽象类中实现的。但submit()方法,最后也是会调用ThreadPoolExecutor的execute()方法。

打开SunJDK中的ThreadPoolExecutor类源码,除了上篇文章提到的一些和构造方法中参数对应的属性之外,让我们看看还有什么:

  • mainLock 对整个ThreadPoolExecutor对象的锁
  • workers  存储工作线程对应Worker对象的HashSet
  • termination 线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关
  • largestPoolSize 线程池跑过的最大线程数
  • completedTaskCount 完成任务数
  • ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装

稍微需要说一下最后一个, ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

1.    执行器状态

ExecutorService中已经指定了这个接口对应的类要实现的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了状态的含义,并包含其于ctl属性中。

ThreadPoolExecutor对象有五种状态,如下:

  • RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态
  • SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成
  • STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法
  • TERMINATED terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

2.    Worker内部类

它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。

1
2
3
private
final
class
Worker
extends
AbstractQueuedSynchronizer
implements
Runnable

封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。

1
2
3
final
Thread thread;
Runnable firstTask;
volatile
long
completedTasks;

这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>= 0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。

1
2
3
4
5
Worker(Runnable firstTask) {
    
setState(-
1
);
// inhibit interrupts until runWorker
    
this
.firstTask = firstTask;
    
this
.thread = getThreadFactory().newThread(
this
);
}

3. 提交任务

上篇文章已经提到了,提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。

这段源码逻辑如下,不细说了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public
void
execute(Runnable command) {
    
if
(command ==
null
)
        
throw
new
NullPointerException();
 
    
int
c = ctl.get();
    
if
(workerCountOf(c) < corePoolSize) {
        
if
(addWorker(command,
true
))
            
return
;
        
c = ctl.get();
    
}
    
if
(isRunning(c) && workQueue.offer(command)) {
        
int
recheck = ctl.get();
        
if
(! isRunning(recheck) && remove(command))
            
reject(command);
        
else
if
(workerCountOf(recheck) ==
0
)
            
addWorker(
null
,
false
);
    
}
    
else
if
(!addWorker(command,
false
))
        
reject(command);
}

4. addWorker()的实现

在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private
boolean
addWorker(Runnable firstTask,
boolean
core) {
    
retry:
    
for
(;;) {
        
int
c = ctl.get();
        
int
rs = runStateOf(c);
 
        
// Check if queue empty only if necessary.
        
if
(rs >= SHUTDOWN &&
            
! (rs == SHUTDOWN &&
               
firstTask ==
null
&&
               
! workQueue.isEmpty()))
            
return
false
;
 
        
for
(;;) {
            
int
wc = workerCountOf(c);
            
if
(wc >= CAPACITY ||
                
wc >= (core ? corePoolSize : maximumPoolSize))
                
return
false
;
            
if
(compareAndIncrementWorkerCount(c))
                
break
retry;
            
c = ctl.get(); 
// Re-read ctl
            
if
(runStateOf(c) != rs)
                
continue
retry;
            
// else CAS failed due to workerCount change; retry inner loop
        
}
    
}
 
    
boolean
workerStarted =
false
;
    
boolean
workerAdded =
false
;
    
Worker w =
null
;
    
try
{
        
final
ReentrantLock mainLock =
this
.mainLock;
        
w =
new
Worker(firstTask);
        
final
Thread t = w.thread;
        
if
(t !=
null
) {
            
mainLock.lock();
            
try
{
                
// Recheck while holding lock.
                
// Back out on ThreadFactory failure or if
                
// shut down before lock acquired.
                
int
c = ctl.get();
                
int
rs = runStateOf(c);
 
                
if
(rs < SHUTDOWN ||
                    
(rs == SHUTDOWN && firstTask ==
null
)) {
                    
if
(t.isAlive())
// precheck that t is startable
                        
throw
new
IllegalThreadStateException();
                    
workers.add(w);
                    
int
s = workers.size();
                    
if
(s > largestPoolSize)
                        
largestPoolSize = s;
                    
workerAdded =
true
;
                
}
            
}
finally
{
                
mainLock.unlock();
            
}
            
if
(workerAdded) {
                
t.start();
                
workerStarted =
true
;
            
}
        
}
    
}
finally
{
        
if
(! workerStarted)
            
addWorkerFailed(w);
    
}
    
return
workerStarted;
}

代码较长,我们可以分两大部分看:

第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。

第二段从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。

5. 任务的执行runWorker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final
void
runWorker(Worker w) {
        
Thread wt = Thread.currentThread();
        
Runnable task = w.firstTask;
        
w.firstTask =
null
;
        
w.unlock();
// allow interrupts
        
boolean
completedAbruptly =
true
;
        
try
{
            
while
(task !=
null
|| (task = getTask()) !=
null
) {
                
w.lock();
                
// If pool is stopping, ensure thread is interrupted;
                
// if not, ensure thread is not interrupted.  This
                
// requires a recheck in second case to deal with
                
// shutdownNow race while clearing interrupt
                
if
((runStateAtLeast(ctl.get(), STOP) ||
                     
(Thread.interrupted() &&
                      
runStateAtLeast(ctl.get(), STOP))) &&
                    
!wt.isInterrupted())
                    
wt.interrupt();
                
try
{
                    
beforeExecute(wt, task);
                    
Throwable thrown =
null
;
                    
try
{
                        
task.run();
                    
}
catch
(RuntimeException x) {
                        
thrown = x;
throw
x;
                    
}
catch
(Error x) {
                        
thrown = x;
throw
x;
                    
}
catch
(Throwable x) {
                        
thrown = x;
throw
new
Error(x);
                    
}
finally
{
                        
afterExecute(task, thrown);
                    
}
                
}
finally
{
                    
task =
null
;
                    
w.completedTasks++;
                    
w.unlock();
                
}
            
}
            
completedAbruptly =
false
;
        
}
finally
{
            
processWorkerExit(w, completedAbruptly);
        
}
    
}

这段代码实际上就是执行提交给线程池执行的Runnable任务的实际内容。其中,值得注意的有以下几点:

  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理

5. Worker线程的复用和任务的获取getTask()

在上一段代码中,也就是runWorker()方法,任务的执行过程是嵌套在while循环语句块中的。每当一个任务执行完毕,会从头开始做下一次循环执行,实现了空闲线程的复用。而要执行的任务则是来自于getTask()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private
Runnable getTask() {
        
boolean
timedOut =
false
;
// Did the last poll() time out?
 
        
retry:
        
for
(;;) {
            
int
c = ctl.get();
            
int
rs = runStateOf(c);
 
            
// Check if queue empty only if necessary.
            
if
(rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                
decrementWorkerCount();
                
return
null
;
            
}
 
            
boolean
timed;     
// Are workers subject to culling?
 
            
for
(;;) {
                
int
wc = workerCountOf(c);
                
timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
                
if
(wc <= maximumPoolSize && ! (timedOut && timed))
                     
break
;
                
if
(compareAndDecrementWorkerCount(c))
                     
return
null
;
                
c = ctl.get();
                
// Re-read ctl
                
if
(runStateOf(c) != rs)
                     
continue
retry;
                
// else CAS failed due to workerCount change; retry inner loop
             
}
             
try
{
                 
Runnable r = timed ?
                     
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                     
workQueue.take();
                 
if
(r !=
null
)
                     
return
r;
                 
timedOut =
true
;
             
}
catch
(InterruptedException retry) {
                 
timedOut =
false
;
             
}
         
}
     
}

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

回头看看是否计时是如何确定的。

1
2
int
wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。

另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。

6. 线程池线程数的维护和线程的退出处理

刚刚也提到了,我们再看下processWorkerExit()方法。这个方法最主要就是从workers的Set中remove掉一个多余的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private
void
processWorkerExit(Worker w,
boolean
completedAbruptly) {
         
if
(completedAbruptly)
// If abrupt, then workerCount wasn't adjusted
             
decrementWorkerCount();
         
final
ReentrantLock mainLock =
this
.mainLock;
         
mainLock.lock();
         
try
{
             
completedTaskCount += w.completedTasks;
             
workers.remove(w);
         
}
finally
{
             
mainLock.unlock();
         
}
         
tryTerminate();
         
int
c = ctl.get();
         
if
(runStateLessThan(c, STOP)) {
             
if
(!completedAbruptly) {
                 
int
min = allowCoreThreadTimeOut ?
0
: corePoolSize;
                 
if
(min ==
0
&& ! workQueue.isEmpty())
                    
min =
1
;
                 
if
(workerCountOf(c) >= min)
                    
return
;
// replacement not needed
            
}
            
addWorker(
null
,
false
);
        
}
    
}

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 。

之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。

执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。

最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。

以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

 

http://www.molotang.com/articles/522.html

 

转载于:https://www.cnblogs.com/softidea/p/5355775.html

你可能感兴趣的文章
iOS开发--使用RSA加密
查看>>
Linux模式设计系列( 内核与应用关联思考)
查看>>
【C#】1.3 WPF应用程序学习要点
查看>>
java 短信验证码===随机数
查看>>
Windows Server 2008 计划任务配置(任务计划程序)每分钟执行BAT
查看>>
【VNC】Linux环境VNC服务安装、配置与使用
查看>>
动态创建地图文档MXD并发布地图服务
查看>>
Repodata is over 2 weeks old. Install yum-cron? Or run: yum makecache fast
查看>>
单例模式讲解
查看>>
Linux权限管理(笔记)
查看>>
(笔记)电路设计(二)之串联匹配电阻的应用
查看>>
Linux下的grep搜索命令详解(二)
查看>>
C 基本语法
查看>>
Codeforces Round #258 (Div. 2) A. Game With Sticks 水题
查看>>
Server 2008 R2 事件查看器实现日志分析
查看>>
解决Cookie乱码问题
查看>>
javascript中的链表结构—从链表中删除元素
查看>>
用实例揭示notify()和notifyAll()的本质区别
查看>>
Android MediaPlayer接口及状态迁移
查看>>
JQuery------prevAll(),nextAll(),attr()方法的使用
查看>>