多线程通信与编程应用
# 多线程通信与编程应用
本章节主要讨论如下两点:
- 多线程通信方式
- 多线程编程应用
- 超时等待通知机制
- 线程池
# 线程状态

JAVA线程状态可以分为以下六种状态,其中操作系统的就绪态和运行态在JAVA中都合并为"运行"状态:
- 初始态—>Thread.start()—>运行态—>终止态
- 阻塞态:等待进入synchronized获取锁的状态。——执行同步方法
- 等待态:对象执行wait()方法,需要其它线程通知回到运行状态。
- 超时等待态:处于等待态的线程到了超时时间后,自动回到运行状态。
# Java线程属性
一个Java程序分成多线程,包括main线程,可以充分利用分配到多个处理器核心,提高计算效率和性能,加快响应时间。
# 线程优先级Priority
每个线程可以通过设置优先级来控制分片时间,优先级越高,分配时间片数量越多。设置规则如下:
Thread thread=new Thread(job,"Thread-1");
thread.setPriority(10);
2
- 针对频繁阻塞的线程(IO操作,休眠)需要设置较高优先级。需要CPU及时处理,提高响应时间。
- 偏向于计算的线程则可以设置较低优先级。防止较长时间霸占使用CPU。
Thread.yield():当前线程让出时间片,处于就绪态。下一个时间片让给其它线程,或者再次被当前线程获得。
# Daemon线程
Daemon守护线程用以完成程序中后台调度和支持性工作。当虚拟机全都是Daemon线程时(比如main线程结束),表明当前已经没有线程任务需要执行,虚拟机退出,所有线程需要立即终止。常见的比如垃圾回收线程都是守护线程。
换而言之,如果当前线程需要设置在main线程结束后,也同时结束,那么可以设置为Daemon守护线程。
Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
thread.setDaemon(true);
thread.start();
2
3
# interrupt线程中断
其它线程A通过调用另一个Thread对象B的interrupt()方法,实现对B的中断操作。
注意:如果线程在sleep时,被调用其它线程调用interrupt方法,那么线程会报中断异常。
Thread对象可以通过isInterrupted方法,返回当前当前线程是否被中断。有两种情况中断标记会被复位:
- 当前线程已处于终结状态。即使被中断过,也直接返回false。
- 处在sleep的线程A被其它线程B执行中断方法,则A线程会抛出InterruptedException方法,抛出异常前Java虚拟机会将该线程中断标识位清除,此时返回false。
Thread thread = new Thread(new InterruptRun(), "interruptedThread-1");
thread.start();
thread.interrupt();
System.out.println("线程中断标志"+thread.isInterrupted());
2
3
4
Thread.interrupted():返回当前线程的中断状态,并将中断状态置为false复位。在被中断线程内部调用,感知外部是否对当前线程进行了中断操作。
“中断”本质上就是一个基于标志位的协作机制,被中断线程并不会中断程序执行,程序正常运行。
✨最佳实践:基于interrupt中断机制和volatile终止线程,并安全地关闭资源。
# 线程之间的通信机制
# 1.volatile和synchronized
volatile:保证每个线程读写共享变量都是从共享内存中进行。
synchronized:多线程在执行同步块或者同步方法时,获取锁的过程本质上是获取同步对象的监视器,同一时刻只能有一个线程能够获取到该监视器。
# 2.等待通知机制
两个线程之间使用中间桥梁——Object对象的wait、notify方法实现通信。
| 方法名称 | 描述 |
|---|---|
| Object.wait() | 假设线程A调用了obj对象的wait方法,那么线程A会进入“等待”状态,并会释放该对象的锁。仅当其它线程B通知或是被中断才返回。 |
| Object.notify() | 通知在该对象obj上wait的线程,从wait方法返回。 |
| Object.wait(time) | 超时自动返回 |
wait和notify相当于信号量同步机制,要想实现B线程执行完后再通知A线程,可以如下操作:
- 线程A先获取obj对象的锁,并调用obj.wait()方法,进入等待状态。
- 线程B执行自己的业务操作,然后再调用obj.notify()方法,唤醒A线程
- 线程A被唤醒,从wait方法返回,继续执行后续逻辑。
使用时需要注意,wait和notify方法调用之前需要获取到对象obj的锁,而wait方法调用后会释放对象锁(防止死锁),notify方法调用结束后,释放obj对象锁后才是真正唤醒等待线程。例子如下:
public class WaitAndNotify {
static Object obj = new Object();
public static void main(String[] args) throws Exception{
Thread thread = new Thread(new WaitDemo(), "wait-1");
Thread thread1 = new Thread(new NotifyDemo(), "noty-2");
thread.start();
TimeUnit.SECONDS.sleep(1);
thread1.start();
}
static class WaitDemo implements Runnable {
@Override
public void run() {
try {
synchronized (obj) {
obj.wait();
}
System.out.println("wait线程成功唤醒");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
static class NotifyDemo implements Runnable {
@Override
public void run() {
synchronized (obj) {
obj.notify();
System.out.println("此处还未正式完成通知,唤醒线程");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("释放锁,完成线程唤醒");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
抽象成消费者和生产者模型,obj对象锁往往不能代表具体场景下的限制条件,因此需要结合while进行代码控制。对于等待线程而言,经典三段式如下:
sychronized(obj){
while(条件不满足){ //满足条件则跳出循环
obj.wait();
}
}
2
3
4
5
对于通知线程而言:
synchronized(obj){
修改条件
obj.notify();
}
2
3
4
# 3.管道输入输出流
主要用于线程之间的数据传输。
| class | 描述 |
|---|---|
| PipedOutputStream | 字节输出流 |
| PipedInputStream | 字节输入流 |
| PipedReader | 字符读 |
| PipedWriter | 字符写 |
所谓管道只需要记住一点,写管道会将数据传入流向读管道。
线程之间可以通过管道进行交互,这两个管道需要进行关联。主线程将输入控制台的字符(通过系统标准输入流)读出来,并写入“写管道”:
while ((reveive = System.in.read()) != -1) {
out.write(33);
}
2
3
另一个读线程(负责处理数据),则通过“读管道”拿到数据,进行进一步的处理:
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int reveive = 0;
try {
//从管道中 拿到写进程传入的数据
while ((reveive = in.read()) != -1) {
System.out.print((char) reveive);
}
} catch (Exception e) {
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 4.thread.join
join方法用于控制两个线程之间的同步先后关系。
threadA.join():在线程B中调用线程A的join方法,则线程B进入等待状态,直到线程A终止之后线程B才会从join返回,继续执行剩下代码。
可以发现join的语义类似于等待通知机制,实际上join方法底层就是基于wait和notify实现的,以B线程中调用ThreadA.join()为例:
每个线程终结死亡时,都会调用notifyAll方法释放所有等待自己的线程(thread.cpp源码)。
调用其它线程的join方法时,会先调用isAlive()方法查看该线程对象是否存活,如果存活则wai进行等待
public final synchronized void join() throws InterruptedException { // 被调用线程处于活跃状态 while (isAlive()) { wait(0); } // 被调用线程终结 }1
2
3
4
5
6
7- B线程首先获取到A对象上的锁
- A线程执行自己的wait方法,获取到锁的对象(即B线程)被阻塞
- 当线程A结束之后,会执行notifyAll方法,释放所有等待A的所有线程
- B线程被唤醒,同时A线程不再处于活跃态跳出死循环。B线程继续执行后续的方法
与普通等待通知机制不同之处在于,join方法中并不存在中间obj对象,而是通知线程自身作为锁对象。
下面给出一个基于join方法,控制多线程同步顺序执行,打印输出的例子:
public class JoinDemo {
static int num=0;
public static void main(String[] args) {
Thread pre = new Thread();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(new Join(pre), "Thread" + String.valueOf(i));
thread.start();
pre = thread;
}
}
static class Join implements Runnable {
private Thread thread;
public Join(Thread thread){
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
System.out.println(num++);
} catch (InterruptedException e) {
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 5.ThreadLocal
ThreadLocal线程变量:
- 对于同一个线程来说,key为threadlocal对象,value为对象值。也就是说每个线程在一个ThreadLocal对象上只能独立存储一个值。
- ThreadLocal对象通常是静态变量。
- ThreadLocal变量的线程隔离性:一个ThreadLocal对象可以被多个不同线程访问,会维护不同线程的变量副本,存储对应不同线程对应条目的值。每个线程都可以在同一个ThreadLocal对象存储特定的数据。
static ThreadLocal<Long> threadLocal = new ThreadLocal<Long>();
public static void set(long tmp) {
threadLocal.set(tmp);
}
public static long get() {
return threadLocal.get();
}
2
3
4
5
6
7
8
9
# 6.CountDownLatch
CountDownLatch
用于同时控制多个不同线程的行为,包括同步和阻塞。
countDownLatch.countDown():当前计数减一,若对象计数为0,则所有线程await被阻塞的地方会返回。
countDownLatch.await():当前线程阻塞,直到countDownLatch对象计数减为0时,才将线程唤醒。
# 线程应用实例
# 1.超时控制的等待/通知机制
在原while条件中,添加超时时间判断。synchronized加锁可以加在函数上,或同步块。
public synchronized get(long mills){
long future = System.currentTimeMillis() + mills;
long remaining = mills;
//数据库为空则进行超时等待
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
//有可能出现还未超时但被中途唤醒的情况,且线程池为空的情况
//那么就继续等
remaining = future - System.currentTimeMillis();
}
}
2
3
4
5
6
7
8
9
10
11
# 2.数据库连接池
AtomicInteger
原子整数类型,通过incrementAndGet方法可以原子性的增加数值,并返回结果。
多线程下,只会有一个线程能够成功增加整数的值,其它线程会等待。
场景:创建一个只有十个连接对象的连接池。然后多线程模拟并发竞争获取数据库连接,线程数量10个,每个线程尝试获取连接20次,同个线程获取连接对象是串行执行。要求如下:
- 连接池给线程提供超时获取,若到达超时时间还未能获取连接,则直接返回空.——采用超时等待机制
- 保证多个线程同时开始竞争获取连接对象——CountDownLatch对象实现。
- 每个线程获取到对象后,获取变量累加。因为这个累加过程属于多线程操作同一个共享变量,因此这个获取变量采用AtomInteger对象,并调用incrementAndGet方法进行累加。
连接池实现如下,设计时可以由简到复杂进行设计和思考:
连接数有限,仅靠while判断连接池数量存在并发问题,会出现多个线程同时获取一个连接的情况。
方案:sychronized+while
如果当前连接池的连接对象都被获取了,那么下一个线程到来时,它的行为可以分为两种:
- 一直忙等直到能够获取连接。实现方案是引入wait和notifyAll等待通知机制,多线程条件下,执行wait后会立刻释放锁,让其它更多的线程能够进入同步块。notifyAll后,只有一个线程能够抢到锁被唤醒。
- 线程设置超时等待状态,到达超时时间后如果连接池数量还是为空,则退出释放锁。实现方案调用wait(time)方法,此时多个超时等待线程会以两种形式被唤醒,不管是哪种,在同步块中只能有一个线程在同步块中执行代码,其余的要么变为阻塞态,要么继续处于超时等待状态:
- notifyAll唤醒:被唤醒时连接池不一定有空余连接对象,因此线程需要继续执行"等待",满足超时的语义。因此while需要添加一个等待时间判断。
- 超时:多个线程都到达了超时时间,在同步块中,最终只有一个线程能够唤醒继续执行,其余线程变为阻塞态。
public class ConnectionPool {
private LinkedList<Connection> pool = new LinkedList<>();
public ConnectionPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(ConnectionDriver.createConnection());
}
}
}
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool) {
pool.addLast(connection);
//通知所有等待使用连接池的线程
pool.notifyAll();
//pool.notify();
}
}
}
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
if (mills <= 0) {
//超时时间小于等于0,则需要一直等下去
while (pool.isEmpty()) {
pool.wait();
}
return pool.removeFirst();
} else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
//数据库为空则进行超时等待
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
//有可能出现还未超时,但多个线程同时被中途唤醒的情况
//此时仅有一个线程能够走出循环,其余线程需要继续等待下一次...因此虽然跳出超时等待状态,但是并非到达设定的时间
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if (!pool.isEmpty()) {
result = pool.removeFirst();
}
return result;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
✨在多线程环境下,针对昂贵的资源的获取,可以复用上述超时机制。
# 3.线程池
线程池中,一般情况下任务数量(Job)可能会多于线程数量(Worker),因此需要分为Job对象和Worker对象。对线程池的操作主要分为以下几种:
线程池初始化:创建多个worker对象,并启动每个执行线程worker。
添加/删除线程:操作的对象是worker执行线程队列。删除时从队列删除,还需要关闭暂停worker。
关闭线程池:修改执行线程队列中,每个worker线程的执行标志位。
执行任务:将需要执行的任务job对象加入到待执行的任务队列当中。初始化后,此时所有执行线程都处于wait等待状态,因此每加入一个新的任务job,都需要通过等待通知机制notify唤醒其中一个执行线程。
Worker线程负责调用外部Job任务,关键在run方法:
- 循环执行:每个线程一旦start进入运行态,while(running)循环执行每一个job任务,执行完一个再阻塞式获取下个job执行。
- 执行每一个Job任务:三段式阻塞获取job,并调用job.run方法执行任务。
# 添加和移除worker线程,为什么需要获取jobs的并发锁❓
移除线程若不获取任务队列锁,那么很有可能出现这么一种情况:
- 执行线程worker1获取分配到了job1
- 外部线程2删除了worker1线程
因此可能导致job1任务会“消失”,不能被执行。
# notify和notifyAll区别
调用notify时,执行线程池的所有线程可能有两种情况:
- 所有线程可能都在执行job.run()方法,那么此时没有需要被唤醒的线程。此时任务队列不为空。
- 存在空闲的worker线程,那么将其唤醒。
故此处使用notify开销更小,已经足够了,不需要notifyAll将所有的线程从等待队列移动到阻塞队列。
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
//线程池最大数量
private static final int MAX_WORKER_NUMBERS = 10;
//线程池最小数量
private static final int MIN_WORKER_NUMBERS = 1;
//线程池默认数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
//待执行任务队列
private final LinkedList<Job> jobs = new LinkedList<>();
//执行线程队列
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
//工作线程数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
//线程编号生成
private AtomicLong threadNum = new AtomicLong();
public DefaultThreadPool() {
initializeWorker(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int workerNum) {
if(workerNum>MAX_WORKER_NUMBERS) this.workerNum = MAX_WORKER_NUMBERS;
else if(workerNum<MIN_WORKER_NUMBERS) this.workerNum = MIN_WORKER_NUMBERS;
else this.workerNum = workerNum;
initializeWorker(this.workerNum);
}
@Override
public void excute(Job job) {
if (job != null) {
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}
@Override
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
@Override
public void addWorker(int num) {
synchronized (jobs) {
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorker(num);
this.workerNum += num;
}
}
@Override
public void removeWorker(int num) {
synchronized (jobs) {
if (num > MIN_WORKER_NUMBERS) {
throw new IllegalArgumentException("beyond workNum");
}
int count = 0;
while (count < num) {
if (workers.size()<=0) {
break;
}
Worker worker=workers.remove(0);
worker.shutdown();
count++;
}
this.workerNum -= num;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
public void initializeWorker(int workerNum) {
for (int i = 0; i < workerNum; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "worker-" + threadNum.incrementAndGet());
thread.start();
}
}
class Worker implements Runnable {
private volatile boolean running = true;
@Override
public void run() {
//循环取出任务并执行,做完一个job做下一个
while (running) {
Job job = null;
//三段式阻塞获取job
synchronized (jobs) {
while (jobs.isEmpty()) {
try {
jobs.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
job = jobs.removeFirst();
}
//及时释放锁
if (job != null) {
try {
job.run();
} catch (Exception e) {
//忽略任务线程出现的所有异常
}
}
}
}
public void shutdown() {
running = false;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
线程池在web服务器的使用:
public class Server{
static DefaultThreadPool<HttpRequestHandler> threadPool=new DefaultThreadPool<>();
public static void start(){
while(循环接受socket连接){
threadPool.excute(new HttpRequestHandler());
}
}
static class HttpRequestHandler implements Runnable{
public HttpRequestHandler(){
//初始化
}
@Override
public void run(){
//连接处理逻辑
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17