Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • JAVA基础

  • 集合容器

  • Netty

  • JVM

  • JUC

    • 并发机制初识
    • JMM语义与重排
    • 多线程通信与编程应用
    • Lock并发锁原理
    • 并发容器与框架
    • 原子操作类
    • 并发工具类
    • 线程池
      • 实现原理
      • 线程池API
        • 1.线程池构造方法
        • 2.提交任务
        • 3.关闭线程池
        • 4.线程池监控
      • 线程池配置
      • 案例——任务执行时间监控
    • Executor框架
    • 并发编程实践
    • JUC面试
  • Java
  • JUC
phan
2023-10-19
目录

线程池

# 线程池

# 实现原理

线程池主要分为线程和任务两个概念。ThreadPoolExecutor执行execute()方法按照以下流程:

  1. 若当前运行线程数如果少于核心线程数corePoolSize,那么创建新的线程执行任务(获取全局锁)。
  2. 若运行线程数量大于等于核心线程数corePoolSize,那么将任务加入阻塞队列BlockingQueue。
  3. 若当前阻塞队列已满,则创建新的线程执行任务(获取全局锁)。此时corePoolSize<线程数量<maximumSize
  4. 若创建后当前线程数量已经超出maximumSize,执行任务拒绝策略。

线程池的设计思路是尽量保证所有execute方法在预热后,都处在步骤2阶段执行,不需要获取全局锁。

# 线程池API

# 1.线程池构造方法

  • corePoolSize:核心线程数,线程池创建好以后准备就绪的线程数量。
    • maximumPoolSize:允许最大线程数量。若任务队列使用了无界的阻塞队列,那么该参数设置无效。
    • keepAliveTime:工作线程空闲后,保持存活的时间。超时则会释放该空闲线程(核心线程不释放)
    • workQueue:阻塞队列。如优先级队列等。
    • threadFactory:线程的创建工厂
    • unit:时间单位
    • RejectExecutionHandler:拒绝策略。默认AbortPolicy直接抛出异常。

# 2.提交任务

  • execute():提交不需要返回值的任务。无法判断任务是否被线程池执行成功。
  • submit():提交需要返回值的任务,线程池会返回一个future类型对象,通过get()阻塞当前线程,直到获取任务结果。也可以通过get(timeout)超时返回。

# 3.关闭线程池

线程池通过调用shutdown()方法,关闭线程池。会中断所有没有正在执行任务的线程,可以确保任务完成。

它会遍历所有工作线程,然后逐个调用interrupt方法中断线程。因此对于无法响应中断的方法,则永远无法终止。

# 4.线程池监控

通过对线程池进行监控,方便出现问题时进行定位:

  • largestPoolSize:曾经创建过的最大线程数量。可以知道线程池是否满过。
  • completedTaskCount:线程池已完成任务数量。
  • taskCount:线程池需要执行的任务数量。

另外,可以扩展自定义线程池,重写beforeExecute、afterExecute和terminated方法,在任务执行前,执行后,关闭线程池分别执行代码进行时间监控。

# 线程池配置

根据任务类型,配置不同的线程数量,假定当前CPU个数N:

  • CPU密集型任务:线程一直在运算,因此不需要配置过多线程,可以配置N+1个。
  • IO密集/数据库连接型任务:线程在执行任务时需要等待数据,一个任务可能会占用线程较长时间。因此需要配置尽可能多的线程,2*N个。
  • 混合型任务:拆分成一个CPU密集型任务和一个IO密集型任务。若两个任务执行时间相差不大,则拆分后并行度大大提高。

任务阻塞队列:

  • 优先级队列:适用于不同优先级、不同执行时间任务的情况。
  • 有界队列:使用有界队列可以保护系统内存。假设多个任务都是SQL慢查询,那么工作线程容易被阻塞,任务挤压在队列中,最终抛出异常。而如果采用无界队列,那么任务队列会越来越多,最终导致系统不可用。

# 案例——任务执行时间监控

Map的key为任务,value为该任务开始时间。

每次工作线程执行任务前后分别会执行beforeExecute、afterExecute。

public class MyThreadPool {
    static class BlagePool extends ThreadPoolExecutor {
        public Map<Runnable, Long> map = new HashMap<>();
        public static double MAX_JOB_RUNNING_TIME=0;

        public BlagePool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        protected void beforeExecute(Thread t, Runnable r) {
            map.put(r, System.currentTimeMillis());
        }
        protected void afterExecute(Runnable r, Throwable t) {
            //存在beforeExecute后执行的情况
//            if(!map.containsKey(r)) return ;
            Long start = map.get(r);
            long time = System.currentTimeMillis() - start;
            MAX_JOB_RUNNING_TIME = Math.max(MAX_JOB_RUNNING_TIME, (double) time / 1000);
            System.out.println(MAX_JOB_RUNNING_TIME);
        }
    }

    public static void main(String[] args) {
        BlagePool pool = new BlagePool(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 30; i++) {
            pool.execute(new Job(i));
        }
        pool.shutdown();
    }
    
    static class Job implements Runnable {
        private int jobNo;
        public Job(int jobNo) {
            this.jobNo = jobNo;
        }
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(
                                "  任务"+String.valueOf(this.jobNo) +
                                " 正在被" + Thread.currentThread().getName() +
                                "调度");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
编辑 (opens new window)
#JUC
上次更新: 2023/12/15, 15:49:57
并发工具类
Executor框架

← 并发工具类 Executor框架→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式