JUC工具包介绍

目录

1. 引言

2. 介绍JUC工具包

2.1. JUC工具包的概述和作用

2.2. 什么是JUC工具包?

2.2.1. JUC工具包与传统线程编程的区别和优势

3. 线程池(Executor)

3.1. 线程池的概念和优势

3.1.1. ThreadPoolExecutor类的介绍和使用示例

3.1.2. ThreadPoolExecutor

3.1.2.1. 参数配置

3.1.2.1.1. 必填参数

3.1.2.1.2. BlockingQueue

3.1.2.1.3. 选填参数

3.1.2.2. 使用示例

3.1.3. 继承体系介绍

3.1.3.1. Executor

3.1.3.2. ExecutorService

3.1.3.3. AbstractExecutorService

3.1.3.4. ScheduledExecutorService

3.2. 其他系统线程池

3.2.1. 核心参数

3.2.2. 阻塞队列

4. 并发集合类

5. JUC并发集合类介绍

5.1. ConcurrentHashMap

5.2. ConcurrentLinkedQueue

5.3. CopyOnWriteArrayList

5.4. ConcurrentSkipListSet

6. 原子类(Atomic)

6.1. 原子类的概念和作用

6.2. AtomicInteger和AtomicLong

6.3. AtomicReference的使用示例

7. 锁(Lock)

7.1. Lock接口和ReentrantLock类的概念和作用

7.2. ReentrantLock的基本用法和注意事项

7.3. Lock的高级功能和扩展示例

8. 同步器(Synchronizers)

8.1. 同步器的概念和作用

(倒计时门闩)

(循环栅栏)

(阶段器)

8.2. CountDownLatch(倒计时门闩)

8.3. CyclicBarrier(循环栅栏)

8.4. Semaphore(信号量)

8.5. Exchanger(交换器)

8.6. Phaser(阶段器)

9. 并发框架

9.1. CompletableFuture

9.1.1. 简介

9.1.2. 核心方法

9.1.3. 示例

9.2. Fork/Join框架

9.2.1. 简介

9.2.2. Fork/Join框架的核心概念

9.2.3. Fork/Join框架的核心类:

9.2.4. Fork/Join框架的使用示例

10. 结语

(Java.util.concurrent,jdk1.8)

1. 引言

在并发编程中,线程安全和性能是两个重要的考虑因素,JUC工具包提供了一系列高效、可扩展、线程安全的组件和类,可以帮助开发者更方便地编写高性能和可维护的并发代码。

本次将介绍JUC工具包中常用的组件和类,包括线程池、并发集合类、并发工具类、原子类、锁、同步器和并发框架等。

2. 介绍JUC工具包

2.1. JUC工具包的概述和作用

2.2. 什么是JUC工具包?

JUC工具包,全称为Java.util.concurrent工具包,是Java提供的用于实现多线程编程的工具集合。它是在Java 5版本中引入的,并在后续版本中得到了进一步的扩展和优化。

JUC工具包中包含了许多常用的组件和类,如线程池(Executor)、并发集合类(Concurrent Collections)、原子类(Atomic)、锁(Lock)、同步器(Synchronizers)等。这些组件和类提供了一些高效、可扩展和线程安全的机制,可以帮助开发者更好地处理并发访问、线程同步和协作等问题。

2.2.1. JUC工具包与传统线程编程的区别和优势

  1. 更高级别的抽象:JUC工具包提供了一些高级别的并发编程抽象,如锁、线程池、并发集合等,相比传统的线程编程更加方便和易用。这些抽象隐藏了底层线程操作的复杂性,使得开发人员能够更专注于业务逻辑而不是线程管理。
  2. 更好的性能和可扩展性:JUC工具包中的各种类和数据结构都经过优化,能够提供更好的性能和可扩展性。例如,使用ConcurrentHashMap代替传统的线程安全的HashMap,可以获得更高的并发性能。
  3. 更强大的线程同步机制:JUC工具包提供了更强大的线程同步机制,如Lock和Condition,相较于传统的synchronized关键字,它们更灵活、可扩展,并且提供了更多的高级特性,如公平锁、重入锁等。
  4. 更灵活的线程通信:JUC工具包提供了更灵活的线程通信机制,如Semaphore、CountDownLatch和CyclicBarrier。这些工具能够帮助线程在特定的点上进行等待和唤醒,更精确地控制并发执行的顺序和逻辑。
  5. 更高效的并发集合:JUC工具包提供了一系列高效的线程安全的并发集合类,如ConcurrentHashMap、ConcurrentLinkedQueue等。这些集合类在多线程环境下能够提供更好的性能,同时保证数据的一致性和线程安全

3. 线程池(Executor)

3.1. 线程池的概念和优势

JUC线程池(Java.util.concurrent.ThreadPoolExecutor)是Java提供的一个用于管理和调度线程的工具类。它是JUC工具包中最重要的组件之一,用于处理并发编程中线程的创建、复用和销毁,以及任务的提交和执行。

3.1.1. ThreadPoolExecutor类的介绍和使用示例

整体关系类图:

3.1.2. ThreadPoolExecutor

ThreadPoolExecutor实现的顶层接口是Executor,它只定义了一个void execute(Runnable command)方法。这样设计的出发点是,将任务提交和任务执行进行解耦,开发者只需要提供一个实现了Runnable接口的线程对象即可,线程的调度与执行交由Executor执行器负责。
 

首先,ThreadPoolExecutor内部有两个核心存储机制:workers线程集合(Thread Pool)和workQueue任务队列(阻塞队列)。线程池实现时要解决的核心问题就是用多少线程去并发执行多少任务,因此ThreadPoolExecutor首先做的是将线程和任务解耦,分开管理。

  • 线程池Thread Pool:ThreadPoolExecutor提供了一个内部类Worker,它实现了Runnable接口,被当做工作线程,同时有一个HashSet<Worker>类型的workers集合用来当做线程池Thread Pool。线程池被划分为两部分,一部分是核心线程池core pool,可以复用的线程被放在这里,它有一个属性corePoolSize,用来指定核心池的线程数上限;核心线程池以外的部分用来存放临时线程,通过maximumPoolSize指定整个线程池的线程数上限。
  • 任务队列workQueue:ThreadPoolExecutor提供了一个BlockingQueue<Runnable>类型的workQueue任务队列,用于缓存提交到线程池的任务。

然后我们看一下当一个任务被提交给线程池时,线程池是如何处理的。

  1. Step1:任务被提交后,线程池先判断当前核心池是否已满。没有满的话,就直接申请创建一个worker线程放入核心池,并立即执行提交的任务,Woker内部有个Runnable firstTask用于接收这种直接执行的任务,并在自己的run方法里优先执行firstTask;如果核心池已满,则进入下一步。
  2. Step2:核心池已满时,判断任务队列是否已满。未满则将任务加入任务队列,之前在Step1中被创建出来的核心工作线程会在执行完自己的firstTask之后尝试从任务队列获取任务来执行;任务队列已满的话,进入下一步。
  3. Step3:任务队列已满时,判断整个线程池是否已满。未满则创建一个worker线程作为非核心线程加入线程池,并直接执行提交的任务,与核心工作线程一样,firstTask执行结束后,非核心工作线程也会尝试从任务队列拉取任务执行,但不同的是,如果在限时内没有拉取到任务,该非核心工作线程会被回收;整个线程池已满的话,则进入下一步。
  4. Step4:根据线程池创建时指定的RejectedExecutionHandler拒绝处理策略,拒绝提交的任务。
     
3.1.2.1. 参数配置

首先看构造方法

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory)

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)
3.1.2.1.1. 必填参数
  1. int corePoolSize:核心池最大线程数。如前所述,线程池中的线程有"正式员工"和"临时工"的区别。核心池中的工作线程是正式员工,任务队列即使空了,这些没事干的核心线程也不会被回收;非核心工作线程作为临时工就比较惨了,如果在超时前不能从任务队列拉取的任务,那就会被回收掉。
  2. int maximumPoolSize:整个线程池中的最大线程数。虽然有临时工编制,但正式员工和临时工加起来也不能超过这个上限。
  3. long keepAliveTime:非核心线程闲置时间上限。如果临时工超过这个时间都没能从任务队列拉取到任务,就会被回收。
  4. TimeUnit unit:keepAliveTime的时间单位。
  5. BlockingQueue<Runnable> workQueue:任务队列,通过传入的阻塞队列缓存等待执行的任务。
3.1.2.1.2. BlockingQueue

Java中的阻塞队列(BlockingQueue)与普通队列相比有一个重要的特点:在阻塞队列为空时,会阻塞当前线程的元素获取操作。具体来说,在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。

java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:FIFO 队列

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列,是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize启用拒绝策略。
  • LinkedBlockingQueue:由链表结构组成的无界(默认大小 Integer.MAX_VALUE)的阻塞队列,也可以设置有界队列。如果默认无界队列,当接收的任务数量超出corePoolSize数量时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列。
  • DelayedQueue:使用优先级队列实现的延迟无界阻塞队列,这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,而队列头部的元素是最先过期的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。
  • SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个 put 的线程放入元素为止。每次put时必须被get,否则一直阻塞。

与普通队列(LinkedList、ArrayList等)的不同点在于阻塞队列中阻塞添加和阻塞删除方法,以及线程安全:

阻塞添加 put():当阻塞队列元素已满时,添加队列元素的线程会被阻塞,直到队列元素不满时才重新唤醒线程执行

阻塞删除 take():在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般会返回被删除的元素)

核心方法

方法类型

抛出异常

特殊值

阻塞

超时

插入(尾)

add(e)

offer(e)

put(e)

offer(e,time,unit)

移除(头)

remove()

poll()

take()

poll(time,unit)

检查(队首元素)

element()

peek()

不可用

不可用

  • 抛出异常组:

当阻塞队列满时:在往队列中 add 插入元素会抛出 IIIegalStateException: Queue full

当阻塞队列空时:再往队列中 remove 移除元素,会抛出 NoSuchException

  • 特殊值组:

插入方法:成功 true,失败 false

移除方法:成功返回出队列元素,队列没有就返回 null

  • 阻塞组:

当阻塞队列满时,生产者继续往队列里 put 元素,队列会一直阻塞生产线程直到队列有空间 put 数据或响应中断退出

当阻塞队列空时,消费者线程试图从队列里 take 元素,队列会一直阻塞消费者线程直到队列中有可用元素

  • 超时退出:当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出
3.1.2.1.3. 选填参数
  • RejectedExecutionHandler handler:拒绝处理策略,当线程池已满时会采取这个策略对提交的任务进行残忍的拒绝。

四种系统的拒绝策略:

  1. ThreadPoolExecutor.AbortPolicy:默认拒绝策略,丢弃新提交的任务并抛出RejectedExecutionException异常。
  2. ThreadPoolExecutor.DiscardPolicy:丢弃新提交的的任务,但是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃任务队列头部任务(最老的),然后重新尝试执行execute,如果再次失败,重复此过程。
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用方线程,即提交任务的线程处理该任务。
3.1.2.2. 使用示例
private void testThreadPoolExecutorByFuture() {
    List<Future<Integer>> futures = new ArrayList<>();
    ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 100; i++) {
        // 一般使用submit一个一个异步提交任务而不使用invokeAll一把提交所有任务,因为invokeAll会阻塞当前线程直到所有线程都执行结束。
        Future<Integer> future = executorService.submit(new RandomIntInTen());
        futures.add(future);
    }
    executorService.shutdown();
    try {
        // 一般采用遍历Future集合一个一个get的方式等待所有线程执行结束并获得执行结果
        for (Future<Integer> future : futures) {
            Integer number = future.get();
            System.out.println(number);
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

3.1.3. 继承体系介绍

3.1.3.1. Executor

Executor 线程池顶级接口,类似一个线程池工厂。接口中只有一个execute()方法,接收Runnable类型。注意这里返回值类型是void。实现了最复杂的运行部分execute,它一方面维护自身生命周期,另一方面管理线程和任务,让池中的线程并发地执行提交进来的任务。

3.1.3.2. ExecutorService

ExecutorService继承自Executor接口,添加了关闭线程池以及等待中断等方法。同时添加了submit来提交线程任务,除了接收Runnable以外,还可以接收Callable类型,也增加了返回值。

// 不再接收新的任务,但会等待已经提交的任务执行结束
void shutdown()

// 不再接收新的任务,并尝试中断所有正在执行的任务,不再处理正在等待执行的任务,并返回正在等待执行的任务列表。
List<Runnable> shutdownNow()

// 返回该线程池是否已经被shutdown
boolean isShutdown()

// 返回该线程池在被shutdown之后是否所有任务都已经结束
// 注意,如果不先调用shutdown或shutdownNow,则isTerminated永远不会返回true
boolean isTerminated()

// 在调用shutdown或shutdownNow之后,或之前当前线程被中断过,则让当前线程限时等待直到所有任务执行结束。
boolean awaitTermination(long timeout, TimeUnit unit)

// 提交一个Callable任务,并返回该任务的Futrue对象
<T> Future<T> submit(Callable<T> task)

// 提交一个Runnable任务与其预计返回值,并返回该任务的Futrue对象
<T> Future<T> submit(Runnable task, T result)

// 提交一个Runnable任务,并返回该任务的Futrue对象
Future<?> submit(Runnable task)

// 执行入参传入的 Callable 任务集合中的所有任务,等到全部结束后返回它们的 Future 对象List
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

// 带超时时间的 invokeAll
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)

// 执行入参传入的 Callable 任务集合中的所有任务,等到任意一个任务正常结束后返回该任务的执行结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks)

// 带超时时间的 invokeAny
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
3.1.3.3. AbstractExecutorService

AbstractExecutorService是实现ExecutorService接口的抽象类。

它没有增加新的抽象方法定义,而是实现了submit等接口,并提供了一些通用方法,作为一个模板去调用不同的接口方法,将Executor和ExecutorService中定义的那些接口方法串联起来,保证下一层的类只需要关注具体的执行任务的方法execute即可。

3.1.3.4. ScheduledExecutorService

该类是为了实现带有定时器功能的线程池。ScheduledExecutorService也是一个接口。包含了定时和延迟处理的方法。


 

3.2. 其他系统线程池

3.2.1. 核心参数

与线程数量相关的参数(corePoolSize、maximumPoolSize、keepAliveTime)的设置,通常的指导原则:

  • 对于CPU密集型任务(任务几乎不会IO阻塞),线程池大小应该接近CPU核数,比如CPU核数 + 1,或者CPU核数 × 2,可以使用Runtime.getRuntime().availableProcessors()获取系统可用的处理器数目。实际开发中很少有这种单纯的CPU密集型任务。
  • 对于IO密集型任务(任务主要在IO读写),线程池大小应该比CPU核数大,问题是大多少合适。原则上,IO密集度越高,CPU密集度越低,那么线程数就应该越多。但由于线程间切换的损耗,线程数也不能无限增多。通常的理论指导值是CPU数量 * CPU利用率 * (1 + 线程等待时间/线程CPU时间),这个公式的问题是,里面的值很难统计到。所以还有一个指导公式是CPU核数 × 2 + IO设备数,比如8核CPU读写一块磁盘,那线程池的建议值就是8 × 2 + 1 = 17,这个公式给出的建议值往往偏小了,实际的场景中基本不会有单纯的IO密集任务,IO与CPU运算总是复杂地交缠在一起,甚至还有一些代码上控制的等待与阻塞,因此线程数量通常可以设置地更高,这样才能更好地压榨出CPU的性能。
  • 对于混合型任务,建议将CPU密集与IO密集拆开来,使用不同的线程池及参数设置来应对不同阶段的任务。实在拆不开就当做IO密集来处理。在实际的多线程开发的设计中,应该避免一个任务流程过长或步骤过于复杂,这会导致一个任务在CPU密集与IO密集两种特征之间来回变换,增加线程池参数调整的难度。

3.2.2. 阻塞队列
 

4. 并发集合类

5. JUC并发集合类介绍

Java.util.concurrent(JUC)工具包中提供了一系列强大的并发集合类,用于在多线程环境下安全地操作和共享数据。这些集合类提供了线程安全的数据结构,能够有效地处理并发访问的问题,提高并发程序的性能和可靠性。

5.1. ConcurrentHashMap

ConcurrentHashMap是线程安全的哈希表实现,相较于传统的HashMap,它能够提供更高的并发性能。JDK 8还引入了一些新的方法,如forEach()、search()和reduce(),使得对ConcurrentHashMap的遍历和计算更加便捷

ConcurrentHashMap的使用示例:

1ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
2map.put("key1", 1);
3map.put("key2", 2);
4int value = map.get("key1");

5.2. ConcurrentLinkedQueue

ConcurrentLinkedQueue是线程安全的无界队列,它采用了无锁算法(lock-free algorithm)实现。它具有高效的插入和移除操作,适用于多线程环境下的任务调度和消息传递。

ConcurrentLinkedQueue的使用示例:

1ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
2queue.offer("item1");
3queue.offer("item2");
4String item = queue.poll();

5.3. CopyOnWriteArrayList

CopyOnWriteArrayList是线程安全的动态数组实现,它通过实现写时复制(Copy-On-Write)机制来保证线程安全。在对集合进行修改操作时,会创建一个新的副本,保留原始集合的不变性,从而允许并发读取操作而不需要加锁。

CopyOnWriteArrayList的使用示例:

1CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
2list.add("item1");
3list.add("item2");
4String item = list.get(0);

5.4. ConcurrentSkipListSet

ConcurrentSkipListSet是线程安全的有序集合实现,基于跳表(SkipList)数据结构。它具有高效的有序插入和遍历操作,并且能够提供更好的并发性能。

ConcurrentSkipListSet的使用示例:

1ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<>();
2set.add("item1");
3set.add("item2");
4String firstItem = set.first();

JUC并发集合类为并发编程提供了强大的工具和数据结构,能够简化并发操作、提高并发性能和安全性。根据具体的需求,选择合适的并发集合类可以在多线程环境下有效地管理和共享数据。

6. 原子类(Atomic)

6.1. 原子类的概念和作用

6.2. AtomicInteger和AtomicLong

6.3. AtomicReference的使用示例

7. 锁(Lock)

7.1. Lock接口和ReentrantLock类的概念和作用

7.2. ReentrantLock的基本用法和注意事项

7.3. Lock的高级功能和扩展示例

8. 同步器(Synchronizers)

8.1. 同步器的概念和作用

在JUC(Java.util.concurrent)工具包中,Synchronizers是一组用于同步多个线程之间操作的类。这些类能够帮助线程在特定的点上进行等待和唤醒,实现线程之间的协调和同步。下面介绍几个常用的Synchronizers类:

Synchronizers类

功能描述

常用方法

CountDownLatch

(倒计时门闩)

允许一个或多个线程等待其他线程完成操作后再继续执行

- await(): 等待其他线程完成操作
- countDown(): 减少计数器的值

CyclicBarrier

(循环栅栏)

允许一组线程相互等待,直到所有线程都达到一个同步点后再继续执行

- await(): 等待所有线程达到同步点

Semaphore(信号量)

控制同时访问某个资源的线程数量,或控制并发执行的任务数量

- acquire(): 获取一个许可证
- release(): 释放一个许可证

Exchanger(交换)

提供线程间进行数据交换的机制,允许两个线程在同一个时刻交换彼此的数据

- exchange(): 进行数据交换

Phaser

(阶段器)

提供了分阶段的同步机制,可用于多个线程分阶段地协同工作。线程通过arriveAndAwaitAdvance()方法等待其他线程进入下一阶段。常用于多阶段任务的同步。

- arrive():线程到达同步点
- awaitAdvance():等待其他线程到达同步点并进入下一阶段

8.2. CountDownLatch(倒计时门闩)

CountDownLatch用于等待一个或多个线程完成一组操作,它的计数器初始值设定为等待的线程数量。线程在完成操作后,调用countDown()方法将计数器减1,等待的线程通过await()方法等待计数器为0。常用于控制主线程等待多个子线程完成任务。

使用CountDownLatch的示例:

CountDownLatch latch = new CountDownLatch(3); // 初始化计数器为3

// 子线程任务
new Thread(() -> {
    // 执行任务
    latch.countDown(); // 完成任务,计数器减1
}).start();

// 主线程等待子线程完成任务
try {
    latch.await(); // 等待计数器为0
} catch (InterruptedException e) {
    e.printStackTrace();
}

8.3. CyclicBarrier(循环栅栏)

CyclicBarrier用于多个线程等待彼此到达一个同步点,然后继续执行。它的计数器初始值设定为等待的线程数量,当一个线程到达同步点时,调用await()方法等待其他线程,直到计数器达到设定的值,所有线程同时继续执行。常用于任务分解和并行计算。

使用CyclicBarrier的示例:

CyclicBarrier barrier = new CyclicBarrier(3); // 初始化同步点为3

// 子线程任务
new Thread(() -> {
    // 执行任务
    try {
        barrier.await(); // 等待其他线程到达同步点
    } catch (InterruptedException | BrokenBarrierException e) {
        e.printStackTrace();
    }
}).start();

// 主线程等待子线程完成任务
try {
    barrier.await(); // 等待其他线程到达同步点
} catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
}

8.4. Semaphore(信号量)

Semaphore类是用于控制并发访问线程数量的工具。它基于信号量的概念,可以用来限制同时访问某个资源的线程数量。

Semaphore的构造方法有两个重载版本:

使用示例:

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3); // 创建一个许可数为3的Semaphore对象

        // 创建10个线程去获取许可
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    System.out.println(Thread.currentThread().getName() + "获取到了许可");
                    Thread.sleep(2000); // 模拟线程执行一段时间
                    semaphore.release(); // 释放一个许可
                    System.out.println(Thread.currentThread().getName() + "释放了许可");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            thread.start();
        }
    }
}

8.5. Exchanger(交换器)

Exchanger是Java并发包中的一个同步工具类,它用于在两个线程之间交换数据。Exchanger提供了一个exchange方法,可以让两个线程在同一个位置进行数据交换。

Exchanger的构造方法有两个重载版本:

  1. Exchanger():创建一个Exchanger对象。
  2. Exchanger(boolean fair):创建一个Exchanger对象,并根据fair参数决定是否使用公平性策略。

Exchanger类的主要方法是exchange()方法,它有两个重载版本:

  1. V exchange(V x):当前线程调用exchange方法时将数据x传递给另一个线程,并等待另一个线程也调用exchange方法。当两个线程都调用了exchange方法后,它们会互相交换数据,并返回对方传递的数据。如果另一个线程还没有调用exchange方法,当前线程则会一直阻塞,直到另一个线程调用了exchange方法。
  2. V exchange(V x, long timeout, TimeUnit unit):与上述方法类似,但是在指定的时间内如果另一个线程没有调用exchange方法,则当前线程会继续执行,而不会一直阻塞。

以下是一个使用Exchanger的示例代码:

import java.util.concurrent.Exchanger;

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread thread1 = new Thread(() -> {
            try {
                String data = "Hello from Thread 1";
                System.out.println("Thread 1 before exchange: " + data);
                String exchangedData = exchanger.exchange(data); // 交换数据
                System.out.println("Thread 1 after exchange: " + exchangedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread thread2 = new Thread(() -> {
            try {
                String data = "Hello from Thread 2";
                System.out.println("Thread 2 before exchange: " + data);
                String exchangedData = exchanger.exchange(data); // 交换数据
                System.out.println("Thread 2 after exchange: " + exchangedData);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        thread1.start();
        thread2.start();
    }
}

8.6. Phaser(阶段器)

Phaser提供了分阶段的同步机制,可用于多个线程分阶段地协同工作。它将工作分为多个阶段,每个阶段的线程可以等待其他线程到达同一阶段,然后一起继续执行。Phaser的计数器初始值为参与线程的数量,通过arriveAndAwaitAdvance()方法等待其他线程进入下一阶段。

使用Phaser的示例:

Phaser phaser = new Phaser(3); // 初始化参与线程数量为3

// 子线程任务
new Thread(() -> {
    // 执行任务
    phaser.arriveAndAwaitAdvance(); // 等待其他线程到达同一阶段
}).start();

// 主线程等待子线程完成任务
phaser.arriveAndAwaitAdvance(); // 等待其他线程到达同一阶段

9. 并发框架

9.1. CompletableFuture

9.1.1. 简介

CompletableFuture是Java 8中引入的一个并发框架,用于处理异步操作和构建异步应用程序。它是对Future接口的扩展,提供了更多的功能和灵活性。

CompletableFuture类可以用于执行异步任务,并在任务完成后执行一些操作。它可以通过回调函数、组合和转换等方式来处理异步任务的结果。

9.1.2. 核心方法

方法签名

描述

thenApply(Function<? super T, ? extends U> fn)

对异步任务的结果应用一个函数,并返回一个新的CompletableFuture对象,表示应用函数后的结果。

thenAccept(Consumer<? super T> action)

对异步任务的结果应用一个消费函数,不返回任何结果。

thenRun(Runnable action)

在异步任务完成后执行一个Runnable操作。

thenCompose(Function<? super T, ? extends CompletionStage> fn)

对异步任务的结果应用一个函数,该函数会返回一个新的CompletionStage对象。可以使用该方法实现多个异步任务的串行执行。

thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)

将当前异步任务的结果与另一个CompletionStage对象的结果进行组合,并返回一个新的CompletableFuture对象。

thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)

对当前异步任务和另一个CompletionStage对象的结果都应用一个消费函数。

whenComplete(BiConsumer<? super T, ? super Throwable> action)

当异步任务完成时,执行一个消费函数,可以处理任务的结果或异常。

exceptionally(Function<Throwable, ? extends T> fn)

当异步任务出现异常时,执行一个函数来处理异常,并返回一个新的CompletableFuture对象。

handle(BiFunction<? super T, Throwable, ? extends U> fn)

当异步任务完成时,执行一个函数来处理任务的结果或异常,并返回一个新的CompletableFuture对象。

9.1.3. 示例

下面是一个CompletableFuture的使用示例:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个CompletableFuture对象,表示一个异步任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

        // 在异步任务完成后执行一些操作
        CompletableFuture<String> result = future.thenApplyAsync(s -> s + " World");

        // 使用回调函数处理异步任务的结果
        result.thenAcceptAsync(s -> System.out.println(s));
        
        // 等待异步任务完成
        result.join();
    }
}

9.2. Fork/Join框架

9.2.1. 简介

Fork/Join框架是Java并发框架中的一部分,用于并行执行任务的框架。它基于"分治"的思想,将大任务拆分成小任务进行并行处理,然后将结果合并得到最终的结果。

9.2.2. Fork/Join框架的核心概念

  • 工作窃取(Work Stealing):Fork/Join框架中的线程池采用了工作窃取算法。每个线程都有一个自己的工作队列,当一个线程完成自己的任务后,它可以从其他线程的工作队列中窃取任务来执行。这种机制可以提高线程的利用率。
  • 分治任务(Divide and Conquer):Fork/Join框架将大任务拆分成小任务进行并行处理,然后将结果合并得到最终结果。这种任务的拆分和合并过程可以递归地进行。

9.2.3. Fork/Join框架的核心类:

  • ForkJoinPool:Fork/Join框架的线程池,负责管理和调度任务的执行。
  • ForkJoinTask:Fork/Join框架中任务的抽象类,有两个主要的子类:RecursiveAction(没有返回值的任务)和RecursiveTask(有返回值的任务)。

9.2.4. Fork/Join框架的使用示例

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10;
    private int[] array;
    private int start;
    private int end;

    public ForkJoinExample(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(array, start, mid);
            ForkJoinExample rightTask = new ForkJoinExample(array, mid, end);

            leftTask.fork();
            rightTask.fork();

            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinExample task = new ForkJoinExample(array, 0, array.length);
        int result = forkJoinPool.invoke(task);

        System.out.println("Sum: " + result);
    }
}

在上面的示例中,我们使用ForkJoinExample类来计算一个整数数组的和。如果数组的大小小于等于阈值(THRESHOLD),则直接计算和;否则,将数组拆分成两部分,分别由左右两个子任务计算,然后合并子任务的结果。

通过Fork/Join框架,我们可以方便地实现递归式的并行计算,提高计算性能。

10. 结语

JUC工具包是Java并发编程中的一个重要组成部分,提供了丰富的并发编程工具和数据结构。它可以帮助我们更好地处理线程同步和数据共享的问题,提高程序的性能和可靠性。通过学习和使用JUC工具包,我们可以简化并发编程的实现,提高代码的可读性和维护性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/146096.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装Octoprint 3D打印控制软件

Octoprint简介 Octoprint是一个运行在Linux系统上的开源套件&#xff0c;可以为普通的3D打印机添加强大的外围管理功能。 web管理界面远程操控摄像头实时监控视频录制、延时摄影在线切片图形化的温度曲线显示手机监控操作免SD卡和U盘通过插件和USB/GPIO接口实现更多功能 Oct…

第三天课程 RabbitMQ

RabbitMQ 1.初识MQ 1.1.同步和异步通讯 微服务间通讯有同步和异步两种方式&#xff1a; 同步通讯&#xff1a;就像打电话&#xff0c;需要实时响应。 异步通讯&#xff1a;就像发邮件&#xff0c;不需要马上回复。 两种方式各有优劣&#xff0c;打电话可以立即得到响应&am…

分布式锁介绍

为什么需要分布式锁 在单机部署的系统中&#xff0c;使用线程锁来解决高并发的问题&#xff0c;多线程访问共享变量的问题达到数据一致性&#xff0c;如使用synchornized、ReentrantLock等。 但是在后端集群部署的系统中&#xff0c;程序在不同的JVM虚拟机中运行&#xff0c;且…

PostGIS学习教程五:数据

教程的数据是有关纽约市的四个shapefile文件和一个包含社会人口经济数据的数据表。在前面一节我们已经将shapefile加载为PostGIS表&#xff0c;在后面我们将添加社会人口经济数据。 下面描述了每个数据集的记录数量和表属性。这些属性值和关系是我们以后分析的基础。 要在pgAdm…

fileread任意文件读取学习笔记

任意文件读取概述 一些网站的需求&#xff0c;可能会提供文件查看与下载的功能。如果对用户查看或下载的文件没有限制或者限制绕过&#xff0c;就可以查看或下载任意文件。这些文件可以是源代码文件&#xff0c;配置文件&#xff0c;敏感文件等等。 任意文件读取会造成&#x…

Python学习:同步异步阻塞与非阻塞

嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 一、状态介绍 在了解其他概念之前&#xff0c;我们首先要了解进程的几个状态。 在程序运行的过程中&#xff0c;由于被操作系统的调度算法控制&#xff0c;程序会进入几个状态&#xff1a;就绪&#xff0c;运行和阻塞。 就绪…

LeetCode | 225. 用队列实现栈

LeetCode | 225. 用队列实现栈 OJ链接 此题可以用两个队列去实现一个栈&#xff0c;每次始终保持一个队列为空&#xff0c; 入栈操作相当于给非空队列进行入队操作 入数据&#xff0c;把不为空的队列入 出数据&#xff0c;把不为空的队列数据导入为空&#xff0c;直到最后一…

1334. 阈值距离内邻居最少的城市

分析题目两点“阈值距离”、“邻居最少”。 “阈值距离”相当于定了个上界&#xff0c;求节点之间的最短距离。 “邻居最少”相当于能连接的点的数量。 求节点之间的最短距离有以下几种方法&#xff1a; 在这道题当中&#xff0c;n的范围是100以内&#xff0c;所以可以考虑O(n…

超详细!!新手必看!STM32--独立看门狗IWBG

一、看门狗是什么&#xff1f; 答&#xff1a;看门狗是一个12bit的递减计数器。当计数器的值从某个值一直减到0的时候&#xff0c;系统就会产生一个复位信号&#xff0c;CPU收到复位信号&#xff0c;系统复位重新运行。在计数没减到0之前&#xff0c;重置了计数器的值的话&…

降水短临预报模型trajGRU简介

1 前言 trajGRU 是在对 convLSTM 的改进&#xff0c;且这两个模型是同一个作者。 convLSTM 在降水短临预报这块已经超越传统模型&#xff0c;但其是局部不变性的(location-invariant)&#xff0c;而自然的运动和转换(如旋转)是局部变化的(location-invariant)。作者为了能够使…

【python 生成器 面试必备】yield关键字,协程必知必会系列文章--自己控制程序调度,体验做上帝的感觉 2

这篇文章要解决的问题&#xff1a;How to Pass Value to Generators Using the “yield” Expression in Python ref:https://python.plainenglish.io/yield-python-part-ii-e93abb619a16 1.如何传值 yield 是一个表达式&#xff01;&#xff01;&#xff01;&#xff01; yi…

⑤ 【MySQL】DCL语句 —— 用户管理、权限控制

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ MySQL用户与权限 ⑤ 【MySQL】DCL语句 —— 用…

AWTK实现汽车仪表Cluster/DashBoard嵌入式GUI开发(七):FREERTOS移植

前言: 一般的GUI工程都需要一个操作系统,可能是linux,重量级的,也可能是FreeRTOS,轻量级的。 一句话理解那就是工程就是FreeRTOS task任务的集合。 一个main函数可以看到大框架: 很显然,除了第一个是硬件配置的初始化,中间最重要的部分就是要创建任务,把AWTK的应用…

临床决策分析(DCA)演示APP:理解DCA分析

临床决策分析&#xff08;DCA&#xff09;演示APP&#xff1a;理解DCA分析 之前讨论了DCA分析的分析过程和作用&#xff0c;认为其最主要的作用是确定预测模型的决策阈值&#xff0c;从而促进预测模型与临床的结合。DCA的影响不止于此&#xff0c;在DCA分析中&#xff0c;预测…

JLMR Micro Super Resolution Algorithm国产微超分算法DEMO

一、简介 目前&#xff0c;做超分算法基本还是以AI训练为主&#xff0c;但是AI基本上都是基于既定场景的训练。而传统的算法基本上都是利用上下文的纹理预测、插值等方案&#xff0c;在图像放大过程中会出现模糊&#xff0c;或马赛克等现象。 我们基于加权概率模型&#xff0c…

Control的Invoke和BeginInvoke

近日&#xff0c;被Control的Invoke和BeginInvoke搞的头大&#xff0c;就查了些相关的资料&#xff0c;整理如下。感谢这篇文章对我的理解Invoke和BeginInvoke的真正含义 。 (一&#xff09;Control的Invoke和BeginInvoke 我们要基于以下认识&#xff1a; &#xff08;1&#x…

【ASP.NET】Hello World

文章目录 1. 几个概念2. 搭建开发环境2.1 .NET SDK2.2 IDE & Editor 3 First Project3.1 步骤3.2 模板3.3 项目结构3.4 请求的处理流程 Reference Link 1. 几个概念 .NET 是一个平台&#xff0c;包括 .NET Framework、.NET Core、ASP.NET、C#等&#xff0c;可以构建桌面、W…

手写一个starter

文章目录 starter命令规则项目演示新建工程Pom引入依赖定义属性配置定义自动配置类配置EnableAutoConfiguration业务实现项目中使用 什么是Starter&#xff1f;Starter其实就是我们经常在maven中的导入的各种模块&#xff0c;自定义Starter可以快速的满足开发的需求&#xff0c…

夸克发布自研大模型 加速下一代搜索体验创新

国产大模型阵营再添新锐选手。11月14日&#xff0c;阿里巴巴智能信息事业群发布全栈自研、千亿级参数的夸克大模型&#xff0c;将应用于通用搜索、医疗健康、教育学习、职场办公等众多场景。夸克App将借助自研大模型全面升级&#xff0c;加速迈向年轻人工作、学习、生活的AI助手…

智能运维软件,提升效率的利器

随着信息技术的飞速发展&#xff0c;企业对于IT系统的依赖程度日益加深。为保障IT系统的稳定运行&#xff0c;越来越多的企业选择智能运维管理软件&#xff0c;以全面高效的监控和管理系统和资产情况。 一、运维监控平台的重要性 无监控&#xff0c;不运维。将资产并入监控系…