一、同步模式之保护性暂停
即
Guarded Suspension
,用在一个线程等待另一个线程的执行结果
产生结果的线程和使用结果的线程是一一对应的,有多少个生产结果的线程就有多少个使用结果的线程。
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
(一)、带延时的写法
不用将控锁的那个对象设置成全局的
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
@Slf4j(topic = "c.Test6")
public class Test6 {
public static void main(String[] args){
GuardedObjectV2 v2 = new GuardedObjectV2();
new Thread(() -> {
Thread.sleep(1);
v2.complete(null);
Thread.sleep(1);
v2.complete(Arrays.asList("a", "b", "c"));
}).start();
Object response = v2.get(2500);
if (response != null) {
log.debug("get response: [{}] lines", ((List<String>) response).size());
} else {
log.debug("can't get response");
}
}
}
@Slf4j(topic = "c.GuardedObjectV2")
class GuardedObjectV2 {
private Object response;
private final Object lock = new Object();
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
//这里不是millis,因为出现虚假唤醒的情况下只用再等 millis - timePassed
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}
(二)、多任务版
理解为顾客点外卖、外卖员送外卖到外卖柜、顾客收外卖更合适。
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
@Slf4j(topic = "c.Test6")
public class Test6 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
//中间解耦类,与业务不相关,这里为了方便取名为Mailboxes
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
//业务相关类 People Postman
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
class GuardedObject{
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
private Object response;
public synchronized Object get(long timeout){
long begin = System.currentTimeMillis();
long passTime = 0;
while (response == null){
if (passTime > timeout ){
break;
}
try {
this.wait(passTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passTime = System.currentTimeMillis() - begin;
}
return response;
}
public synchronized void complete(Object response){
this.response = response;
this.notifyAll();
}
}
二、异步模式之生产者/消费者
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@Slf4j(topic = "c.Test7")
public class Test7 {
public static void main(String[] args) {
MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
try {
log.debug("download...");
List<String> response = Downloader.download();
log.debug("try put message({})", id);
messageQueue.put(new Message(id, response));
} catch (IOException e) {
e.printStackTrace();
}
}, "生产者" + i).start();
}
// 1 个消费者线程, 处理结果
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.debug("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消费者").start();
}
}
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("没货了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("库存已达上限, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
三、同步模式之顺序控制
(一)、固定运行顺序
先打印2在打印1
1、wait notify 版
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test7")
public class Test7 {
// 用来同步的对象
static Object obj = new Object();
// t2 运行标记, 代表 t2 是否执行过
static boolean t2runed = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (obj) {
// 如果 t2 没有执行过
while (!t2runed) {
try {
// t1 先等一会
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(1);
});
Thread t2 = new Thread(() -> {
System.out.println(2);
synchronized (obj) {
// 修改运行标记
t2runed = true;
// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
obj.notifyAll();
}
});
t1.start();
t2.start();
}
}
2、Park Unpark 版
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
@Slf4j(topic = "c.Test7")
public class Test7 {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { }
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("1");
});
Thread t2 = new Thread(() -> {
System.out.println("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
});
t1.start();
t2.start();
}
}
3、可以使用await/signal
(二)、交替输出
线程
1
输出
a 5
次,线程
2
输出
b 5
次,线程
3
输出
c 5
次。现在要求输出
abcabcabcabcabc
怎么实现
1、wait notify 版
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test7")
public class Test7 {
public static void main(String[] args) {
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
new Thread(() -> {
syncWaitNotify.print(1, 2, "a");
}).start();
new Thread(() -> {
syncWaitNotify.print(2, 3, "b");
}).start();
new Thread(() -> {
syncWaitNotify.print(3, 1, "c");
}).start();
}
}
class SyncWaitNotify {
private int flag;
private int loopNumber;
public SyncWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(int waitFlag, int nextFlag, String str) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (this.flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}
2、Lock 条件变量版
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j(topic = "c.Test7")
public class Test7 {
public static void main(String[] args) {
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
as.start(aWaitSet);
}
}
@Slf4j(topic = "c.AwaitSignal")
class AwaitSignal extends ReentrantLock {
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
current.await();
log.debug(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
3、park/unpark
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
@Slf4j(topic = "c.Test7")
public class Test7 {
public static void main(String[] args) {
SyncPark syncPark = new SyncPark(5);
Thread t1 = new Thread(() -> {
syncPark.print("a");
});
Thread t2 = new Thread(() -> {
syncPark.print("b");
});
Thread t3 = new Thread(() -> {
syncPark.print("c\n");
});
syncPark.setThreads(t1, t2, t3);
syncPark.start();
}
}
@Slf4j(topic = "c.SyncPark")
class SyncPark {
private int loopNumber;
private Thread[] threads;
public SyncPark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void setThreads(Thread... threads) {
this.threads = threads;
}
public void print(String str) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread());
}
}
private Thread nextThread() {
Thread current = Thread.currentThread();
int index = 0;
for (int i = 0; i < threads.length; i++) {
if(threads[i] == current) {
index = i;
break;
}
}
if(index < threads.length - 1) {
return threads[index+1];
} else {
return threads[0];
}
}
public void start() {
for (Thread thread : threads) {
thread.start();
}
LockSupport.unpark(threads[0]);
}
}
四、终止模式之两阶段终止模式
(一)、利用 isInterrupted
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test5")
public class Test5 {
public static void main(String[] args) throws InterruptedException {
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
Thread.sleep(3500);
twoPhaseTermination.stop();
}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{
private Thread monitor;
public void start(){
monitor = new Thread(new Runnable() {
@Override
public void run() {
while (true){
Thread current = Thread.currentThread();
if (current.isInterrupted()){
log.debug("进程结束之前进行的工作(料理后事)");
break;
}
try {
Thread.sleep(1000); //睡眠过程中被打断,抛出异常,isInterrupted重置为false
log.debug("执行业务的相关功能"); //此时被打断,直接在下次循环中执行打断程序
} catch (InterruptedException e){
log.debug("在睡眠时被打断");
current.interrupt(); //将打断标记重新置为true,下次循环时执行结束程序
}
}
}
});
monitor.start();
}
public void stop(){
monitor.interrupt();
}
}
(二)、利用停止标记
package com.itcast.test;
import lombok.extern.slf4j.Slf4j;
@Slf4j(topic = "c.Test5")
public class Test5 {
public static void main(String[] args) throws InterruptedException {
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
twoPhaseTermination.start();
Thread.sleep(3500);
twoPhaseTermination.stop();
}
}
@Slf4j(topic = "c.TwoPhaseTermination")
class TwoPhaseTermination{
private Thread monitor;
private volatile boolean stop = false;
public void start(){
monitor = new Thread(new Runnable() {
@Override
public void run() {
while (true){
Thread current = Thread.currentThread();
if (stop){
log.debug("进程结束之前进行的工作(料理后事)");
break;
}
try {
Thread.sleep(1000); //睡眠过程中被打断,抛出异常,isInterrupted重置为false
log.debug("执行业务的相关功能"); //此时被打断,直接在下次循环中执行打断程序
} catch (InterruptedException e){
}
}
}
});
monitor.start();
}
public void stop(){
stop = true;
monitor.interrupt();
}
}
五、同步模式之 Balking
Balking
(犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回
比如监控电脑状态的线程只需要开启一个时:
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting;
public void start() {
log.info("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}
// 真正启动监控线程...
开启监控线程
}
}
六、享元模式
(一)、体现
1、包装类
在
JDK
中
Boolean
,
Byte
,
Short
,
Integer
,
Long
,
Character
等包装类提供了
valueOf
方法,例如
Long
的valueOf 会缓存
-128~127
之间的
Long
对象,在这个范围之间会重用对象,大于这个范围,才会新建
Long
对象:
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
注意:
- Byte, Short, Long 缓存的范围都是 -128~127
- Character 缓存的范围是 0~127
- Integer的默认范围是 -128~127
- 最小值不能变,但最大值可以通过调整虚拟机参数 ` -Djava.lang.Integer.IntegerCache.high` 来改变
- Boolean 缓存了 TRUE 和 FALSE
2、String 串池
3、BigDecimal BigInteger
是不可变的
(二)、实现一个简单的连接池
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i + 1));
}
}
// 5. 借连接
public Connection borrow() {
while (true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 如果没有空闲连接,当前线程进入等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
log.debug("free {}", conn);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
// 实现略
}
使用连接池
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
七、异步模式之工作线程
让有限的工作线程(
Worker Thread
)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如,如果一个餐馆的工人既要招呼客人(任务类型
A
),又要到后厨做菜(任务类型
B
)显然效率不咋地,分成 服务员(线程池A
)与厨师(线程池
B
)更为合理,当然你能想到更细致的分工
(一)、饥饿
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
/*executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) { e.printStackTrace();
}
});*/
}
}