上文中我们讲了Java库中自带的阻塞队列,并且讲了如何用阻塞队列来实现生产者消费者模型
【Java】用Java库中自带的阻塞队列以及用阻塞队列实现生产者-消费者模型
下面我们来讲如何用代码实现一个阻塞队列
1、实现一个阻塞队列
阻塞队列 = 普通队列 + 线程安全 + 阻塞
(1)首先实现一个普通队列
class MyBlockingQueue{
private int head = 0;
private int tail = 0;
private int size = 0;
String[] array;
public MyBlockingQueue(){
array = new String[1000];
}
//取出队首元素
public String take() throws InterruptedException {
//如果队列为空,则返回null
if (size == 0){
return null;
}
//取出队首元素
String elem = array[head];
//如果head已经到了队尾,那么下一个置0
if(head == array.length){
head = 0;
}
head++;
size--;
return elem;
}
//放入元素
public void put(String elem) throws InterruptedException {
if (size == array.length){
return;
}
array[tail] = elem;
if (tail == array.length){
tail = 0;
}
tail++;
size++;
}
}
(2)线程安全
由于put()和take()方法中对各个变量都进行了多次修改,因此我们在实现线程安全时,直接对这两段代码加锁
public String take() throws InterruptedException {
synchronized{
if (size == 0){
return null;
}
String elem = array[head];
if(head == array.length){
head = 0;
}
head++;
size--;
return elem;
}
}
public void put(String elem) throws InterruptedException {
synchronized{
if (size == array.length){
return;
}
array[tail] = elem;
if (tail == array.length){
tail = 0;
}
tail++;
size++;
}
}
并且为了防止内存可见性问题和指令重排序问题,我们给三个变量加上volatile关键字进行修饰
(什么是可见性问题和指令重排序问题?)
【Java】volatile-内存可见性问题
【Java】多线程-单例模式/volatile-指令重排序
private volatile int head = 0;
private volatile int tail = 0;
private volatile int size = 0;
(3)阻塞
最后再加上阻塞
取队首元素时,如果队列为空,那么我们直接进行阻塞;等到下一次在另一个线程放入元素时将其唤醒
放元素时,如果队列满了,我们将这个线程阻塞;等到队列可用时,我们在另一个线程唤醒
public String take() throws InterruptedException {
synchronized (this){
if (size == 0){
this.wait();
}
String elem = array[head];
if(head == array.length){
head = 0;
}
head++;
size--;
this.notify();
return elem;
}
}
public void put(String elem) throws InterruptedException {
synchronized (this){
if (size == array.length){
this.wait();
}
array[tail] = elem;
if (tail == array.length){
tail = 0;
}
tail++;
size++;
this.notify();
}
}
注意他们唤醒的对应关系
(4)while循环
这其中还存在一个问题,那就是wait()的对象只能被notify()唤醒吗?
答案是不。除了用notify()唤醒,发生InterruptedException异常也可以将对象唤醒
假设队列为空的情况下,发生了InterruptedException异常,对象被唤醒,代码继续往下执行,再想取元素便会出错。因此这种情况下我们还要继续判断队列是否为空
为了解决这个问题,我们将if判断改为while()循环判断,就可以避免上面情况发生
//取出队首元素
public String take() throws InterruptedException {
synchronized (this){
while (size == 0){
this.wait();
}
String elem = array[head];
if(head == array.length){
head = 0;
}
head++;
size--;
this.notify();
return elem;
}
}
//放入元素
public void put(String elem) throws InterruptedException {
synchronized (this){
//判断队列是否满了,如果满了则阻塞
while (size == array.length){
this.wait();
}
array[tail] = elem;
if (tail == array.length){
tail = 0;
}
tail++;
size++;
this.notify();
}
}
(5)完整代码
实现阻塞队列的完整代码如下
class MyBlockingQueue{
private volatile int head = 0;
private volatile int tail = 0;
private volatile int size = 0;
String[] array;
public MyBlockingQueue(){
array = new String[1000];
}
//取出队首元素
public String take() throws InterruptedException {
synchronized (this){
while (size == 0){
this.wait();
}
String elem = array[head];
if(head == array.length){
head = 0;
}
head++;
size--;
this.notify();
return elem;
}
}
//放入元素
public void put(String elem) throws InterruptedException {
synchronized (this){
//判断队列是否满了,如果满了则阻塞
while (size == array.length){
this.wait();
}
array[tail] = elem;
if (tail == array.length){
tail = 0;
}
tail++;
size++;
this.notify();
}
}
}
2、实现生产者-消费者模型
代码如下
class MyBlockingQueue{
private volatile int head = 0;
private volatile int tail = 0;
private volatile int size = 0;
String[] array;
public MyBlockingQueue(){
array = new String[1000];
}
//取出队首元素
public String take() throws InterruptedException {
synchronized (this){
while (size == 0){
this.wait();
}
String elem = array[head];
if(head == array.length){
head = 0;
}
head++;
size--;
this.notify();
return elem;
}
}
//放入元素
public void put(String elem) throws InterruptedException {
synchronized (this){
//判断队列是否满了,如果满了则阻塞
while (size == array.length){
this.wait();
}
array[tail] = elem;
if (tail == array.length){
tail = 0;
}
tail++;
size++;
this.notify();
}
}
}
public class demo2 {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue();
//生产者
Thread thread1 = new Thread(()->{
int n = 0;
while (true){
try {
myBlockingQueue.put(n +"");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("生产元素"+n);
n++;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
//消费者
Thread thread2 = new Thread(()->{
while (true){
try {
System.out.println("消费元素" + myBlockingQueue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread1.start();
thread2.start();
}
}
运行结果如图