生产者消费者
与保护性暂停中的不同,不需要产生结果和消费结果的线程一一对应。
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
JDK 中各种阻塞队列,采用的就是这种模式
代码实现:
首先,设计消息队列类MessageQueue,需要指定容量capacity,用双向链表list作为容器。
提供take方法:检查list是否是空,空的话就wait,如果不空就打印Message,并唤醒所有线程。
提供put方法:检查list是否满了,满了的话就wait,如果不满就添加Message,并唤醒所有线程。
可以看到,以上的写法都是使用wait和notify的模板写法。
class MessageQueue{
private LinkedList<Message> list = new LinkedList<>();
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//取
public Message take(){
synchronized (list){
while (list.isEmpty()){
try {
log.debug("队列空了");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
log.debug("取出来了{}", message);
list.notifyAll();
return message;
}
}
//存
public void put(Message message){
synchronized (list){
while(list.size() == capacity){
try {
log.debug("队列满了");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
log.debug("添加了{}", message);
list.notifyAll();
}
}
}
设计Message类,需要一个唯一标识id,还需要一个Object类型的value值。
final class Message{
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
完整代码:
public class ProductAndConsumer {
public static void main(String[] args) {
MessageQueue q = new MessageQueue(3);
for (int i = 1; i <= 5; i++){
//lambda表达式必须传final的值,不能变。
int id = i;
new Thread(()->{
q.put(new Message(id, "v"+id));
}, "生产者").start();
}
new Thread(()->{
while (true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Message t = q.take();
}
}, "消费者").start();
}
}
/**
* 消息队列类
*/
@Slf4j(topic = "c.test")
class MessageQueue{
private LinkedList<Message> list = new LinkedList<>();
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//取
public Message take(){
synchronized (list){
while (list.isEmpty()){
try {
log.debug("队列空了");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = list.removeFirst();
log.debug("取出来了{}", message);
list.notifyAll();
return message;
}
}
//存
public void put(Message message){
synchronized (list){
while(list.size() == capacity){
try {
log.debug("队列满了");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.addLast(message);
log.debug("添加了{}", message);
list.notifyAll();
}
}
}
/**
* Message对象,只有get方法,并且final不可继承,也不会被子类重写方法,很安全。
*/
final class Message{
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}
某次的运行结果
21:17:06 [生产者] c.test - 添加了Message{id=5, value=v5}
21:17:06 [生产者] c.test - 添加了Message{id=2, value=v2}
21:17:06 [生产者] c.test - 添加了Message{id=3, value=v3}
21:17:06 [生产者] c.test - 队列满了
21:17:06 [生产者] c.test - 队列满了
21:17:07 [消费者] c.test - 取出来了Message{id=5, value=v5}
21:17:07 [生产者] c.test - 添加了Message{id=1, value=v1}
21:17:07 [生产者] c.test - 队列满了
21:17:08 [消费者] c.test - 取出来了Message{id=2, value=v2}
21:17:08 [生产者] c.test - 添加了Message{id=4, value=v4}
21:17:09 [消费者] c.test - 取出来了Message{id=3, value=v3}
21:17:10 [消费者] c.test - 取出来了Message{id=1, value=v1}
21:17:11 [消费者] c.test - 取出来了Message{id=4, value=v4}
21:17:12 [消费者] c.test - 队列空了