线程优先级
在Java线程中,通过一个整型成员变量priority
来控制优先级,优先级的范围从1~10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分配时间片的数量要多于优先级低的线程。
public class Priority {
private static volatile boolean notStart = true;
private static volatile boolean notEnd = true;
public static void main(String[] args) throws Exception {
List<Job> jobs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// MIN: 1 MAX: 10
int priority = i < 5 ? Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
Job job = new Job(priority);
jobs.add(job);
Thread thread = new Thread(job, "Thread:" + i);
// 设置线程优先级(操作系统可能不会理会Java线程对优先级的设置) 不一定会被系统优先调度
thread.setPriority(priority);
thread.start();
}
notStart = false;
TimeUnit.SECONDS.sleep(10);
notEnd = false;
for (Job job : jobs) {
System.out.println("Job Priority : " + job.priority + ", Count:" + job.jobCount);
}
}
static class Job implements Runnable {
private final int priority;
private long jobCount;
public Job(int priority) {
this.priority = priority;
}
public void run() {
while (notStart) {
Thread.yield();
}
while (notEnd) {
Thread.yield();
jobCount++;
}
}
}
}
线程状态
Thread#State 六种状态
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}
线程在自身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换,Java线程状态变迁:
等待有超时时间:TIMED_WAITING
等待无超时时间:WAITING
同步代码(synchronized)未获得锁:BLOCKED
注意:Java将操作系统中的运行和就绪两个状态合并称为运行状态。
阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态,但阻塞在java.concurrent包中Lock接口的线程状态却是等待状态,因为java.concurrent包中Lock接口对于阻塞的实现均使用了LockSupport类中的相关方法
。
安全地终止线程
/**
* 安全地终止线程
* volatile / interrupt volatile变量或者中断标识
*/
public class Shutdown {
public static void main(String[] args) throws Exception {
Runner one = new Runner();
Thread countThread = new Thread(one, "CountThread");
countThread.start();
// 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
TimeUnit.SECONDS.sleep(1);
countThread.interrupt();
Runner two = new Runner();
countThread = new Thread(two, "CountThread");
countThread.start();
// 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
TimeUnit.SECONDS.sleep(1);
two.cancel();
}
private static class Runner implements Runnable {
private long i;
/**
* 可见性
*/
private volatile boolean on = true;
@Override
public void run() {
// !!!
while (on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i = " + i);
}
public void cancel() {
on = false;
}
}
}
通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅。
Thread.join()的使用
含义是:当前线程A等待thread线程终止之后才从thread.join()返回。
public class Join {
/**
* 线程顺序执行
*/
public static void main(String[] args) throws Exception {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; i++) {
// 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
Thread thread = new Thread(new Domino(previous), String.valueOf(i));
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " terminate.");
}
static class Domino implements Runnable {
private Thread thread;
public Domino(Thread thread) {
this.thread = thread;
}
public void run() {
try {
// 线程结束才从这返回
thread.join();
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " terminate.");
}
}
}
输出:
main terminate.
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.
每个线程终止的前提是前驱线程的终止,每个线程等待前驱线程终止后,才从join()方法返回,这里涉及了等待/通知机制(等待前驱线程结束,接收前驱线程结束通知)。
ThreadLocal的使用
ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。
这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。【实现线程隔离】
/**
* ThreadLocal 以ThreadLocal对象key , 泛型类型为value (线程隔离)
*/
public class ProfilerThreadLocal {
// 第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次
private static final ThreadLocal<Long> TIME_THREADLOCAL = ThreadLocal.withInitial(System::currentTimeMillis);
public static final void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end() {
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) throws Exception {
ProfilerThreadLocal.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + " Cost: " + ProfilerThreadLocal.end() + " mills");
new Thread(() -> {
ProfilerThreadLocal.begin();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " Cost: " + ProfilerThreadLocal.end() + " mills");
}, "test-thread").start();
}
}
输出:
main Cost: 1007 mills
test-thread Cost: 3011 mills
等待/通知机制
一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良好的伸缩性。
等待/通知的相关方法
/**
* 等待/通知机制
*/
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws Exception {
Thread waitThread = new Thread(new Wait(), "WaitThread");
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(new Notify(), "NotifyThread");
notifyThread.start();
// wait -> notify -> running -> sleep
}
static class Wait implements Runnable {
public void run() {
// 加锁,拥有lock的Monitor
synchronized (lock) {
// 当条件不满足时,继续wait,同时释放了lock的锁 【当前线程放到对象的等待队列】
while (flag) {
try {
System.out.println(Thread.currentThread() + " flag is true. wait "
+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
lock.wait();
} catch (InterruptedException e) {
}
}
// 条件满足时,完成工作
System.out.println(Thread.currentThread() + " flag is false. running "
+ new SimpleDateFormat(" HH:mm:ss").format(new Date()));
}
}
}
static class Notify implements Runnable {
public void run() {
// 加锁,拥有lock的Monitor
synchronized (lock) {
// 获取lock的锁,然后进行通知,通知时不会释放lock的锁,
// 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
System.out.println(Thread.currentThread() + " hold lock. notify " +
new SimpleDateFormat("HH:mm:ss").format(new Date()));
// 【将等待队列中所有线程全部移到同步队列】
lock.notifyAll();
flag = false;
SleepUtils.second(5);
}
// 再次加锁
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep "
+ new SimpleDateFormat(" HH:mm:ss").format(new Date()));
SleepUtils.second(5);
}
}
}
}
调用wait()、notify()以及notifyAll()时需要注意的细节:
- 使用wait()、notify()和notifyAll()时需要
先对调用对象加锁
。 - 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
- 调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
- notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
上述全流程:
等待/通知的经典范式
等待方
:
- 获取对象的锁。
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
伪代码如下:
synchronized(对象) {
while(条件不满足) {
对象.wait();
}
对应的处理逻辑
}
通知方
:
- 获得对象的锁。
- 改变条件。
- 通知所有等待在对象上的线程。
伪代码如下:
synchronized(对象) {
改变条件
对象.notifyAll();
}
等待超时模式
这样一个场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。
等待超时模式的伪代码:
// 对当前对象加锁
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
// 当超时大于0并且result返回值不满足要求
while (result == null && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会“永久”阻塞调用者,而是会按照调用者的要求“按时”返回。
举个例子:
/**
* 等待超时模式应用
*/
public class ConnectionPool {
private final LinkedList<Connection> pool = new LinkedList<>();
public ConnectionPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(ConnectionDriver.createConnection());
}
}
}
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool) {
// 连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接
pool.addLast(connection);
pool.notifyAll();
}
}
}
// 在mills内无法获取到连接,将会返回null
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
// 完全超时
if (mills <= 0) {
while (pool.isEmpty()) {
pool.wait();
}
return pool.removeFirst();
} else {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if (!pool.isEmpty()) {
result = pool.removeFirst();
}
return result;
}
}
}
}
测试:
public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
// 保证所有ConnectionRunner能够同时开始
static CountDownLatch start = new CountDownLatch(1);
// main线程将会等待所有ConnectionRunner结束后才能继续执行
static CountDownLatch end;
public static void main(String[] args) throws Exception {
// 线程数量,可以修改线程数量进行观察
int threadCount = 50;
end = new CountDownLatch(threadCount);
int count = 20;
AtomicInteger got = new AtomicInteger();
AtomicInteger notGot = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
"ConnectionRunnerThread");
thread.start();
}
// 保证线程同时开始执行
start.countDown();
// 保证线程线程全部执行完毕,统一打印结果 countDown 数量 + 1,数量达到之后 await 之后代码才会执行
end.await();
System.out.println("total invoke: " + (threadCount * count));
System.out.println("got connection: " + got);
System.out.println("not got connection " + notGot);
}
static class ConnetionRunner implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
try {
start.await();
} catch (Exception ex) {
}
while (count > 0) {
try {
// 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
// 分别统计连接获取的数量got和未获取到的数量notGot
Connection connection = pool.fetchConnection(1000);
if (connection != null) {
try {
connection.createStatement();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
notGot.incrementAndGet();
}
} catch (Exception ex) {
} finally {
count--;
}
}
end.countDown();
}
}
}
随着客户端线程的逐步增加,客户端出现超时无法获取连接的比率不断升高。虽然客户端线程在这种超时获取的模式下会出现连接无法获取的情况,但是它能够保证客户端线程不会一直挂在连接获取的操作上,而是 “按时”返回,并告知客户端连接获取出现问题,是系统的一种自我保护机制。
模拟简单线程池
顶层接口
顶层接口,决定该做什么
/**
* 线程池顶层接口
*/
public interface ThreadPool<Job extends Runnable> {
// 执行一个Job,这个Job需要实现Runnable
void execute(Job job);
// 关闭线程池
void shutdown();
// 增加工作者线程
void addWorkers(int num);
// 减少工作者线程
void removeWorker(int num);
// 得到正在等待执行的任务数量
int getJobSize();
}
默认实现类
默认实现类,实现所有方法实现
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小的数量
private static final int MIN_WORKER_NUMBERS = 1;
// 这是一个工作列表,将会向里面插入工作
private final LinkedList<Job> jobs = new LinkedList<>();
// 工作者列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
// 工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
// 线程编号生成
private final AtomicLong threadNum = new AtomicLong();
public DefaultThreadPool() {
initializeWorkers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : Math.max(num, MIN_WORKER_NUMBERS);
initializeWorkers(workerNum);
}
public void execute(Job job) {
if (job != null) {
// 添加一个工作,然后进行通知
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}
public void shutdown() {
for (Worker worker : workers) {
worker.shutdown();
}
}
public void addWorkers(int num) {
synchronized (jobs) {
// 限制新增的Worker数量不能超过最大值
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorkers(num);
this.workerNum += num;
}
}
public void removeWorker(int num) {
synchronized (jobs) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止Worker
int count = 0;
while (count < num) {
Worker worker = workers.get(count);
if (workers.remove(worker)) {
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}
public int getJobSize() {
return jobs.size();
}
// 初始化线程工作者
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
thread.start();
}
}
// 工作者,负责消费任务
class Worker implements Runnable {
// 是否工作
private volatile boolean running = true;
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
// 如果工作者列表是空的,那么就wait
while (jobs.isEmpty()) {
try {
// 等待被唤醒
jobs.wait();
} catch (InterruptedException ex) {
// 感知到外部对WorkerThread的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个Job
job = jobs.removeFirst();
}
if (job != null) {
try {
job.run();
} catch (Exception ex) {
// 忽略Job执行中的Exception
}
}
}
}
public void shutdown() {
running = false;
}
}
}
使用时,只需往线程池里添加任务即可。