Java 并发编程
- 一、线程创建
- 1.继承 Thread 类
- 2.实现 Runnable 接口
- 3.实现 Callable 接口
- 二、线程方法
- 三、线程同步
- 1.同步代码块
- 2.同步方法
- 3.ReentrantLock
- 4.乐观锁
- 四、线程池
- 1.ThreadPoolExecutor
- 2.Executors
一、线程创建
1.继承 Thread 类
通过继承 Thread
类来创建线程是最简单的方法之一。只需要创建一个继承自 Thread
的子类,并重写其 run()
方法,然后通过调用子类的 start()
方法来启动线程。
这种方法的优点是简单易用,适用于简单的线程逻辑。不过由于 Java 不支持多重继承,因此通过继承 Thread
类来创建线程会限制类的继承关系。
package atreus.ink;
import java.lang.management.ManagementFactory;
public class MyThread extends Thread {
@Override
public void run() {
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
package atreus.ink;
import java.lang.management.ManagementFactory;
public class Main {
public static void main(String[] args) {
Thread thread = new MyThread();
thread.start();
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
My pid is 66201, my tid is 13.
My pid is 66201, my tid is 1.
注意事项:主线程启动子线程需要调用 start()
方法而不是 run()
方法,调用 run()
方法会将线程对象当作普通的 Java 对象来进行方法调用,并不会向操作系统注册线程,实际还是单线程执行。
2.实现 Runnable 接口
通过实现 Runnable
接口来创建线程是更加灵活的方法。通过这种方式,一个类既可以实现其他接口,又可以创建线程。
package atreus.ink;
import java.lang.management.ManagementFactory;
public class MyRunnable implements Runnable{
@Override
public void run() {
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
package atreus.ink;
import java.lang.management.ManagementFactory;
public class Main {
public static void main(String[] args) {
Runnable runnable = new MyRunnable();
new Thread(runnable).start();
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
My pid is 7388, my tid is 14.
My pid is 7388, my tid is 1.
此外,对于通过实现 Runnable 接口的创建方法,还可以通过匿名内部类和 Lambda 进行代码简化。
package atreus.ink;
import java.lang.management.ManagementFactory;
public class Main {
public static void main(String[] args) {
// 1.通过匿名内部类进行代码简化
Runnable runnable = new Runnable() {
@Override
public void run() {
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
};
new Thread(runnable).start();
// 2.通过Lambda表达式进行简化
new Thread(() -> {
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}).start();
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
3.实现 Callable 接口
Callable
接口类似于 Runnable
接口,但它可以返回一个值,并且可以抛出异常。线程通过 ExecutorService
的 submit
方法来调用 Callable
,并返回一个 Future
对象。
package atreus.ink;
import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
return "My pid is " + processId + ", my tid is " + threadId + ".";
}
}
package atreus.ink;
import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> callable = new MyCallable();
FutureTask<String> task = new FutureTask<>(callable);
new Thread(task).start();
System.out.println(task.get()); // get方法会使主线程等待子线程执行完毕,然后获取结果
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
My pid is 25580, my tid is 14.
My pid is 25580, my tid is 1.
当然,这种创建方法也可以通过匿名内部类和 Lambda 表达式简化。
package atreus.ink;
import java.lang.management.ManagementFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(() -> {
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
return "My pid is " + processId + ", my tid is " + threadId + ".";
});
new Thread(task).start();
System.out.println(task.get());
long threadId = Thread.currentThread().getId();
String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
System.out.println("My pid is " + processId + ", my tid is " + threadId + ".");
}
}
二、线程方法
Thread 类提供了很多与线程操作相关的方法:
常用方法 | 说明 |
---|---|
public static Thread currentThread() | 获取当前执行的线程对象 |
public void run() | 线程的任务方法 |
public void start() | 启动线程 |
public String getName() | 获取当前线程的名称,线程名称默认是 Thread-索引 |
public void setName(String name) | 为线程设置名称 |
public static void sleep(long time) | 让当前执行的线程休眠 |
public final void join() | 调用这个方法的线程将等待被调用的线程执行完成,然后再继续执行,类似于 POSIX 线程库中的 pthread_join |
package atreus.ink;
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
thread.start();
thread.join(); // 主线程会在此处等待子线程执行完毕
System.out.println(Thread.currentThread().getName());
}
}
三、线程同步
1.同步代码块
同步代码块是通过使用 synchronized
关键字来实现的,它可以用来保护代码块,确保在同一时刻只有一个线程能够进入同步代码块。一个典型的用法是将需要同步的代码放在同步代码块中,并指定一个锁对象作为同步的依据。
虽然任意一个唯一的对象(比如字符串)都可以作为同步代码块的锁对象,但锁的粒度过大或过小都会导致并发安全问题。对于实例方法,通常使用 this
作为锁对象,对于静态方法,通常使用类的字节码对象 类名.class
作为锁对象。
package atreus.ink;
public class Main {
private static int counter;
private static final Object lock = new Object();
public static void main(String[] args) {
Runnable incrementTask = () -> {
for (int i = 0; i < 10000; i++) {
// 同步代码块
synchronized (lock) {
counter++;
}
}
};
Thread thread1 = new Thread(incrementTask);
Thread thread2 = new Thread(incrementTask);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter value: " + counter);
}
}
Final counter value: 20000
此外,在同步代码块中还可以通过 wait()
方法让当前线程进入等待状态,直到其他线程调用相同对象的 notify()
或 notifyAll()
方法来唤醒它。
package atreus.ink;
public class Main {
public static void main(String[] args) {
final Object lock = new Object();
// 等待线程
Thread waiter = new Thread(() -> {
synchronized (lock) {
System.out.println("Waiter: Waiting for a notification...");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter: Got a notification!");
}
});
// 通知线程
Thread notifier = new Thread(() -> {
synchronized (lock) {
System.out.println("Notifier: Performing some work...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Notifier: Work done, notifying the waiter...");
lock.notify();
}
});
waiter.start();
notifier.start();
}
}
Waiter: Waiting for a notification...
Notifier: Performing some work...
Notifier: Work done, notifying the waiter...
Waiter: Got a notification!
2.同步方法
同步方法是通过在方法声明中使用 synchronized 关键字来实现的,它可以将整个方法体都变成一个同步代码块。同步方法底层通过隐式锁对象实现,只是锁的范围是整个方法代码。如果方法是实例方法,同步方法默认用 this
作为的锁对象。如果方法是静态方法,同步方法默认用 类名.class
作为的锁对象。
同步方法的优点是简单,可以很方便地实现线程同步。不过锁的范围较大,可能影响性能,因为其他不需要同步的代码也会被锁住。
package atreus.ink;
public class Main {
private static int counter;
public static synchronized void increment() {
for (int i = 0; i < 10000; i++) {
counter++;
}
}
public static void main(String[] args) {
Thread thread1 = new Thread(Main::increment);
Thread thread2 = new Thread(Main::increment);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter value: " + counter);
}
}
Final counter value: 20000
3.ReentrantLock
ReentrantLock
是 Java 提供的一个可重入锁,它相比于使用 synchronized
关键字具有更大的灵活性。通过 ReentrantLock
,你可以显式地获取锁和释放锁,从而精确地控制同步范围。
ReentrantLock
提供了更多的功能,比如可重入性、可定时的锁等待、公平性设置等。但需要注意,使用 ReentrantLock
需要手动释放锁,因此务必在 finally
块中释放锁,以防止死锁情况的发生。
package atreus.ink;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static final Lock lock = new ReentrantLock();
private static int counter;
public static void main(String[] args) {
Runnable incrementTask = () -> {
for (int i = 0; i < 10000; i++) {
lock.lock();
try {
counter++;
} finally {
// 放在finally块中保证锁一定能被释放
lock.unlock();
}
}
};
Thread thread1 = new Thread(incrementTask);
Thread thread2 = new Thread(incrementTask);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter value: " + counter);
}
}
Final counter value: 20000
4.乐观锁
ReentrantLock
、同步代码块和同步方法都是基于悲观锁思想实现的,意味着它们假定在执行临界区代码期间会发生并发冲突。在高并发场景下,由于激烈的锁竞争,可能会导致线程阻塞,从而降低性能。特别是在多读场景下,悲观锁可能引入大量的额外并发开销,因为每个读操作都需要获得独占锁。
相比之下,乐观锁的思想更适合多读场景。乐观锁假定数据操作不存在并发冲突,因此不会引起锁竞争,也不会导致线程阻塞和死锁。乐观锁通常在提交修改时才验证资源是否被其他线程修改。然而,在多写场景下,乐观锁可能会由于频繁的冲突而引起失败和重试,这可能会对性能产生一定的影响。不过在多写场景下乐观锁会频繁失败和重试,这同样会对性能造成一定影响。
java.util.concurrent.atomic
包下面 AtomicInteger
、AtomicLong
、AtomicIntegerArray
、AtomicReference
等原子变量类均基于乐观锁的思想实现。
package atreus.ink;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
private static final AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) {
Runnable incrementTask = () -> {
for (int i = 0; i < 10000; i++) {
counter.incrementAndGet();
}
};
Thread thread1 = new Thread(incrementTask);
Thread thread2 = new Thread(incrementTask);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter value: " + counter.get());
}
}
package atreus.ink;
import java.util.concurrent.atomic.AtomicReference;
public class Main {
private static final AtomicReference<Integer> counterRef = new AtomicReference<>(0);
public static void main(String[] args) {
Runnable incrementTask = () -> {
for (int i = 0; i < 10000; i++) {
while (true) {
Integer current = counterRef.get();
Integer updated = current + 1;
if (counterRef.compareAndSet(current, updated)) {
break;
}
}
}
};
Thread thread1 = new Thread(incrementTask);
Thread thread2 = new Thread(incrementTask);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final counter value: " + counterRef.get());
}
}
Final counter value: 20000
四、线程池
1.ThreadPoolExecutor
Java 中的线程池接口为 ExecutorService
,一个常用的实现类为 ThreadPoolExecutor
,其构造函数为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数说明:
corePoolSize
:线程池的核心线程数,即任务队列未达到队列容量时,最大可以同时运行的线程数量。即使线程是空闲的,它们也不会被销毁,除非线程池被关闭。maximumPoolSize
:线程池的最大线程数。在没有核心线程空闲的情况下,如果任务数量增加,线程池可以扩展到最大线程数。如果任务数量继续增加,超过线程池最大大小的任务将会被拒绝执行。keepAliveTime
:非核心线程的最大空闲时间。当线程池中的线程数量超过corePoolSize
,多余的非核心线程会在空闲时间超过keepAliveTime
后被销毁,以减少资源占用。unit
:时间单位,用于指定keepAliveTime
的时间单位。workQueue
:用于存储等待执行的任务的阻塞队列。当所有核心线程都忙碌时,新任务将被放入队列等待执行。常用的队列类型包括LinkedBlockingQueue
、ArrayBlockingQueue
、PriorityBlockingQueue
等。threadFactory
:用于创建线程的工厂。可以通过提供自己实现的ThreadFactory
自定义线程的创建过程。handler
:拒绝策略,用于处理无法提交给线程池执行的任务。当任务数量超过线程池最大大小且队列已满时,将使用拒绝策略处理任务。常见的策略有AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
等。
注意事项:
- 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程。
- 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始拒绝任务。
常用方法:
方法名称 | 说明 |
---|---|
void execute(Runnable command) | 执行 Runnable 任务 |
Future<T> submit(Callable<T> task) | 执行 callable 任务,返回未来任务对象,用于获取线程返回的结果 |
void shutdown() | 等全部任务执行完毕后,再关闭线程池 |
List<Runnable> shutdownNow() | 立刻关闭线程池,停止正在执行的任务,并返回队列中未执行的任务 |
package atreus.ink;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.printf("[%s] %s\n", Thread.currentThread().getName(),
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
package atreus.ink;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(2, 3,
8, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
Runnable target = new MyRunnable();
pool.execute(target); // 核心线程
pool.execute(target); // 核心线程
pool.execute(target); // 任务队列等待
pool.execute(target); // 任务队列等待
pool.execute(target); // 任务队列满,启动一个临时线程
pool.execute(target); // 核心线程和临时线程忙,同时任务队列已满,拒绝任务
pool.shutdown();
}
}
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task atreus.ink.MyRunnable@7a0ac6e3 rejected from java.util.concurrent.ThreadPoolExecutor@71be98f5[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at atreus.ink.Main.main(Main.java:20)
[pool-1-thread-2] 2023-08-30 15:57:44
[pool-1-thread-1] 2023-08-30 15:57:44
[pool-1-thread-3] 2023-08-30 15:57:44
[pool-1-thread-1] 2023-08-30 15:57:47
[pool-1-thread-2] 2023-08-30 15:57:47
新任务拒绝策略:
策略 | 详解 |
---|---|
ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出 RejectedExecutionException 异常,是默认的策略 |
ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常,这是不推荐的做法 |
ThreadPoolExecutor.DiscardOldestPolicy | 抛弃队列中等待最久的任务,然后把当前任务加入队列中 |
ThreadPoolExecutor.CallerRunsPolicy | 由主线程负责调用任务的 run() 方法从而绕过线程池直接执行 |
2.Executors
Executors
是一个线程池的工具类,提供了很多静态方法用于返回不同特点的线程池对象。
方法名称 | 说明 |
---|---|
public static ExecutorService newFixedThreadPool(int nThreads) | 创建固定线程数量的线程池,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程替代它 |
public static ExecutorService newSingleThreadExecutor() | 创建只有一个线程的线程池对象,如果该线程出现异常而结束,那么线程池会补充一个新线程 |
public static ExecutorService newCachedThreadPool() | 线程数量随着任务增加而增加,如果线程任务执行完毕且空闲了 60s 则会被回收掉。 |
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) | 创建一个线程池,可以实现在给定的延迟后运行任务或者定期执行任务 |
package atreus.ink;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.printf("[%s] %s\n", Thread.currentThread().getName(),
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
}
package atreus.ink;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
Runnable target = new MyRunnable();
// 延迟1秒后执行target任务
pool.schedule(target, 1, TimeUnit.SECONDS);
// 延迟2秒后,每隔3秒执行一次target任务
pool.scheduleAtFixedRate(target, 2, 3, TimeUnit.SECONDS);
Thread.sleep(10 * 1000);
pool.shutdown();
}
}
[pool-1-thread-1] 2023-08-30 16:26:33
[pool-1-thread-2] 2023-08-30 16:26:34
[pool-1-thread-2] 2023-08-30 16:26:37
[pool-1-thread-2] 2023-08-30 16:26:40
参考:
https://www.bilibili.com/video/BV1Cv411372m/?p=175
https://javaguide.cn/java/concurrent/