线程池
# 线程池
# 实现原理
线程池主要分为线程和任务两个概念。ThreadPoolExecutor执行execute()方法按照以下流程:
- 若当前运行线程数如果少于核心线程数corePoolSize,那么创建新的线程执行任务(获取全局锁)。
- 若运行线程数量大于等于核心线程数corePoolSize,那么将任务加入阻塞队列BlockingQueue。
- 若当前阻塞队列已满,则创建新的线程执行任务(获取全局锁)。此时corePoolSize<线程数量<maximumSize
- 若创建后当前线程数量已经超出maximumSize,执行任务拒绝策略。
线程池的设计思路是尽量保证所有execute方法在预热后,都处在步骤2阶段执行,不需要获取全局锁。
# 线程池API
# 1.线程池构造方法
- corePoolSize:核心线程数,线程池创建好以后准备就绪的线程数量。
- maximumPoolSize:允许最大线程数量。若任务队列使用了无界的阻塞队列,那么该参数设置无效。
- keepAliveTime:工作线程空闲后,保持存活的时间。超时则会释放该空闲线程(核心线程不释放)
- workQueue:阻塞队列。如优先级队列等。
- threadFactory:线程的创建工厂
- unit:时间单位
- RejectExecutionHandler:拒绝策略。默认AbortPolicy直接抛出异常。
# 2.提交任务
- execute():提交不需要返回值的任务。无法判断任务是否被线程池执行成功。
- submit():提交需要返回值的任务,线程池会返回一个future类型对象,通过get()阻塞当前线程,直到获取任务结果。也可以通过get(timeout)超时返回。
# 3.关闭线程池
线程池通过调用shutdown()方法,关闭线程池。会中断所有没有正在执行任务的线程,可以确保任务完成。
它会遍历所有工作线程,然后逐个调用interrupt方法中断线程。因此对于无法响应中断的方法,则永远无法终止。
# 4.线程池监控
通过对线程池进行监控,方便出现问题时进行定位:
- largestPoolSize:曾经创建过的最大线程数量。可以知道线程池是否满过。
- completedTaskCount:线程池已完成任务数量。
- taskCount:线程池需要执行的任务数量。
另外,可以扩展自定义线程池,重写beforeExecute、afterExecute和terminated方法,在任务执行前,执行后,关闭线程池分别执行代码进行时间监控。
# 线程池配置
根据任务类型,配置不同的线程数量,假定当前CPU个数N:
- CPU密集型任务:线程一直在运算,因此不需要配置过多线程,可以配置N+1个。
- IO密集/数据库连接型任务:线程在执行任务时需要等待数据,一个任务可能会占用线程较长时间。因此需要配置尽可能多的线程,2*N个。
- 混合型任务:拆分成一个CPU密集型任务和一个IO密集型任务。若两个任务执行时间相差不大,则拆分后并行度大大提高。
任务阻塞队列:
- 优先级队列:适用于不同优先级、不同执行时间任务的情况。
- 有界队列:使用有界队列可以保护系统内存。假设多个任务都是SQL慢查询,那么工作线程容易被阻塞,任务挤压在队列中,最终抛出异常。而如果采用无界队列,那么任务队列会越来越多,最终导致系统不可用。
# 案例——任务执行时间监控
Map的key为任务,value为该任务开始时间。
每次工作线程执行任务前后分别会执行beforeExecute、afterExecute。
public class MyThreadPool {
static class BlagePool extends ThreadPoolExecutor {
public Map<Runnable, Long> map = new HashMap<>();
public static double MAX_JOB_RUNNING_TIME=0;
public BlagePool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
protected void beforeExecute(Thread t, Runnable r) {
map.put(r, System.currentTimeMillis());
}
protected void afterExecute(Runnable r, Throwable t) {
//存在beforeExecute后执行的情况
// if(!map.containsKey(r)) return ;
Long start = map.get(r);
long time = System.currentTimeMillis() - start;
MAX_JOB_RUNNING_TIME = Math.max(MAX_JOB_RUNNING_TIME, (double) time / 1000);
System.out.println(MAX_JOB_RUNNING_TIME);
}
}
public static void main(String[] args) {
BlagePool pool = new BlagePool(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 30; i++) {
pool.execute(new Job(i));
}
pool.shutdown();
}
static class Job implements Runnable {
private int jobNo;
public Job(int jobNo) {
this.jobNo = jobNo;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(
" 任务"+String.valueOf(this.jobNo) +
" 正在被" + Thread.currentThread().getName() +
"调度");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
编辑 (opens new window)
上次更新: 2023/12/15, 15:49:57