文章目录
- 前言
- 传统派
- 维新派
前言
题目:一个初始值为零的变量,多个线程对其交替操作,分别加1减1
实现步骤:
- 线程操作资源类
- 判断,干活,通知
- 防止虚假唤醒机制,即:多线程的判断需要用 while,不能使用 if(jdk 要求的,保证线程不会出错)
传统派
加锁实现,通过加锁保证线程的安全
class ShareData {// 资源类
private int number = 0;
// 锁
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void increment() throws InterruptedException {
lock.lock();
try {
// 1. 判断,不能使用 if
// if (number != 0) {
while (number != 0) {
// 有值,等待消费,不能生产
condition.await();
}
// 2. 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3. 通知,唤醒
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 手动解锁
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
// 1. 判断
// if (number == 0) {
while (number == 0) {
// 有值,等待消费,不能生产
condition.await();
}
// 2. 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3. 通知,唤醒
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Test {
public static void main(String[] args) throws Exception {
// 传统
ShareData shareData = new ShareData();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Produce0").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
shareData.decrement();
}
}, "Consume0").start();
// 多个线程验证性质 3
// 发现不是生产一个就消费一个了
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareData.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "Produce1").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
shareData.decrement();
}
}, "Consume2").start();
}
}
输出结果:
维新派
使用阻塞队列和原子类实现
class MyResource {
// 默认开启
private volatile boolean FLAG = true;
// 原子类作为共享变量
private final AtomicInteger atomicInteger = new AtomicInteger();
// 阻塞队列实现锁的功能
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void myProd() throws InterruptedException {
String date = null;
boolean retValue;
while (FLAG) {
date = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(date, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + date + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + date + "失败");
}
// 一秒生产一个
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t停止生产");
}
public void myConsumer() throws InterruptedException {
String result = null;
while (FLAG) {
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (result == null || result.equalsIgnoreCase("")) {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t超过两秒没有取到,退出");
System.out.println();
System.out.println();
return;
}
System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功");
}
}
public void stop() {
this.FLAG = false;
}
}
public class Test {
public static void main(String[] args) throws Exception {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
try {
myResource.myProd();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
try {
myResource.myConsumer();
System.out.println();
System.out.println();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Consumer").start();
Thread.sleep(5000);
System.out.println();
System.out.println("时间到,main 线程叫停");
myResource.stop();
}
}
输出结果: