并发容器与框架
# 并发容器与框架
# 1.ConcurrentHashMap
问题:HashMap不能解决多并发场景问题,HashTable因为整个容器都使用一把synchronized锁,效率低下。
ConcurrentHashMap采用分段锁思想,数据分段存储并用分段锁控制。
一个ConcurrentHashMap包含一个Segment可重入锁数组,一个Segment元素包含一个HashEntry数组。
# get操作
通过散列算法,首先定位到Segment,然后再定位到HashEntry。
Segment内一些共享变量采用volatile修饰,包括当前Segment内元素大小count和value值,整个get操作只需要对它们进行读操作,因此都不需要加锁,性能高效。
transient volatile int count;
volatile V value;
2
# put操作
put操作必须加锁。首先定位到Segment,然后执行插入操作。
插入操作中需要对数组进行扩容,判断Segment内的HashEntry数组是否超过容量,超过扩充为原来容量的两倍。为了性能高效,扩容只会对某个Segment进行,而不会对整个ConcurrentHashMap容器扩容。
# size操作
通过两次累加所有Segment的count的方式计算容器大小。如果“统计时发现count发生了变化”,则需要加锁。
判断统计时容器发生了变化:addCount()方法中通过modCount变量,在put,remove方法操作元素前将modCount加1,统计size前后如果modCount不一致,则说明容器发生变化。
# 2.ConcurrentLinkedQueue
线程安全队列有两种实现方式:
- 阻塞式:队列的出队和入队使用锁来控制
- 非阻塞式:循环CAS
ConcurrentLinkedQueue是一个非阻塞的线程安全队列。主要解决以下问题:
- 入队:多个线程同时入队,导致获取到的tail节点并不是真正的尾节点,有其它线程执行了入队操作更新了尾节点,需要重新获取队列的尾节点。
- 出队:多个线程同时出队,当前head节点CAS设置为空失败,则说明已经被其它线程进行了一次出队操作。
# 3.阻塞队列
阻塞队列用于生产者消费者场景,它有以下特点:
- 队列插入时,队列已满则会被阻塞。
- 队列删除时,队列为空会被阻塞。
对于插入和删除操作,有几种处理方法:①一直阻塞 ②超时阻塞 ③抛出异常 ④返回特殊值
Java提供七个阻塞队列,其中具有以下特性:
- DelayQueue:支持延时从队列获取元素。当延迟期(相当于设置时间期限)够了之后,才能从队列提取元素。
- SynchronousQueue:支持将数据从一个线程传递给另一个线程处理,本身不存储元素。
# 4.Fork/Join框架
并行任务执行框架,分为两个步骤:
- 分割:将大任务分割成子任务,分割的子任务放在多个双端队列中。
- 合并:启动线程分别从双端队列中获取任务执行,每个线程对应一个双端队列。执行完的结果都统一放在一个队列中,最后启动一个线程从队列里拿数据进行合并。
当一个工作线程的队列没有任务时,会随机从其它工作线程队列的尾部获取任务执行。
# 框架使用案例
Fork/Join核心在于实现compute()方法,它的逻辑包括判断任务是否需要分割,然后调用fork()执行最后合并任务结果。提供如下类和方法实现:
ForkJoinTask:定义创建ForkJoin任务。用户自定义的任务类需要继承这个类别,一般会继承两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecusiveTask<返回类型>:用于有返回结果的任务。
ForkJoinPool:调用submit(task)方法执行ForkJoinTask任务。
每次在调用任务的fork()方法时,会重新进入实现的compute()方法。join()方法则会等待子任务执行完,并得到结果。
使用ForkJoin计算1+2+3+4的案例如下:
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD=2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canFork = end - start >= THRESHOLD;
//判断是否需要分割子任务
if (!canFork) {
for (int i = start; i <= end; i++) {
sum += i;
}
}
else {
int mid=(start+end)/2;
CountTask countTask1 = new CountTask(start, mid);
CountTask countTask2 = new CountTask(mid + 1, end);
//执行子任务
countTask1.fork();
countTask2.fork();
//等待任务执行
Integer result1 = countTask1.join();
Integer result2 = countTask2.join();
sum = result1 + result2;
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask countTask = new CountTask(1, 4);
Future<Integer> result = forkJoinPool.submit(countTask);
System.out.println(result.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
另外,可以通过task.isCompletedAbnormally()判断是否抛出异常或者任务取消,getException()方法获取异常。
# fork源码
将当前任务存放在ForkJoinTask数组队列,再调用ForkJoinPool的signalWork方法唤醒或者创建一个工作线程执行任务。fork()会触发调用子任务的compute()方法。
# Join源码
注意这里要区分Task.join()方法和线程的thread.join()方法。虽然都是进入等待状态,但是前者是等待任务,后者是等待线程。
doJoin()方法查看任务状态,如果是已经完成则返回任务结果。