并发编程
多线程原理
创建线程的四种方法
提示
Thread 的 start()方法和 run()方法主要的区别是:
start()方法用于启动一个新线程,并在新线程中执行 run()方法中的代码;
直接调用 run() 方法不会创建新线程,而是在当前线程中执行 run() 方法中的代码
class MyThread extends Thread {
MyThread() {
super("name_01"); // 可设置线程名
}
public void run() {
...
}
}
// 运行
new MyThread().start();class MyThread implements Runnable {
@Override
public void run() {
...
}
}
// 运行
new Thread(new MyThread()).start();class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
...
return "ok";
}
}
// 运行
FutureTask<String> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask).start(); // 执行线程
futureTask.get(); // 获取执行结果;主线程阻塞,直到成功获取// 前三种方式,在执行完成后都会销毁,不可复用。
//创建一个包含三个线程的线程池(仅学习,生产环境禁用Executors创建线程池)
ExecutorService pool = Executors.newFixedThreadPool(3);
// 提交异步执行 Runnable
pool.execute(() -> {});
// 提交异步执行 Callable
Future<String> future = pool.submit(() -> "666");
future.get(); // 阻塞获取执行结果Thread 类实现了 Runnable 接口,并且有一个 Runnable 类型的属性,源码如下:
实际上可以通过 Thread 的构造函数为 Runnable 属性赋值,当 Thread 中的 start 方法被调用后,会启动新线程并自动调用 run 方法,run 方法中实际调用的是 Runnable 实例的 run 方法;
package java.lang;
public class Thread implements Runnable {
...
private Runnable target; //执行目标
public void run() {
if(this.target != null) {
this.target.run(); //调用执行目标的run()方法
}
}
public Thread(Runnable target) { //包含执行目标的构造器
init(null, target, "Thread-" + nextThreadNum(), 0);
}
}继承 Thread 类或者实现 Runnable 接口不能获取异步执行的结果。Future 接口提供了取消任务、查询执行状态、获取执行结果等功能,FutureTask 是他的实现类,并且实现了 Runnable 接口,所以在 Thread 构造中可以传 FutureTask。
线程的核心原理
由于 CPU 的计算频率非常高,如 2GHz 的 单核 CPU 每秒计算 20 亿次,将 CPU 的时间从毫秒的维度进行分段,每一小段叫作一个 CPU 时间片。不同的操作系统、不同的 CPU,线程的 CPU 时间片长度都不同。一个时间片内的计算量也是非常巨大的。
Java 线程优先级
目前大部分操作系统线程调度都是使用抢占式调度模型(按优先级分配时间片),Java 的线程管理和调度是委托给操作系统完成的,因此 Java 的线程都有优先级。
在 Thread 类中 priority(默认值为 5,范围 1~10)属性用于定义线程优先级,优先级越高,线程获得 CPU 时间片的机会就越多。
Java 线程生命周期
在 Thread 类中 threadStatus 属性用于保存线程的状态;Java 线程的 6 种状态被定义在 Thread.State 中:
public static enum State { NEW, //新建;对应start()方法调用 RUNNABLE, //可执行:包含操作系统的就绪、运行两种状态; BLOCKED, //阻塞;不会占用CPU资源 WAITING, //无限期等待;一直持续到某个时刻被唤醒;不会被分配CPU时间片 TIMED_WAITING, //限时等待;不会被分配CPU时间片,如调用 sleep 等方法 TERMINATED; //终止 }
Java 操作线程的基本方法
Java 线程的常用操作基本上都定义在 Thread 类中。
停止线程
// 过时方法不建议使用,会直接终止线程;相当于直接关闭电源 thread.stop(); // 优雅的关闭,推荐使用 thread.interrupt();合并线程
相当于在线程 B 中插入线程 A,A 先执行完成,在继续执行 B。
Thread threadA = new Thread(() -> {}); Thread threadB = new Thread(() -> threadA.join()); threadA.start(); threadB.start();线程礼让
礼让(yield)只是让当前线程暂停一下,使得 CPU 去执行其他的线程;在 yield 时,线程放弃和重占 CPU 的时间是不确定的,线程礼让后状态为 RUNNABLE 状态而不是 BLOCKED。
Thread threadA = new Thread(() -> Thread.yield()); Thread threadB = new Thread(() -> {}); threadA.start(); threadB.start();线程守护
Java 中的线程分为两类:守护线程与用户线程。守护线程在后台提供某种通用服务,比如每启动一个 JVM 进程,都会在后台运行一系列的 GC 线程,这些 GC 线程就是守护线程。
Thread daemonThread = new Thread(() -> { while (true) { System.out.println("守护线程运行中..."); Thread.sleep(100); } }); Thread userThread = new Thread(() -> Thread.sleep(1000)); daemonThread.setDaemon(true); // 设置为守护线程 daemonThread.start(); // 守护线程不会无休止的执行,一旦用户线程结束,会被动停止 userThread.start();在上面的例子中,守护线程是一个死循环,理应永远执行下去,上面代码在 main 线程中执行,main 线程也是一个用户线程,在创建 daemonThread 和 userThread 后就提前结束了,但是 JVM 会等待所有用户进程结束后才会退出,userThread 一旦执行完成,JVM 不管守护线程 daemonThread 是否结束,会强制终止所有守护线程的执行。
线程池
Java 线程的创建非常昂贵,需要 JVM 和 OS(操作系统)配合完成大量的工作。在主要大厂的编程规范中,不允许在应用中自行显式地创建线程,线程必须通过线程池提供。
线程池能独立负责线程的创建、维护、分配和管理。
JUC
JUC 是 java.util.concurrent 工具包的简称,该工具包是从 JDK 1.5 开始加入 JDK 的,用于完成高并发、处理多线程的一个工具包。大致架构图:

Executor
Executor 的目的是执行目标任务(执行者),它只包含一个函数式方法:
void execute(Runnable command)ExecutorService
ExecutorService 继承于 Executor。可接收异步任务并转交给执行者。
//向线程池提交单个异步任务 <T> Future<T> submit(Callable<T> task); //向线程池提交批量异步任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);AbstractExecutorService
AbstractExecutorService 是一个抽象类,它实现了 ExecutorService 接口。
ThreadPoolExecutor
ThreadPoolExecutor 就是大名鼎鼎的“线程池”实现类,它继承于 AbstractExecutorService 抽象类,是 JUC 线程池的核心实现类。
ThreadPoolTaskExecutor 是 Spring 框架中的类,是对 ThreadPoolExecutor 的封装,提供了更高级的功能和与 Spring 生态的集成。与 Spring 的@Async、任务调度等无缝集成。提供了更方便的监控和统计功能。
ScheduledExecutorService
是一个接口,继承于 ExecutorService,可以完成“延时”和“周期性”任务的调度线程池接口。
ScheduledThreadPoolExecutor
继承于 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口;类似于 Timer,但性能更加优越;
Executors
是一个静态工厂类,它通过静态工厂方法返回 ExecutorService、ScheduledExecutorService 等线程池示例对象;
大部分企业会禁止通过 Executors 创建线程池,因为其底层默认实现在大量线程处理中会出现问题(例如创建固定数量的线程池 newFixedThreadPool(),查看源码,底层默认使用无界阻塞队列 LinkedBlockingQueue 接收所有异步线程,若队列很大,JVM 会出现 OOM 异常)。以下仅作为学习示例:
// 创建单线程化线程池,与创建固定数量的线程池一样存在OOM问题 ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(()->{}); // 向池中提交异步任务,接收Runnable,无返回值; pool.submit(()->{}); // 向池中提交异步任务,接收Runnable或Callable,返回Future pool.shutdown(); // 优雅的关闭线程池
线程池的任务调度流程
可通过标准构造器 ThreadPoolExecutor 去构造线程池,其中一个比较重要的构造器如下:
// 使用标准构造器构造一个普通的线程池
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数,即使线程空闲(Idle),也不会回收
int maximumPoolSize, // 线程数的上限
long keepAliveTime, TimeUnit unit, // 线程最大空闲(Idle)时长
BlockingQueue<Runnable> workQueue, // 任务的排队队列(阻塞队列)
ThreadFactory threadFactory, // 新线程的产生方式
RejectedExecutionHandler handler) // 拒绝策略线程池的任务调度流程(包含接收新任务和执行下一个任务)大致如下:
若当前工作线程数小于核心线程数,执行器总是优先创建一个新线程,而不是从线程池中获取空闲线程。
若当前工作线程数大于核心线程数,新接收的任务将被加入阻塞队列,一直到阻塞队列已满。若核心线程数量用完、阻塞队列没满,线程池不会创建新线程。
当完成一个任务的执行时,执行器总是优先从阻塞队列中获取下一个任务执行,直到阻塞队列为空,其中所有的缓存任务被取光。
若核心线程池数已经用完、阻塞队列已满的场景下,如果线程池收到新任务,会为新任务创建一个新线程(非核心线程)并立即执行。
若核心线程池数用完、阻塞队列已满、且池内创建的线程总数超过 maximumPoolSize,线程池会拒绝接收任务。
线程池数量确定
虽然使用线程池的好处很多,但是如果其线程数配置得不合理,不仅可能达不到预期效果,反而可能降低应用的性能。
一般情况下,针对不同类型的异步任务需要创建不同类型的线程池,并进行针对性的参数配置。
线程池的异步任务大致分为以下三类:
IO 密集型任务
主要是执行 IO 操作,执行 IO 操作的时间较长,导致 CPU 的利用率不高,Netty 的 IO 读写操作为此类任务的典型例子。
由于 IO 密集型任务的 CPU 使用率较低,导致线程空余时间很多,因此通常需要开 CPU 核心数两倍的线程。当 IO 线程空闲时,可以启用其他线程继续使用 CPU。
CPU 密集型任务
此类任务主要是执行计算任务,CPU 一直在运行,这种任务 CPU 的利用率很高。
并行的任务数量越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,要最高效地利用 CPU,并行执行的数量应当等于 CPU 核心数。
混合型任务
此类任务既要执行逻辑计算,又要进行 IO 操作(如 RPC 调用、数据库访问)。
对于此种情况,确定线程数业界有一个比较成熟的估算公式可以参考(其中线程等待时间为线程可能会花费在等待外部资源【如 IO 操作】上的时间,线程 CPU 时间为在 CPU 上执行代码的时间):
最佳线程数 = ((线程等待时间+线程CPU时间) / 线程CPU时间) * CPU核数
ThreadLocal 原理
在 Java 的多线程并发执行过程中,使每一个线程都得到一份独立而又不相互影响的变量,可以将变量放到 ThreadLocal 类型的对象中,它提供了线程局部变量的机制,使得每个线程都可以拥有自己的变量副本。
一个 ThreadLocal 实例可以理解为一个 Map,其中的 set 和 get 方法可以为不同的线程实例绑定变量;
在 JDK8 之前,每一个 ThreadLocal 的内部结构是一个 Map,其中 Key 为线程实例,Value 为变量值;JDK8 中,每一个 Thread 线程内部都有一个 ThreadLocalMap,其中 Key 为 ThreadLocal 实例,Value 为变量值;
新版本相对于老版本而言,变量由当前线程管理而不是 ThreadLocal,明显 Map 的体积比旧版的集中管理要小;而且线程一旦销毁,ThreadLocalMap 也会随之销毁,能减少内存消耗;
ThreadLocal 使用不当会导致严重的内存泄漏问题,尽量使用 private static final 修饰 ThreadLocal 实例,ThreadLocal 使用完成之后务必调用 remove()方法,可有效避免内存泄漏。
案例
ThreadLocal 可以实现各个线程的数据隔离,以下示例演示两个线程各自设置用户信息且不相互影响:
public class Main { public static void main(String[] args) { // 创建并启动2个线程 Thread thread1 = new Thread(() -> { InfoThreadLocal.setUserInfo("name", "zs"); System.out.println(InfoThreadLocal.getUserInfo("name")); }); Thread thread2 = new Thread(() -> { InfoThreadLocal.setUserInfo("name", "ls"); System.out.println(InfoThreadLocal.getUserInfo("name")); }); thread1.start(); // 打印zs thread2.start(); // 打印ls } } // 模拟保存每个线程的用户信息 class InfoThreadLocal { // withInitial方法会获取一个Supplier的返回值并保存到 ThreadLocal 中 private static final ThreadLocal<Map<String, String>> userInfo = ThreadLocal.withInitial(HashMap::new); // 设置当前线程的用户信息 public static void setUserInfo(String key, String value) { userInfo.get().put(key, value); } // 获取当前线程的用户信息 public static String getUserInfo(String key) { String info = userInfo.get().get(key); userInfo.remove(); // 模拟使用完,清除 return info; } }
Java 内置锁原理
Java 内置锁是一个互斥锁,这就意味着最多只有一个线程能够获得该锁。Java 中每个对象都可以用作锁,这些锁称为内置锁。线程进入同步代码块或方法时会自动获得该锁,在退出同步代码块或方法时会释放该锁。
synchronized 关键字
每个 Java 对象都隐含一把内置锁,使用 synchronized 相当于为方法或变量加锁,使得同一时刻只有一个线程可访问;
早期的 Java 版本中,synchronized 关键字实现为重量级锁,但在现代 JVM 中,它已经得到了很大程度的改进,并不总是重量级锁。在实际开发中,应该根据具体情况进行性能测试和优化,选择合适的同步机制。
private Integer num1 = 0;
private Integer num2 = 0;
// 保证同一时刻只有一个线程能访问方法
// 方法中包含多个资源,一旦线程调用后,会同时占有sum1和sum2的操作权,这样会导致在操作
// sum1而没有操作sum2时,sum2被白白占用(资源闲置等待),可使用同步块优化
public synchronized void selfPlus()
{
// ++操作是线程不安全的,包含内存取值、加一、内存赋值三个非原子性操作
num1++;
num2++;
}private Integer num1 = 0;
private Integer num2 = 0;
// 随便定义两个对象作为同步锁;注意,不同资源若使用同一把锁,会存在竞态条件,让我细细道来:
// 若下面两个同步块使用相同的锁对象,线程A进入selfPlus(),执行完num1++后释放锁;
// 此时线程B获取到锁,可能会导致不同的线程可以同时进入selfPlus(),
// 并交替执行不同的同步块,因此结果是不可预期的。
private Integer lock1 = new Integer(0);
private Integer lock2 = new Integer(0);
// 为了优化synchronized修饰方法出现的资源闲置等待问题,可使用同步代码块
public void selfPlus()
{
synchronized(this.lock1) {
num1++;
}
synchronized(this.lock2) {
num2++;
}
}内置锁
无锁
Java 对象刚创建时还没有任何线程来竞争,说明该对象处于无锁状态。
偏向锁
偏向(独占)锁是指不存在线程竞争的一个线程,会直接获取锁。
例如:线程 1 第一次访问一段同步代码,获取到了锁,且执行过程中没有其他线程尝试获取锁,此时锁对象会记录偏向于线程 1 (记录在对象头的 Mark Word 字段中),线程 1 下次访问即可直接获得锁;JVM 不会走竞争流程而是直接查对象头的偏向标志,对于单个线程来讲大大提高了获取锁的速度。
若线程 1 执行完成并释放锁后,偏向锁不会立即失效,而是等待一段时间;若此时线程 2 尝试获取锁,则锁对象又会偏向于线程 2。整个过程中,同步代码块同一时刻只有一个线程在尝试获取锁。
一旦同一时刻有多个线程尝试获取锁,则偏向锁升级为轻量级锁。
private static final Object lock = new Object(); // 锁对象 public static void main(String[] args) { Thread thread1 = new Thread(() -> doSomething()); Thread thread2 = new Thread(() -> doSomething()); thread1.start(); // 第一次获取锁,锁对象偏向于线程1 Thread.sleep(2000); // 模拟线程1已经释放锁 thread2.start(); // 线程1已释放锁,线程2获取锁,锁对象偏向于线程2 } private static void doSomething() { synchronized (lock) { ... } }轻量级锁
如果锁对象时常被多个线程竞争,偏向锁就是多余的,且其撤销的过程会带来一些性能开销;
当锁处于偏向锁,又被另一个线程企图抢占时,锁会膨胀(升级)为轻量级锁,两个线程公平竞争;企图抢占的线程会通过自旋(会消耗 cpu,一般有持续时间)的形式尝试获取锁,不会阻塞抢锁线程。
轻量级锁是一种自旋锁,自旋分为普通自旋(自旋次数固定)和自适应自旋锁(自旋次数动态改变),JDK 1.7 后,轻量级锁使用自适应自旋锁,JVM 启动时自动开启,且自旋时间由 JVM 自动控制。
轻量级锁也叫乐观锁、非阻塞同步锁。
重量级锁
当轻量级锁争用持续时间超过了自旋最大时间,这时争用线程进入阻塞状态,该锁膨胀(升级)为重量级锁。重量级锁会让其他申请的线程之间进入阻塞,性能降低。重量级锁也叫同步锁。
并发编程的三大问题
原子性
原子性是指一个操作是不可中断的整体,在并发执行的过程中不会被其它线程干扰。
原子性问题通常由于多线程同时操作共享资源而引发。比如,一个线程正在执行写操作,另一个线程同时执行读操作,可能导致读到不一致的数据。
可通过 CAS(Compare and Swap,比较并交换)解决并发编程中的原子性问题。
可见性
可见性是指一个线程对共享变量的修改能够及时地被其他线程所观察到。
可见性问题可能导致线程间数据不一致或者意外的行为。比如,一个线程将一个变量修改为某个值,但其他线程在之后的一段时间内仍然读取的是原来的值。
有序性
有序性是指程序执行的结果应该按照代码的先后顺序产生。
有序性问题可能会导致程序的执行顺序不确定,从而产生错误的结果或者意外的行为。
CAS 原理
CAS(Compare and Swap,比较并交换)是一种基于硬件原子指令的操作,用于实现多线程环境下的共享数据的安全更新。
它包含三个步骤:1.读取内存中的当前值、2.比较当前值与预期值是否相等、3.如果相等则用新值替换当前值,如果 CAS 失败就重复第 1、2 步,直到 CAS 成功,这种重复俗称 CAS 自旋。
例如,某个对象属性内存地址对应的值为 100,线程 A 欲将其值更新为 200,线程 B 更新为 300;由于线程是并发执行的,谁都有可能先执行,但是 CAS 是原子性的。假设线程 A 的 CAS(100,200)先执行,首先读取内存,由于内存地址的当前值值 100 与 CAS 期望值 100 相等,因此线程 A 执行成功,此时内存值为 200;接下来执行线程 B 的 CAS(100,300)操作,内存值 200 和期望值 100 不同,操作失败,线程 B 开始自旋,由于上一次读取内存值为 200,则此次执行 CAS(200,300)操作,读取内存比较并替换,完成更新操作;
CAS 是一种无锁算法,在 Java 中 CAS 主要是通过 sun.misc.Unsafe 类来实现的,因为它提供了直接访问底层内存的能力,但它太过底层,使用不当会导致程序崩溃;JUC 提供了一组原子类可以更安全地使用 CAS,它们在底层通常会使用 Unsafe 类提供的功能来实现原子性。
JUC 并发包中的原子类都存放在 java.util.concurrent.atomic 类路径下,具体如图: 
ABA 问题
CAS 原子操作性能高,因此其在 JUC 包中被广泛应用,如果使用得不合理,CAS 原子操作就会存在 ABA 问题。
问题描述:比如有一个共享变量 X,线程 T1 想要通过 CAS 修改为 Y,但是在修改之前线程 T2 把 X 修改为 Z 之后又修改回了 X,此时线程 A 使用 CAS 检查变量为 X,于是认为没有其他线程修改,于是将其修改为 Y;
解决办法:使用乐观锁,每次在执行数据的修改操作时带上一个版本号,执行修改操作并对版本号执行加 1 操作;JDK 提供 AtomicStampedReference 和 AtomicMarkableReference(简化版,不关心修改几次,只关心是否修改) 类来解决 ABA 问题,在 CAS 的基础上增加了一个 Stamp(印戳或标记)。
总体而言,CAS 存在 ABA 问题和自旋(一直不成功就一直循环)带来的开销问题;
高并发下提高 CAS 性能
在争用激烈的场景下,会导致大量的 CAS 空自旋,甚至有的线程会进入一个无限重复的循环中。可以使用 LongAdder 替代 AtomicInteger,以提升其性能。
以多线程累加为例,LongAdder 的牛逼之处在于将共享变量分离成一个数组,当多线程访问时,通过 Hash 算法将线程映射到数组的一个元素进行操作,最终获取 value 结果时,则将数组的元素求和。由于各个线程都在操作自己的数据,没有自旋等待操作,线程越多,和 CAS 性能差距就越明显。
LongAdder 的实现思路与 ConcurrentHashMap 中分段锁的基本原理非常相似,本质上都是不同的线程在不同的单元上进行操作,这样减少了线程竞争,提高了并发效率。
JUC 显式锁
与 Java 内置锁不同,JUC 显式锁是一种非常灵活的、使用纯 Java 语言实现的锁。
ReentrantLock 是 JUC 包提供的显式锁的一个基础实现类,ReentrantLock 类实现了 Lock 接口,是一个可重入的互斥锁(可重入独占锁)。它拥有与 synchronized 相同的并发性和内存语义,但是拥有了限时抢占、可中断抢占等一些高级锁特性。此外,ReentrantLock 基于内置的抽象队列同步器(Abstract Queued Synchronized,AQS)实现,在争用激烈的场景下,能表现出表内置锁更佳的性能。
可重入锁 ReentrantLock 累加示例
可重入锁即同一个线程可以多次获取同一个锁对象;下面示例中,testAdd 方法中连续两次获取锁都会成功,在锁未释放前,其他线程会阻塞式获取锁;
虽然有 10 个线程,但他们是依次执行的,每个线程执行 100 次累加(分两次执行),结果为 1000;
public class Test {
private int sum = 0; // 多线程共享变量
private final Lock lock = new ReentrantLock();
// 每个线程累加100次
public void testAdd() {
lock.lock(); // 阻塞式获取锁。第一次获取锁,加50次
// 由于加锁了,这 50 次累加完全由当前线程独自执行
// += 在多线程下是线程不安全的,加锁保证了线程安全性
for (int i = 0; i < 50; i++) sum += 1;
lock.lock(); // 第二次获取锁(可重入锁,同一个线程能直接获取),再加50次
for (int i = 0; i < 50; i++) sum += 1;
lock.unlock(); // 注意,必须释放锁,最好写在finally块中。
lock.unlock(); // 注意,加了两次锁,必须释放两次,不然会一直阻塞
}
public int getSum() { return sum; }
}
class Main {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(10); // 倒数闩
Test test = new Test();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
test.testAdd();
countDownLatch.countDown();
});
}
countDownLatch.await(); // 等待所有线程执行完毕,再执行主线程
System.out.println(test.getSum());
executor.shutdown();
}
}lock()方法用于阻塞抢锁,抢不到锁时线程会一直阻塞。
tryLock()方法用于尝试抢锁,该方法有返回值,如果成功就返回 true,如果失败(锁已被其他线程获取)就返回 false。
tryLock(long time,TimeUnit unit)方法和 tryLock()方法类似,只不过这个方法在抢不到锁时会阻塞一段时间。
显式锁实现线程通信(等待-通知)示例
在 Java 内置锁中,通过 Object 对象的 wait、notify 两类方法可简单实现线程间通信;基于 Lock 显示锁,JUC 通过 Condition 接口也实现了这一功能;
由于 Lock 有公平锁和非公平锁之分,而 Condition 是与 Lock 绑定的,因此就有与 Lock 一样的公平特性。
class Main1 {
public static void main(String[] args) throws InterruptedException {
final Lock lock = new ReentrantLock();
Condition condition = lock.newCondition(); // 获取一个显式锁绑定的Condition对象
new Thread(() -> {
lock.lock();
Thread.sleep(1000);
condition.signal(); // 发送通知
System.out.println("子线程发送通知!");
lock.unlock(); // 释放锁
}).start();
lock.lock();
System.out.println("我是等待方。。。");
condition.await(); // 开始等待子线程通知,并释放锁
System.out.println("收到子线程通知,开始运行!");
lock.unlock();
}
}LockSupport 线程阻塞与唤醒
LockSupport 是 JUC 提供的一个线程阻塞与唤醒的工具类,该工具类可以让线程在任意位置阻塞和唤醒,其所有的方法都是静态方法。
class Main1 {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("子线程即将被阻塞!");
LockSupport.park(); // 无限时阻塞,等待被唤醒后继续执行
System.out.println("子线程继续执行!");
});
thread.start();
System.out.println("主线程运行中!");
LockSupport.unpark(thread); // 唤醒继续执行
Thread.sleep(1000);
System.out.println("主线程运行结束!");
}
}LockSupport.park()与 Object.wait()的区别:
- Object.wait()方法需要在 synchronized 块中执行,而 LockSupport.park()可以在任意地方执行。
- LockSupport.park()不会抛出异常。
- 在没有被 Object.wait()阻塞之前被 Object.notify()唤醒,会抛出异常。而 LockSupport 则不会。
显式锁的分类
- 可重入锁和不可重入锁
可重入锁即同一个线程可以多次获取同一个锁对象,否则为不可重入锁;
- 悲观锁和乐观锁
悲观锁:每次操作数据的时候都认为别的线程会修改,所以每次读写数据时都会上锁,其他线程需要读写这个数据时就会阻塞,一直等到拿到锁。总体来说,悲观锁适用于写多读少的场景,遇到高并发写时性能高。
乐观锁:每次操作数据的时候都认为别的线程不会修改,所以不会上锁,它可以减少锁竞争带来的性能消耗,但是在更新的时候会判断在此期间数据有没有被其他线程修改。总体来说,乐观锁适用于读多写少的场景,因为读操作不会阻塞其他线程的写操作,遇到高并发读时性能高。
Java 中的乐观锁基本都是通过 CAS 自旋操作实现的,在争用激烈的场景下乐观锁的性能非常低,但是 JUC 的显式锁都是乐观锁,JUC 的显式锁都是基于 AQS 实现的,极大地减少了空的 CAS 自旋。
- 公平锁和非公平锁
公平锁是指不同的线程抢占锁的机会是公平的、平等的,从抢占时间上来说,先对锁进行抢占的线程一定被先满足,抢锁成功的次序体现为 FIFO(先进先出)顺序。
默认情况下,ReentrantLock 实例是非公平锁,如果在实例构造时传入了参数 true,所得到的就是公平锁。另外,ReentrantLock 的 tryLock()方法是一个特例,它是公平的。
- 可中断锁和不可中断锁
可中断锁:若某一线程 A 正占有锁在执行临界区代码,另一线程 B 正在阻塞式抢占锁,可能由于等待时间过长,线程 B 不想等待了,想先处理其他事情,我们可以让它中断自己的阻塞等待,这种就是可中断锁。
不可中断锁:一旦这个锁被其他线程占有,如果自己还想抢占,只能选择等待或者阻塞,直到别的线程释放这个锁,如果别的线程永远不释放锁,那么自己只能永远等下去。
Java 的 synchronized 内置锁就是一个不可中断锁,而 JUC 的显式锁(如 ReentrantLock)是一个可中断锁。
- 共享锁和独占锁
独占锁: 独占锁其实就是一种悲观锁,每次只有一个线程能持有的锁;某个只读线程获取锁,那么其他的读线程都只能等待,这种情况下就限制了读操作的并发性。
共享锁:允许多个线程同时获取锁,容许线程并发进入临界区。与独占锁不同,共享锁是一种乐观锁,它放宽了加锁策略,并不限制读/读竞争,允许多个执行读操作的线程同时访问共享资源。
JUC 的 ReentrantLock 类是一个标准的独占锁实现类。用 ReentrantLock 锁替代 ReentrantReadWriteLock 锁虽然可以保证线程安全,但是也会浪费一部分资源,因为多个读操作并没有线程安全问题,所以在读的地方使用读锁,在写的地方使用写锁,可以提高程序执行效率。
JUC 中的共享锁包括 Semaphore(信号量)、ReadLock(读写锁)中的读锁、CountDownLatch 倒数闩。
- 读写锁
读写锁的内部包含两把锁:一把是读(操作)锁,是一种共享锁;另一把是写(操作)锁,是一种独占锁。
JUC 包中的读写锁接口为 ReadWriteLock,主要有两个方法 readLock 和 writeLock,ReentrantReadWriteLock 则提供了读写锁的实现;多个线程可以同时获取读锁,但是写锁不能同时获取;
- 死锁
多个线程通过 AB-BA 模式抢占两个锁是造成多线程死锁比较普遍的原因。
AB-BA 模式:线程 X 按照先后次序去抢占锁 A 与锁 B,线程 Y 按照先后次序去抢占锁 B 与锁 A,两个线程互相等待从而造成死锁。
JUC 的同步工具类
JUC 的同步工具类一共有 3 个:Semaphore、CountDownLatch 和 CyclicBarrier。
Semaphore、CountDownLatch 二者都是基于共享锁实现的,共享锁就是在同一时刻允许多个线程持有的锁。当然,获得共享锁的线程只能读取临界区的数据,不能修改临界区的数据。
CyclicBarrier 是基于独占锁 ReentrantLock 独占锁实现。
CountDownLatch
CountDownLatch 可以指定一个计数值,在并发环境下由线程进行减一操作,当计数值变为 0 之后,被 await 方法阻塞的线程将会唤醒。
CountDownLatch 的方法在定义和使用上都非常简单,以下是一个简单示例:
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10); // 倒数闩
AtomicInteger i = new AtomicInteger(0); // 记录上车人数
Runnable r = () -> {
System.out.println("第" + i.incrementAndGet() + "个人已经上车!");
latch.countDown(); // 线程减一操作
};
// 创建10个线程
List<Thread> threads = Arrays.stream(new Thread[10]).map(t -> new Thread(r)).toList();
threads.forEach(Thread::start);
latch.await(); // 等待所有线程执行完毕
System.out.println("人到齐,开车!");
}Semaphore
Semaphore 可以用来控制在同一时刻访问共享资源的线程数量。
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(2); // 创建信号量,含两个许可
AtomicInteger i = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(10);
Runnable r = () -> {
boolean b = semaphore.tryAcquire(1); // 非阻塞尝试获取一个许可,成功获取就为true
if (b) {
System.out.println("第" + i.incrementAndGet() + "个线程处理中_" + b);
semaphore.release(); // 释放信号
}
latch.countDown();
};
// 创建10个线程
List<Thread> threads = Arrays.stream(new Thread[10]).map(t -> new Thread(r)).toList();
threads.forEach(Thread::start);
latch.await();
}提示
以上代码中,Semaphore 控制同一时刻只能有两个线程能执行逻辑,即同一时刻只允许两个线程获取信号,在信号未释放之前,其他线程无法执行业务逻辑。
CyclicBarrier
从字面上的意思看,CyclicBarrier 循环栅栏大概的意思就是一个可循环利用的屏障;CyclicBarrier 使用非常简单,它是 ReentrantLock 和 Condition 的组合使用。
和 CountDownLatch 类似,CyclicBarrier 的作用是会让所有线程都等待完成后才会继续下一步行动,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。

// 构造方法
public CyclicBarrier(int parties) // parties 可理解为每个栅栏拦截的线程数
public CyclicBarrier(int parties, Runnable barrierAction) // Runnable 由最后一个每次到达栅栏的线程触发public static void main(String[] args) {
CyclicBarrier cyclicBarrier =
new CyclicBarrier(5, () -> System.out.println("最后一个每次到达栅栏的线程触发"));
Runnable r = () -> {
try {
cyclicBarrier.await(); // 到达第一个栅栏,线程调用 await() 表示自己已经到达栅栏
System.out.println("线程" + Thread.currentThread().getName() + "到达栅栏A");
cyclicBarrier.await(); // 到达第二个栅栏
System.out.println("线程" + Thread.currentThread().getName() + "到达栅栏B");
} catch (Exception e) { }
};
// 创建10个线程
List<Thread> threads = Arrays.stream(new Thread[10]).map(t -> new Thread(r)).toList();
threads.forEach(Thread::start);
System.out.println("执行成功!");
}AQS 抽象同步器
解决 CAS 恶性空自旋,JUC 并发包提供了一个基于双向队列的削峰抽象基础类 AbstractQueuedSynchronizer(抽象同步器类,简称为 AQS)。
CAS 操作无法获取锁时,AQS 将等待线程放入一个队列中,并阻塞线程,线程被阻塞后,不会再占用 CPU 资源,直到条件满足再由系统唤醒,使其重新参与竞争或继续执行。
公平锁和非公平锁就是基于队列节点的顺序决定的。
JUC 包内许多类都是基于 AQS 构建的,例如 ReentrantLock、CountDownLatch、ReentrantReadWriteLock、Semaphore(/ˈseməfɔː(r)/,线程同步工具)、FutureTask(异步回调工具) 等。
JUC 容器类
Java 的基础容器主要有 List、Set、Queue、Map 四个大类,基础容器类 ArrayList、LinkedList、HashMap 都是非线程安全的,Java 使用内置锁提供了一套线程安全的同步容器类如 Vector、Stack、HashTable、SynchronizedList 等,它们在需要同步访问的方法上添加关键字 synchronized,性能不高。
JUC 提供了一套高并发容器类,解决同步容器的性能问题。
JUC 高并发容器
List
主要有 CopyOnWriteArrayList,对应的基础容器为 ArrayList。
Set
主要有 CopyOnWriteArraySet(无序) 和 ConcurrentSkipListSet(有序)。
Map
主要有 ConcurrentHashMap(无序) 和 ConcurrentSkipListMap(有序)。
Queue
单向队列:ConcurrentLinkedQueue,按照 FIFO(先进先出)原则对元素进行排序。
双向队列:ConcurrentLinkedDeque,基于链表的双向队列,队列不允许 null 元素。
阻塞队列:
- ArrayBlockingQueue 基于数组实现的可阻塞的 FIFO 队列。
- LinkedBlockingQueue 基于链表实现的可阻塞的 FIFO 队列。
- PriorityBlockingQueue 按优先级排序的队列。
- DelayQueue 按照元素的 Delay 时间进行排序的队列。
- SynchronousQueue 无缓冲等待队列。
异步回调
调用 join()进行线程同步或使用 FutureTask 获取异步线程的执行结果,都需要在主线程中主动地调用被调用线程。例如下面使用 FutureTask 的一个例子,主线程需要主动获取子线程执行结果(主动调用),若子线程是一个耗时操作,主线程只能慢慢等待。
主动调用是一种阻塞式调用,它是一种单向调用,“调用方”要等待“被调用方”执行完毕才返回。
将主动调用的方向进行反转,这就是异步回调,回调是一种反向的调用模式,被调用方在执行完成后,会反向执行调用方所设置的钩子方法。
Java 中回调模式的标准实现类为 CompletableFuture,但他出现的时间比较晚,因此很多著名的中间件如 Guava、Netty 等都提供了自己的异步回调模式 API 供开发者使用。
FutureTask 主动调用
FutureTask 的 get()方法获取异步结果时,主线程会被阻塞(主动调用),异步阻塞的效率往往比较低。
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(() -> "模拟一个Callable");
Thread thread = new Thread(task, "子线程");
thread.start();
// 阻塞获取异步执行结果
System.out.println("异步执行结果为:" + task.get());
}Guava 异步回调
public static void main(String[] args) {
// 创建一个ListeningExecutorService,用于执行异步任务
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
// 提交任务并得到ListenableFuture对象
ListenableFuture<String> future = executor.submit(() -> "模拟一个Callable");
// 添加回调函数,当任务完成时执行回调
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("任务执行成功,结果为:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("任务执行失败,原因为:" + t.getMessage());
}
}, executor);
System.out.println("主线程可能已经结束了,子线程执行完成后,调用回调函数,不会影响主线程");
}CompletableFuture 异步回调
Java 8 提供了一个新的、具备异步回调能力的工具类——CompletableFuture,该类实现了 Future 和 CompletionStage 两个接口,还具备函数式编程的能力。
Future 接口提供了取消任务、查询执行状态、获取执行结果等功能,FutureTask 是他的实现类,并且实现了 Runnable 接口。
CompletionStage 代表某个同步或者异步计算的一个阶段,或者一系列异步任务中的一个子任务。
CompletableFuture 创建的异步任务会自动启动,不需要手动调用 start() 或者其他方法来启动任务。
创建任务并设置回调钩子
提示
可以使用 supplyAsync 或 runAsync 创建子任务。
可以使用 whenComplete 或 handle 处理异步任务结果或异常;exceptionally 可以单独设置异常钩子。
whenComplete 方法接收一个 BiConsumer(包含任务的【结果】或【异常信息】),不会改变任务的执行结果;
handle 方法接收一个 BiFunction,可在任务完成时可以对结果进行转换或处理;handle 方法会返回一个新的 CompletableFuture 对象,它可能是原始任务的引用,也可能是处理结果的函数返回的结果。
public static void main(String[] args) throws Exception {
// 创建任务,创建成功后会自动启动异步任务
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(
() -> "A-模拟一个Supplier,有返回值");
CompletableFuture<Void> futureB = CompletableFuture.runAsync(
() -> System.out.println("B-模拟一个Runnable,无返回值"));
// 异步回调,任务执行成功后或异常后调用
futureA.whenComplete((result, throwable) -> {
if (throwable == null) System.out.println("异步回调A-任务执行成功,结果为:" + result);
else System.out.println("执行失败:" + throwable.getMessage());
});
CompletableFuture<String> futureC = futureB.handle((result, throwable) -> {
if (throwable == null) return "异步回调B-任务执行成功,但是没有返回值,构造一个牛逼的值";
else return "执行失败:" + throwable.getMessage();
});
// 主动调用,get方法阻塞获取任务执行结果,超出指定时间会抛出异常,以防止无限等待
String a = futureA.get(2, TimeUnit.SECONDS);
// join和get方法类似,用于阻塞式获取处理结果,区别是join不会抛出异常
String c = futureC.join(); // 使用新对象futureC获取处理结果
System.out.println("主动调用A:" + a);
System.out.println("主动调用B:" + c);
}指定任务使用的线程池
在上节使用 supplyAsync 或 runAsync 创建子任务时,没有指定执行任务的线程池,默认会使用公共的 ForkJoinPool 线程池,线程数是 CPU 的核数。
对于不同类型的任务(IO 密集型任务线程池、CPU 密集型任务线程池和混合型任务线程池),强烈建议根据不同的业务类型创建不同的线程池,以避免互相干扰。
以下示例演示指定任务线程池:
// 创建任务,使用默认线程池
CompletableFuture.supplyAsync(() -> "");
// 创建任务,使用指定线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> "", pool);异步任务的串行执行
如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过 CompletionStage 接口的 thenApply()、thenAccept()、thenRun()和 thenCompose()四个方法来实现。
thenApply()
thenApply()方法有三个重载版本:
- thenApply (Function<T,U> fn),后一个任务与前一个任务在同一个线程中执行
- thenApplyAsync (Function<T,U> fn),后一个任务与前一个任务不在同一个线程中执行
- thenApplyAsync (Function<T,U> fn, Executor executor),后一个任务在指定的 executor 线程池中执行
public static void main(String[] args) { // 第一个任务 CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 666); // 第二个任务,依赖第一个任务结果 CompletableFuture<Integer> futureB = futureA.thenApplyAsync(result -> { System.out.println("第一个异步任务结果为:" + result); return result * 2; }); Integer resultB = futureB.join();// 阻塞式获取异步结果 System.out.println("第二个异步任务结果为:" + resultB); }thenRun()
thenRun()方法与 thenApply()方法不同的是,不关心各个任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。
thenRun()也有三个重载版本,和 thenApply()类似,不再叙述。
public static void main(String[] args) { // 第一个任务 CompletableFuture<Void> futureA = CompletableFuture .runAsync(() -> System.out.println("第一个异步任务执行成功")); // 第二个任务 CompletableFuture<Void> futureB = futureA.thenRunAsync(() -> System.out.println("第一个异步任务执行成功后执行") ); Thread.sleep(1000); System.out.println("第二个异步任务:" + futureB.isDone()); }thenAccept()
后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。
public static void main(String[] args) { // 第一个任务 CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 666); // 第二个任务 CompletableFuture<Void> futureB = futureA.thenAcceptAsync(result -> System.out.println("我是第二个异步任务,没有结果输出;第一个异步任务结果为:" + result) ); futureB.join(); }thenCompose()
后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务的返回值是一个 CompletableFuture。
public static void main(String[] args) { // 第一个任务 CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 666); // 第二个任务 CompletableFuture<Integer> futureB = futureA.thenComposeAsync(result ->{ System.out.println("第一个异步任务结果为:" + result); return CompletableFuture.supplyAsync(() -> 777); }); System.out.println("第一个异步任务结果为:" + futureB.join()); }
异步任务的合并执行
如果某个任务同时依赖另外两个异步任务的执行结果,就需要对另外两个异步任务进行合并。
对两个异步任务的合并可以通过 CompletionStage 接口的 thenCombine()、runAfterBoth()、thenAcceptBoth()三个方法来实现。
合并多个异步任务,可以调用 allOf()。
thenCombine()
thenCombine()会在两个任务都执行完成后,把两个任务的结果一起交给 thenCombine()来处理。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); // 结合前两个异步任务的处理结果,异步执行第三个异步任务 CompletableFuture<Integer> futureC = futureA .thenCombineAsync(futureB, (resultA, resultB) -> { System.out.println("第一个异步任务结果为:" + resultA); System.out.println("第二个异步任务结果为:" + resultB); return resultA + resultB; }); System.out.println("第三个异步任务结果为:" + futureC.join()); }runAfterBoth()
runAfterBoth()方法不关心每一步任务的输入参数和处理结果。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); CompletableFuture<Void> futureC = futureA .runAfterBothAsync(futureB, () -> { System.out.println("异步任务A和B执行成功后,异步任务C执行"); }); futureC.join(); }thenAcceptBoth()
第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)不能返回结果。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); // 结合前两个异步任务的处理结果,异步执行第三个异步任务 CompletableFuture<Void> futureC = futureA .thenAcceptBothAsync(futureB, (resultA, resultB) -> { System.out.println("第一个异步任务结果为:" + resultA); System.out.println("第二个异步任务结果为:" + resultB); System.out.println("第三个异步任务执行成功,但不返回执行结果"); }); futureC.join(); }allOf()
合并多个异步任务。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); CompletableFuture<Integer> futureC = CompletableFuture .supplyAsync(() -> 333); CompletableFuture<Void> futureAll = CompletableFuture .allOf(futureA, futureB, futureC); futureAll.thenRunAsync(()-> System.out.println("所有任务执行成功后干点牛皮事情")); }
异步任务的选择执行
按照执行速度对前两个任务进行选择,谁的结果返回速度快,谁的结果将作为第三步任务的输入。
对两个异步任务的选择可以通过 CompletionStage 接口的 applyToEither()、runAfterEither()和 acceptEither()三个方法来实现。
applyToEither()
哪个任务快就使用哪个任务的结果作为第三个任务的入参,第三个任务有输出结果。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); CompletableFuture<String> futureC = futureA .applyToEitherAsync(futureB, (result) -> { System.out.println("两个任务择优选择:" + result); return result + "--这个比较迅速!"; }); System.out.println("第三个异步任务结果为:" + futureC.join()); }runAfterEither()
哪个任务快就执行第三步,不关注执行的输入和输出结果。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); CompletableFuture<Void> futureC = futureA .runAfterEitherAsync(futureB, () -> { System.out.println("我只会被执行一次哦!" ); }); futureC.join(); }acceptEither()
哪个任务快就使用哪个任务的结果作为第三个任务的入参,但是第三个任务没有输出结果。
public static void main(String[] args) { CompletableFuture<Integer> futureA = CompletableFuture .supplyAsync(() -> 111); CompletableFuture<Integer> futureB = CompletableFuture .supplyAsync(() -> 222); CompletableFuture<Void> futureC = futureA .acceptEither(futureB, result -> { System.out.println("更迅速的任务结果为:" + result); System.out.println("但是我没有结果返回!"); }); futureC.join(); }
实战
执行耗时任务
需求:多个用户,可同时向程序提交多条待执行的 SQL 语句,但每条语句都十分耗时,为保证系统性能,要求同一时刻至多仅能有 4 条 SQL 语句同时执行。
分析需求,可将用户提交的 SQL 语句放入线程池的任务队列中,使用线程同步工具控制同一时刻线程的执行数量。线程池数量确定
/**
* 线程池配置类
*/
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor longTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数(根据服务器CPU核心数和任务数量设置)核心线程数 = CPU核心数
executor.setCorePoolSize(4);
// 最大线程数(根据任务类型和服务器资源设置) 最大线程数 = CPU核心数
executor.setMaxPoolSize(4);
// 任务队列容量(避免无界队列导致OOM)
executor.setQueueCapacity(200);
// 线程空闲时间(设置为0,表示线程永远不会因为空闲而被销毁)
executor.setKeepAliveSeconds(0);
// 线程名称前缀(方便日志排查)
executor.setThreadNamePrefix("LongTaskExecutor-");
// 拒绝策略(使用CallerRunsPolicy,由提交任务的线程直接执行任务)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化线程池
executor.initialize();
return executor;
}
}/**
* 非阻塞任务提交、模型运行限流并发控制
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class TaskExecutionService {
private final ThreadPoolTaskExecutor taskExecutor;
// 限制同时运行的线程数
private final Semaphore semaphore = new Semaphore(4);
/**
* 执行任务
* @param tasks 待执行的任务
* @param consumer 每个执行线程,执行过程中需要干的事情
* @param onComplete 每个执行线程,执行结束时需要干的事情
*/
public <T> void executeTaskList(List<T> tasks, Consumer<T> consumer, Runnable onComplete) {
// 获取线程池任务队列剩余空间
int remainingCapacity = taskExecutor.getThreadPoolExecutor().getQueue().remainingCapacity();
// 如果任务数量大于队列剩余空间,抛出异常
if (tasks.size() > remainingCapacity)
throw new ServiceException(500,
String.format("本次任务数量:%d,队列剩余容量: %d,提交任务失败!", tasks.size(), remainingCapacity));
// 计数器:记录当前线程提交任务总数量
AtomicInteger remainingTasks = new AtomicInteger(tasks.size());
for (T task : tasks) {
// 异步执行
taskExecutor.execute(() -> {
try {
semaphore.acquire(); // 获取许可
consumer.accept(task); // 执行一些事情
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 执行完毕后释放许可
// 任务计数减 1
if (remainingTasks.decrementAndGet() == 0) {
log.info("所有提交的任务已完成,执行 onComplete 操作...");
onComplete.run(); // 执行额外逻辑
}
}
});
}
}
}taskExecutionService.executeTaskList(sqlList,
// 执行每个 SQL 需要干的事情
sql -> {
List<JSONObject> list = dataSourceService.execute(sql); // 模拟耗时操作
log.info("当前 SQL 语句为:{}", sql);
log.info("执行结果为:{}", list);
},
// 当前任务行完成后要干的事情
() -> log.info("提交的所有语句被成功执行!")
);
