Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • JAVA基础

  • 集合容器

  • Netty

  • JVM

  • JUC

    • 并发机制初识
    • JMM语义与重排
    • 多线程通信与编程应用
      • 线程状态
      • Java线程属性
        • 线程优先级Priority
        • Daemon线程
        • interrupt线程中断
      • 线程之间的通信机制
        • 1.volatile和synchronized
        • 2.等待通知机制
        • 3.管道输入输出流
        • 4.thread.join
        • 5.ThreadLocal
        • 6.CountDownLatch
      • 线程应用实例
        • 1.超时控制的等待/通知机制
        • 2.数据库连接池
        • 3.线程池
        • 添加和移除worker线程,为什么需要获取jobs的并发锁❓
        • notify和notifyAll区别
    • Lock并发锁原理
    • 并发容器与框架
    • 原子操作类
    • 并发工具类
    • 线程池
    • Executor框架
    • 并发编程实践
    • JUC面试
  • Java
  • JUC
phan
2023-10-11
目录

多线程通信与编程应用

# 多线程通信与编程应用

本章节主要讨论如下两点:

  • 多线程通信方式
  • 多线程编程应用
    • 超时等待通知机制
    • 线程池

# 线程状态

JAVA线程状态可以分为以下六种状态,其中操作系统的就绪态和运行态在JAVA中都合并为"运行"状态:

  • 初始态—>Thread.start()—>运行态—>终止态
  • 阻塞态:等待进入synchronized获取锁的状态。——执行同步方法
  • 等待态:对象执行wait()方法,需要其它线程通知回到运行状态。
  • 超时等待态:处于等待态的线程到了超时时间后,自动回到运行状态。

# Java线程属性

一个Java程序分成多线程,包括main线程,可以充分利用分配到多个处理器核心,提高计算效率和性能,加快响应时间。

# 线程优先级Priority

每个线程可以通过设置优先级来控制分片时间,优先级越高,分配时间片数量越多。设置规则如下:

Thread thread=new Thread(job,"Thread-1");
thread.setPriority(10);
1
2
  • 针对频繁阻塞的线程(IO操作,休眠)需要设置较高优先级。需要CPU及时处理,提高响应时间。
  • 偏向于计算的线程则可以设置较低优先级。防止较长时间霸占使用CPU。

Thread.yield():当前线程让出时间片,处于就绪态。下一个时间片让给其它线程,或者再次被当前线程获得。

# Daemon线程

Daemon守护线程用以完成程序中后台调度和支持性工作。当虚拟机全都是Daemon线程时(比如main线程结束),表明当前已经没有线程任务需要执行,虚拟机退出,所有线程需要立即终止。常见的比如垃圾回收线程都是守护线程。

换而言之,如果当前线程需要设置在main线程结束后,也同时结束,那么可以设置为Daemon守护线程。

Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
thread.setDaemon(true);
thread.start();
1
2
3

# interrupt线程中断

其它线程A通过调用另一个Thread对象B的interrupt()方法,实现对B的中断操作。

注意:如果线程在sleep时,被调用其它线程调用interrupt方法,那么线程会报中断异常。

Thread对象可以通过isInterrupted方法,返回当前当前线程是否被中断。有两种情况中断标记会被复位:

  • 当前线程已处于终结状态。即使被中断过,也直接返回false。
  • 处在sleep的线程A被其它线程B执行中断方法,则A线程会抛出InterruptedException方法,抛出异常前Java虚拟机会将该线程中断标识位清除,此时返回false。
Thread thread = new Thread(new InterruptRun(), "interruptedThread-1");
thread.start();
thread.interrupt();
System.out.println("线程中断标志"+thread.isInterrupted());
1
2
3
4

Thread.interrupted():返回当前线程的中断状态,并将中断状态置为false复位。在被中断线程内部调用,感知外部是否对当前线程进行了中断操作。

“中断”本质上就是一个基于标志位的协作机制,被中断线程并不会中断程序执行,程序正常运行。

✨最佳实践:基于interrupt中断机制和volatile终止线程,并安全地关闭资源。

# 线程之间的通信机制

# 1.volatile和synchronized

volatile:保证每个线程读写共享变量都是从共享内存中进行。

synchronized:多线程在执行同步块或者同步方法时,获取锁的过程本质上是获取同步对象的监视器,同一时刻只能有一个线程能够获取到该监视器。

# 2.等待通知机制

两个线程之间使用中间桥梁——Object对象的wait、notify方法实现通信。

方法名称 描述
Object.wait() 假设线程A调用了obj对象的wait方法,那么线程A会进入“等待”状态,并会释放该对象的锁。仅当其它线程B通知或是被中断才返回。
Object.notify() 通知在该对象obj上wait的线程,从wait方法返回。
Object.wait(time) 超时自动返回

wait和notify相当于信号量同步机制,要想实现B线程执行完后再通知A线程,可以如下操作:

  • 线程A先获取obj对象的锁,并调用obj.wait()方法,进入等待状态。
  • 线程B执行自己的业务操作,然后再调用obj.notify()方法,唤醒A线程
  • 线程A被唤醒,从wait方法返回,继续执行后续逻辑。

使用时需要注意,wait和notify方法调用之前需要获取到对象obj的锁,而wait方法调用后会释放对象锁(防止死锁),notify方法调用结束后,释放obj对象锁后才是真正唤醒等待线程。例子如下:

public class WaitAndNotify {
    static Object obj = new Object();
    public static void main(String[] args) throws Exception{
        Thread thread = new Thread(new WaitDemo(), "wait-1");
        Thread thread1 = new Thread(new NotifyDemo(), "noty-2");
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread1.start();
    }

    static class WaitDemo implements Runnable {
        @Override
        public void run() {
            try {
                synchronized (obj) {
                    obj.wait();
                }
                System.out.println("wait线程成功唤醒");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
    static class NotifyDemo implements Runnable {
        @Override
        public void run() {
            synchronized (obj) {
                obj.notify();
                System.out.println("此处还未正式完成通知,唤醒线程");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("释放锁,完成线程唤醒");
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

抽象成消费者和生产者模型,obj对象锁往往不能代表具体场景下的限制条件,因此需要结合while进行代码控制。对于等待线程而言,经典三段式如下:

sychronized(obj){
	while(条件不满足){ //满足条件则跳出循环
		obj.wait();
	}
}
1
2
3
4
5

对于通知线程而言:

synchronized(obj){
	修改条件
	obj.notify();
}
1
2
3
4

# 3.管道输入输出流

主要用于线程之间的数据传输。

class 描述
PipedOutputStream 字节输出流
PipedInputStream 字节输入流
PipedReader 字符读
PipedWriter 字符写

所谓管道只需要记住一点,写管道会将数据传入流向读管道。

线程之间可以通过管道进行交互,这两个管道需要进行关联。主线程将输入控制台的字符(通过系统标准输入流)读出来,并写入“写管道”:

while ((reveive = System.in.read()) != -1) {
    out.write(33);
}
1
2
3

另一个读线程(负责处理数据),则通过“读管道”拿到数据,进行进一步的处理:

static class Print implements Runnable {
    private PipedReader in;
    public Print(PipedReader in) {
        this.in = in;
    }
    @Override
    public void run() {
        int reveive = 0;
        try {
        //从管道中 拿到写进程传入的数据
            while ((reveive = in.read()) != -1) {
                System.out.print((char) reveive);
            }
        } catch (Exception e) {
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 4.thread.join

join方法用于控制两个线程之间的同步先后关系。

threadA.join():在线程B中调用线程A的join方法,则线程B进入等待状态,直到线程A终止之后线程B才会从join返回,继续执行剩下代码。

可以发现join的语义类似于等待通知机制,实际上join方法底层就是基于wait和notify实现的,以B线程中调用ThreadA.join()为例:

  • 每个线程终结死亡时,都会调用notifyAll方法释放所有等待自己的线程(thread.cpp源码)。

  • 调用其它线程的join方法时,会先调用isAlive()方法查看该线程对象是否存活,如果存活则wai进行等待

    public final synchronized void join() throws InterruptedException {
     // 被调用线程处于活跃状态
         while (isAlive()) {
         	wait(0);
         }
     // 被调用线程终结
    }
    
    1
    2
    3
    4
    5
    6
    7
    • B线程首先获取到A对象上的锁
    • A线程执行自己的wait方法,获取到锁的对象(即B线程)被阻塞
    • 当线程A结束之后,会执行notifyAll方法,释放所有等待A的所有线程
    • B线程被唤醒,同时A线程不再处于活跃态跳出死循环。B线程继续执行后续的方法
  • 与普通等待通知机制不同之处在于,join方法中并不存在中间obj对象,而是通知线程自身作为锁对象。

下面给出一个基于join方法,控制多线程同步顺序执行,打印输出的例子:

public class JoinDemo {
    static int num=0;
    public static void main(String[] args) {
        Thread pre = new Thread();
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Join(pre), "Thread" + String.valueOf(i));
            thread.start();
            pre = thread;
        }
    }
    static class Join implements Runnable {
        private Thread thread;
        public Join(Thread thread){
            this.thread = thread;
        }
        @Override
        public void run() {
            try {
                thread.join();
                System.out.println(num++);
            } catch (InterruptedException e) {
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 5.ThreadLocal

ThreadLocal线程变量:

  • 对于同一个线程来说,key为threadlocal对象,value为对象值。也就是说每个线程在一个ThreadLocal对象上只能独立存储一个值。
  • ThreadLocal对象通常是静态变量。
  • ThreadLocal变量的线程隔离性:一个ThreadLocal对象可以被多个不同线程访问,会维护不同线程的变量副本,存储对应不同线程对应条目的值。每个线程都可以在同一个ThreadLocal对象存储特定的数据。
static ThreadLocal<Long> threadLocal = new ThreadLocal<Long>();

public static void set(long tmp) {
    threadLocal.set(tmp);
}

public static long get() {
    return threadLocal.get();
}
1
2
3
4
5
6
7
8
9

# 6.CountDownLatch

CountDownLatch

用于同时控制多个不同线程的行为,包括同步和阻塞。

countDownLatch.countDown():当前计数减一,若对象计数为0,则所有线程await被阻塞的地方会返回。

countDownLatch.await():当前线程阻塞,直到countDownLatch对象计数减为0时,才将线程唤醒。

# 线程应用实例

# 1.超时控制的等待/通知机制

在原while条件中,添加超时时间判断。synchronized加锁可以加在函数上,或同步块。

public synchronized get(long mills){
    long future = System.currentTimeMillis() + mills;
    long remaining = mills;
    //数据库为空则进行超时等待
    while (pool.isEmpty() && remaining > 0) {
        pool.wait(remaining);
        //有可能出现还未超时但被中途唤醒的情况,且线程池为空的情况
        //那么就继续等
        remaining = future - System.currentTimeMillis();
    }	
}
1
2
3
4
5
6
7
8
9
10
11

# 2.数据库连接池

AtomicInteger

原子整数类型,通过incrementAndGet方法可以原子性的增加数值,并返回结果。

多线程下,只会有一个线程能够成功增加整数的值,其它线程会等待。

场景:创建一个只有十个连接对象的连接池。然后多线程模拟并发竞争获取数据库连接,线程数量10个,每个线程尝试获取连接20次,同个线程获取连接对象是串行执行。要求如下:

  • 连接池给线程提供超时获取,若到达超时时间还未能获取连接,则直接返回空.——采用超时等待机制
  • 保证多个线程同时开始竞争获取连接对象——CountDownLatch对象实现。
  • 每个线程获取到对象后,获取变量累加。因为这个累加过程属于多线程操作同一个共享变量,因此这个获取变量采用AtomInteger对象,并调用incrementAndGet方法进行累加。

连接池实现如下,设计时可以由简到复杂进行设计和思考:

  • 连接数有限,仅靠while判断连接池数量存在并发问题,会出现多个线程同时获取一个连接的情况。

    方案:sychronized+while

  • 如果当前连接池的连接对象都被获取了,那么下一个线程到来时,它的行为可以分为两种:

    • 一直忙等直到能够获取连接。实现方案是引入wait和notifyAll等待通知机制,多线程条件下,执行wait后会立刻释放锁,让其它更多的线程能够进入同步块。notifyAll后,只有一个线程能够抢到锁被唤醒。
    • 线程设置超时等待状态,到达超时时间后如果连接池数量还是为空,则退出释放锁。实现方案调用wait(time)方法,此时多个超时等待线程会以两种形式被唤醒,不管是哪种,在同步块中只能有一个线程在同步块中执行代码,其余的要么变为阻塞态,要么继续处于超时等待状态:
      • notifyAll唤醒:被唤醒时连接池不一定有空余连接对象,因此线程需要继续执行"等待",满足超时的语义。因此while需要添加一个等待时间判断。
      • 超时:多个线程都到达了超时时间,在同步块中,最终只有一个线程能够唤醒继续执行,其余线程变为阻塞态。
public class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());
            }
        }
    }
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                pool.addLast(connection);
                //通知所有等待使用连接池的线程
                pool.notifyAll();
                //pool.notify();
            }
        }
    }
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            if (mills <= 0) {
                //超时时间小于等于0,则需要一直等下去
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                //数据库为空则进行超时等待
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    //有可能出现还未超时,但多个线程同时被中途唤醒的情况
                    //此时仅有一个线程能够走出循环,其余线程需要继续等待下一次...因此虽然跳出超时等待状态,但是并非到达设定的时间
                    remaining = future - System.currentTimeMillis();
                }
                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.removeFirst();
                }
                return result;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

✨在多线程环境下,针对昂贵的资源的获取,可以复用上述超时机制。

# 3.线程池

线程池中,一般情况下任务数量(Job)可能会多于线程数量(Worker),因此需要分为Job对象和Worker对象。对线程池的操作主要分为以下几种:

  • 线程池初始化:创建多个worker对象,并启动每个执行线程worker。

  • 添加/删除线程:操作的对象是worker执行线程队列。删除时从队列删除,还需要关闭暂停worker。

  • 关闭线程池:修改执行线程队列中,每个worker线程的执行标志位。

  • 执行任务:将需要执行的任务job对象加入到待执行的任务队列当中。初始化后,此时所有执行线程都处于wait等待状态,因此每加入一个新的任务job,都需要通过等待通知机制notify唤醒其中一个执行线程。

Worker线程负责调用外部Job任务,关键在run方法:

  • 循环执行:每个线程一旦start进入运行态,while(running)循环执行每一个job任务,执行完一个再阻塞式获取下个job执行。
  • 执行每一个Job任务:三段式阻塞获取job,并调用job.run方法执行任务。

# 添加和移除worker线程,为什么需要获取jobs的并发锁❓

移除线程若不获取任务队列锁,那么很有可能出现这么一种情况:

  • 执行线程worker1获取分配到了job1
  • 外部线程2删除了worker1线程

因此可能导致job1任务会“消失”,不能被执行。

# notify和notifyAll区别

调用notify时,执行线程池的所有线程可能有两种情况:

  • 所有线程可能都在执行job.run()方法,那么此时没有需要被唤醒的线程。此时任务队列不为空。
  • 存在空闲的worker线程,那么将其唤醒。

故此处使用notify开销更小,已经足够了,不需要notifyAll将所有的线程从等待队列移动到阻塞队列。

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    //线程池最大数量
    private static final int MAX_WORKER_NUMBERS = 10;
    //线程池最小数量
    private static final int MIN_WORKER_NUMBERS = 1;
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    //待执行任务队列
    private final LinkedList<Job> jobs = new LinkedList<>();
    //执行线程队列
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    //工作线程数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initializeWorker(DEFAULT_WORKER_NUMBERS);
    }

    public DefaultThreadPool(int workerNum) {
        if(workerNum>MAX_WORKER_NUMBERS) this.workerNum = MAX_WORKER_NUMBERS;
        else if(workerNum<MIN_WORKER_NUMBERS) this.workerNum = MIN_WORKER_NUMBERS;
        else this.workerNum = workerNum;
        initializeWorker(this.workerNum);
    }
    @Override
    public void excute(Job job) {
        if (job != null) {
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }
    @Override
    public void shutdown() {
        for (Worker worker : workers) {
            worker.shutdown();
        }
    }
    @Override
    public void addWorker(int num) {
        synchronized (jobs) {
            if (num + this.workerNum > MAX_WORKER_NUMBERS) {
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initializeWorker(num);
            this.workerNum += num;
        }
    }
    @Override
    public void removeWorker(int num) {
        synchronized (jobs) {
            if (num > MIN_WORKER_NUMBERS) {
                throw new IllegalArgumentException("beyond workNum");
            }
            int count = 0;
            while (count < num) {
                if (workers.size()<=0) {
                    break;
                }
                Worker worker=workers.remove(0);
                worker.shutdown();
                count++;
            }
            this.workerNum -= num;
        }
    }
    @Override
    public int getJobSize() {
        return jobs.size();
    }

    public void initializeWorker(int workerNum) {
        for (int i = 0; i < workerNum; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    class Worker implements Runnable {
        private volatile boolean running = true;
        @Override
        public void run() {
            //循环取出任务并执行,做完一个job做下一个
            while (running) {
                Job job = null;
                //三段式阻塞获取job
                synchronized (jobs) {
                    while (jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    job = jobs.removeFirst();
                }
                //及时释放锁
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        //忽略任务线程出现的所有异常
                    }
                }
            }
        }
        
        public void shutdown() {
            running = false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

线程池在web服务器的使用:

public class Server{
	static DefaultThreadPool<HttpRequestHandler> threadPool=new DefaultThreadPool<>();
	public static void start(){
		while(循环接受socket连接){
			threadPool.excute(new HttpRequestHandler());
		}
	}
	static class HttpRequestHandler implements Runnable{
		public HttpRequestHandler(){
			//初始化
		}
		@Override
		public void run(){
			//连接处理逻辑
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
编辑 (opens new window)
#JUC
上次更新: 2023/12/15, 15:49:57
JMM语义与重排
Lock并发锁原理

← JMM语义与重排 Lock并发锁原理→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式