Blage's Coding Blage's Coding
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
Home
算法
  • 手写Spring
  • SSM
  • SpringBoot
  • JavaWeb
  • JAVA基础
  • 容器
  • Netty

    • IO模型
    • Netty初级
    • Netty原理
  • JVM
  • JUC
  • Redis基础
  • 源码分析
  • 实战应用
  • 单机缓存
  • MySQL

    • 基础部分
    • 实战与处理方案
    • 面试
  • ORM框架

    • Mybatis
    • Mybatis_Plus
  • SpringCloudAlibaba
  • MQ消息队列
  • Nginx
  • Elasticsearch
  • Gateway
  • Xxl-job
  • Feign
  • Eureka
  • 面试
  • 工具
  • 项目
  • 关于
🌏本站
🧸GitHub (opens new window)
  • JAVA基础

  • 集合容器

  • Netty

  • JVM

  • JUC

    • 并发机制初识
    • JMM语义与重排
    • 多线程通信与编程应用
    • Lock并发锁原理
    • 并发容器与框架
      • 1.ConcurrentHashMap
        • get操作
        • put操作
        • size操作
      • 2.ConcurrentLinkedQueue
      • 3.阻塞队列
      • 4.Fork/Join框架
        • 框架使用案例
        • fork源码
        • Join源码
    • 原子操作类
    • 并发工具类
    • 线程池
    • Executor框架
    • 并发编程实践
    • JUC面试
  • Java
  • JUC
phan
2023-10-17
目录

并发容器与框架

# 并发容器与框架

# 1.ConcurrentHashMap

问题:HashMap不能解决多并发场景问题,HashTable因为整个容器都使用一把synchronized锁,效率低下。

ConcurrentHashMap采用分段锁思想,数据分段存储并用分段锁控制。

一个ConcurrentHashMap包含一个Segment可重入锁数组,一个Segment元素包含一个HashEntry数组。

# get操作

通过散列算法,首先定位到Segment,然后再定位到HashEntry。

Segment内一些共享变量采用volatile修饰,包括当前Segment内元素大小count和value值,整个get操作只需要对它们进行读操作,因此都不需要加锁,性能高效。

transient volatile int count;
volatile V value;
1
2

# put操作

put操作必须加锁。首先定位到Segment,然后执行插入操作。

插入操作中需要对数组进行扩容,判断Segment内的HashEntry数组是否超过容量,超过扩充为原来容量的两倍。为了性能高效,扩容只会对某个Segment进行,而不会对整个ConcurrentHashMap容器扩容。

# size操作

通过两次累加所有Segment的count的方式计算容器大小。如果“统计时发现count发生了变化”,则需要加锁。

判断统计时容器发生了变化:addCount()方法中通过modCount变量,在put,remove方法操作元素前将modCount加1,统计size前后如果modCount不一致,则说明容器发生变化。

# 2.ConcurrentLinkedQueue

线程安全队列有两种实现方式:

  • 阻塞式:队列的出队和入队使用锁来控制
  • 非阻塞式:循环CAS

ConcurrentLinkedQueue是一个非阻塞的线程安全队列。主要解决以下问题:

  • 入队:多个线程同时入队,导致获取到的tail节点并不是真正的尾节点,有其它线程执行了入队操作更新了尾节点,需要重新获取队列的尾节点。
  • 出队:多个线程同时出队,当前head节点CAS设置为空失败,则说明已经被其它线程进行了一次出队操作。

# 3.阻塞队列

阻塞队列用于生产者消费者场景,它有以下特点:

  • 队列插入时,队列已满则会被阻塞。
  • 队列删除时,队列为空会被阻塞。

对于插入和删除操作,有几种处理方法:①一直阻塞 ②超时阻塞 ③抛出异常 ④返回特殊值

Java提供七个阻塞队列,其中具有以下特性:

  • DelayQueue:支持延时从队列获取元素。当延迟期(相当于设置时间期限)够了之后,才能从队列提取元素。
  • SynchronousQueue:支持将数据从一个线程传递给另一个线程处理,本身不存储元素。

# 4.Fork/Join框架

并行任务执行框架,分为两个步骤:

  • 分割:将大任务分割成子任务,分割的子任务放在多个双端队列中。
  • 合并:启动线程分别从双端队列中获取任务执行,每个线程对应一个双端队列。执行完的结果都统一放在一个队列中,最后启动一个线程从队列里拿数据进行合并。

当一个工作线程的队列没有任务时,会随机从其它工作线程队列的尾部获取任务执行。

# 框架使用案例

Fork/Join核心在于实现compute()方法,它的逻辑包括判断任务是否需要分割,然后调用fork()执行最后合并任务结果。提供如下类和方法实现:

  • ForkJoinTask:定义创建ForkJoin任务。用户自定义的任务类需要继承这个类别,一般会继承两个子类:

    • RecursiveAction:用于没有返回结果的任务。
    • RecusiveTask<返回类型>:用于有返回结果的任务。
  • ForkJoinPool:调用submit(task)方法执行ForkJoinTask任务。

每次在调用任务的fork()方法时,会重新进入实现的compute()方法。join()方法则会等待子任务执行完,并得到结果。

使用ForkJoin计算1+2+3+4的案例如下:

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD=2;
    private int start;
    private int end;
    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canFork = end - start >= THRESHOLD;
        //判断是否需要分割子任务
        if (!canFork) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        }
        else {
            int mid=(start+end)/2;
            CountTask countTask1 = new CountTask(start, mid);
            CountTask countTask2 = new CountTask(mid + 1, end);
            //执行子任务
            countTask1.fork();
            countTask2.fork();
            //等待任务执行
            Integer result1 = countTask1.join();
            Integer result2 = countTask2.join();
            sum = result1 + result2;
        }
        return sum;
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask countTask = new CountTask(1, 4);
        Future<Integer> result = forkJoinPool.submit(countTask);
        System.out.println(result.get());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

另外,可以通过task.isCompletedAbnormally()判断是否抛出异常或者任务取消,getException()方法获取异常。

# fork源码

将当前任务存放在ForkJoinTask数组队列,再调用ForkJoinPool的signalWork方法唤醒或者创建一个工作线程执行任务。fork()会触发调用子任务的compute()方法。

# Join源码

注意这里要区分Task.join()方法和线程的thread.join()方法。虽然都是进入等待状态,但是前者是等待任务,后者是等待线程。

doJoin()方法查看任务状态,如果是已经完成则返回任务结果。

编辑 (opens new window)
#JUC
上次更新: 2023/12/15, 15:49:57
Lock并发锁原理
原子操作类

← Lock并发锁原理 原子操作类→

Theme by Vdoing | Copyright © 2023-2024 blageCoder
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式