双端队列、优先级队列、阻塞队列
文章目录
- 双端队列、优先级队列、阻塞队列
- 1 双端队列
- 1.1 概述
- 1.2 应用实例
- 1.2.1 双端链表实现
- 1.2.2 数组实现
- 1.2.3 测试代码
- 1.3 课后作业- LeeTCode103
- 2. 优先级队列
- 2.1 概述
- 2.2 基于无序数组实现
- 2.3 基于有序数组实现
- 2.3 堆实现优先级队列
- 2.4 总结
- 2.5 练习-LeetCode23 合并K个升序链表
- 3. 阻塞队列
- 3.1 单锁实现
- 3.2 双锁实现
本节也来自于黑马数据结构与算法
1 双端队列
1.1 概述
双端队列、队列、栈对比
定义 | 特点 | |
---|---|---|
队列 | 一端删除(头)另一端添加(尾) | First In First Out |
栈 | 一端删除和添加(顶) | Last In First Out |
双端队列 | 两端都可以删除、添加 | |
优先级队列 | 优先级高者先出队 | |
延时队列 | 根据延时时间确定优先级 | |
并发非阻塞队列 | 队列空或满时不阻塞 | |
并发阻塞队列 | 队列空时删除阻塞、队列满时添加阻塞 |
注1:
- Java 中 LinkedList 即为典型双端队列实现,不过它同时实现了 Queue 接口,也提供了栈的 push pop 等方法
注2:
不同语言,操作双端队列的方法命名有所不同,参见下表
操作 Java JavaScript C++ leetCode 641 尾部插入 offerLast push push_back insertLast 头部插入 offerFirst unshift push_front insertFront 尾部移除 pollLast pop pop_back deleteLast 头部移除 pollFirst shift pop_front deleteFront 尾部获取 peekLast at(-1) back getRear 头部获取 peekFirst at(0) front getFront
1.2 应用实例
黑马代码如下
接口定义
package com.atguigu.linkedlist;
/**
* @author 小小低头哥
* @version 1.0
* 黑马程序接口定义
*/
public interface Deque<E>{
//向队列的头部添加元素
boolean offerFirst(E e);
//像队列的尾部添加元素
boolean offerLast(E e);
//移除第一个元素
E pollFirst();
//移除最后一个元素
E pollLast();
//显示第一个元素
E peekFirst();
//显示最后一个元素
E peekLast();
//是否为空
boolean isEmpty();
//是否为满
boolean isFull();
}
1.2.1 双端链表实现
class ListedListDeque<E> implements Deque<E>, Iterable {
@Override
public boolean offerFirst(E e) { //添加到头部
if (isFull()) {
return false;
}
//对应双端链表头部的插入操作
Node<E> a = sentinel;
Node<E> b = sentinel.next;
Node<E> added = new Node<>(a, e, b);
a.next = added;
b.prev = added;
size++;
return true;
}
@Override
public boolean offerLast(E e) { //添加到尾部
if (isFull()) { //如果为空 返回false
return false;
}
//对应双端链表尾部的插入操作
Node<E> b = sentinel;
Node<E> a = sentinel.prev;
Node<E> added = new Node<>(a, e, b);
a.next = added;
b.prev = added;
size++;
return true;
}
@Override
public E pollFirst() { //移除第一个元素
if (isEmpty()) {
return null;
}
//对应双端链表头部的删除操作
Node<E> a = sentinel;
Node<E> removed = sentinel.next;
Node<E> b = removed.next;
a.next = b;
b.prev = a;
size--;
return removed.value;
}
@Override
public E pollLast() { //移除最后一个元素
if (isEmpty()) {
return null;
}
//对应双端链表尾部的插入操作
Node<E> b = sentinel;
Node<E> removed = sentinel.prev;
Node<E> a = removed.prev;
a.next = b;
b.prev = a;
size--;
return removed.value;
}
@Override
public E peekFirst() {
if (isEmpty()) {
return null;
}
return sentinel.next.value;
}
@Override
public E peekLast() {
if (isEmpty()) {
return null;
}
return sentinel.prev.value;
}
@Override
public boolean isEmpty() {
return size == 0;
}
@Override
public boolean isFull() {
return size == capacity;
}
@Override
public Iterator iterator() { //迭代器 用于遍历
return new Iterator() {
Node<E> p = sentinel.next;
@Override
public boolean hasNext() {
return p != sentinel;
}
@Override
public E next() {
E value = p.value;
p = p.next;
return value;
}
};
}
static class Node<E> { //静态内部类 结点类的定义
Node<E> prev;
E value;
Node<E> next;
public Node(Node<E> prev, E value, Node<E> next) {
this.prev = prev;
this.value = value;
this.next = next;
}
public Node(E value) {
this.value = value;
}
}
int capacity;
int size;
Node<E> sentinel = new Node<>(null, null, null);
public ListedListDeque(int capacity) {
this.capacity = capacity;
sentinel.next = sentinel;
sentinel.prev = sentinel;
}
1.2.2 数组实现
class ArrayDeque<E> implements Deque<E>, Iterable<E> {
/*
h - head 指向第一个元素所在的位置
t - tail 指向最后一个元素的下一个位置
head == tail 空
head - tail == 数组长度 - 1 满
*/
static int inc(int i, int length) { //加一时 转换成循环的有效索引值
if (i + 1 >= length) {
return 0;
}
return i + 1;
}
static int dec(int i, int length) { //减一时 转换成循环的有效索引值
if (i - 1 < 0) {
return length - 1;
}
return i - 1;
}
E[] array;
int head;
int tail;
public ArrayDeque(int capacity){
array = (E[]) new Object[capacity + 1];
}
@Override
public boolean offerFirst(E e) {
if(isFull()){ //如果满了
return false; //返回false
}
//没满 则添加头元素
head = dec(head,array.length); //先将head指向新的头元素
array[head] = e; //再添加新的值
return false;
}
@Override
public boolean offerLast(E e) {
if(isFull()){ //满了返回false
return false;
}
//没满则添加尾元素
array[tail] = e;
tail = inc(tail, array.length); //加一
return false;
}
@Override
public E pollFirst() {
if(isEmpty()){ //空则返回null
return null;
}
//不为空则
E e = array[head]; //先返回头元素
head = inc(head,array.length); //再返回head++的数值
return e;
}
@Override
public E pollLast() {
if(isEmpty()){
return null;
}
//不为空则
tail = dec(tail, array.length); //先tail--
return array[tail]; //返回最后一个数据
}
@Override
public E peekFirst() {
if(isEmpty()){
return null;
}
return array[head];
}
@Override
public E peekLast() {
if(isEmpty()){
return null;
}
return array[dec(tail,array.length)];
}
@Override
public boolean isEmpty() {
//head==tail 指向的第一个元素位置同时也是最后一个元素位置的下一个位置时
//说明此数组为空
return head == tail; //当位置相同时为空
}
@Override
public boolean isFull() {
return (tail + array.length - head) % array.length == array.length - 1;
}
@Override
public Iterator<E> iterator() { //迭代器
return new Iterator<E>() {
int head1 = head;
@Override
public boolean hasNext() {
return head1 != tail; //为假说明遍历完毕
}
@Override
public E next() {
E e = array[head1]; //返回对应值
head1 = inc(head1,array.length);
return e;
}
};
}
}
注意:以上代码还有一个需要考虑的地方
比如当存放的是int类型数组时,由于置不置零arr每个元素的占用的空间仍然是4个字节,所以无序考虑内存的释放。但是当arr存放的是引用类型时,比如Node等,那如果不置null,则当用不到此位置时,此位置仍然会占用一个Node类所占用的空间。如果置null,则空间得到了释放。以下是修改后的代码。
@Override
public E pollFirst() {
if(isEmpty()){ //空则返回null
return null;
}
//不为空则
E e = array[head]; //先返回头元素
array[head] = null; //置null 释放内存 help GC
head = inc(head,array.length); //再返回head++的数值
return e;
}
@Override
public E pollLast() {
if(isEmpty()){
return null;
}
//不为空则
tail = dec(tail, array.length); //先tail--
E e = array[tail];
array[tail] = null; //置null 释放内存 help GC
return e; //返回最后一个数据
}
1.2.3 测试代码
public static void main(String[] args) {
ArrayDeque<Object> deque = new ArrayDeque<>(7);
assertTrue(deque.isEmpty());
deque.offerLast(1);
deque.offerLast(2);
deque.offerLast(3);
deque.offerFirst(4);
deque.offerFirst(5);
deque.offerFirst(6);
deque.offerFirst(7);
Iterator<Object> iterator = deque.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
assertEquals(7,deque.pollFirst());
assertEquals(6,deque.pollFirst());
assertEquals(3,deque.pollLast());
assertEquals(2,deque.pollLast());
assertEquals(1,deque.pollLast());
assertEquals(4,deque.pollLast());
assertEquals(5,deque.pollLast());
assertNull(deque.pollLast());
assertTrue(deque.isEmpty());
}
判断都没问题 代码应该没啥问题
1.3 课后作业- LeeTCode103
给你二叉树的根节点 root
,返回其节点值的 锯齿形层序遍历 。(即先从左往右,再从右往左进行下一层遍历,以此类推,层与层之间交替进行)。
package com.atguigu.linkedlist;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
/**
* @author 小小低头哥
* @version 1.0
* 二叉树的锯齿形层序遍历
*/
public class E01LeetCode103 {
public static void main(String[] args) {
TreeNode root = new TreeNode(3);
TreeNode treeNode1 = new TreeNode(9);
TreeNode treeNode2 = new TreeNode(20);
TreeNode treeNode3 = new TreeNode(15);
TreeNode treeNode4 = new TreeNode(7);
root.left = treeNode1;
root.right = treeNode2;
treeNode2.left = treeNode3;
treeNode2.right = treeNode4;
List<List<Integer>> lists = zigzagLevelOrder(root);
for (List list : lists) {
System.out.println(list);
}
}
public static List<List<Integer>> zigzagLevelOrder(TreeNode root) {
List<List<Integer>> result = new ArrayList<>(); //用List存放所有层节点数
Queue<TreeNode> queue = new LinkedList<>(); //队列 存放每一层结点并释放上一层节点
LinkedList<Integer> level; //存放每一层的结点值
Boolean odd = true; //代表是否是奇数层 true是奇数层
int c1 = 1;//记录下每一层有多少个结点
if (root == null) { //说明没有结点
return result;
}
TreeNode q; //结点索引
queue.offer(root); //压入元素
while (c1 != 0) { //循环遍历 直到下一层没有结点结束
int c2 = 0; //暂存下一层的节点数
level = new LinkedList<>(); //重新将level置空
for (int i = 0; i < c1; i++) { //每一个结点开始循环遍历
q = queue.poll(); //弹出首元素
if(odd){ //说明是奇数层 从左到右
level.addLast(q.val); //将此节点保存最后节点处
}else { //说明是偶数成 从右到左
level.addFirst(q.val); //将此节点保存到首节点处
}
if (q.left != null) { //如果左结点不为空
c2++;
queue.offer(q.left); //压入队列尾部中
}
if (q.right != null){ //如果右结点不为空
c2++;
queue.offer(q.right); //压入队列尾部中
}
}
result.add(level); //将level加入到result中
odd = !odd; //odd取反
c1 = c2;
}
return result;
}
}
class TreeNode {
int val;
TreeNode left;
TreeNode right;
TreeNode() {
}
TreeNode(int val) {
this.val = val;
}
TreeNode(int val, TreeNode left, TreeNode right) {
this.val = val;
this.left = left;
this.right = right;
}
}
2. 优先级队列
2.1 概述
优先级高的数值先出队
2.2 基于无序数组实现
接口如下:
package com.atguigu.queue;
/**
* @author 小小低头哥
* @version 1.0
* 优先级接口
*/
public interface Priority {
/**
* 约定 返回对象的优先级 约定数字越大 优先级越高
* @return 优先级
*/
int priority();
}
package com.atguigu.queue;
/**
* @author 小小低头哥
* @version 1.0
* 队列接口
*/
public interface Queue<E> {
boolean offer(E value); //添加元素
E poll(); //弹出元素
E peek(); //查看栈顶元素
boolean isEmpty(); //是否为空
boolean isFull(); //是否满了
}
其它代码如下
package com.atguigu.queue;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
/**
* @author 小小低头哥
* @version 1.0
* 利用无序数组实现优先级队列
*/
public class PriorityQueue1Demo{
@Test
public void poll(){
PriorityQueue1<Priority> queue = new PriorityQueue1<>(5);
queue.offer(new Entry("task1",4));
queue.offer(new Entry("task2",3));
queue.offer(new Entry("task3",2));
queue.offer(new Entry("task4",5));
queue.offer(new Entry("task5",1));
assertFalse(queue.offer(new Entry("task6",7)));
assertEquals(5,queue.poll().priority());
assertEquals(4,queue.poll().priority());
assertEquals(3,queue.poll().priority());
assertEquals(2,queue.poll().priority());
assertEquals(1,queue.poll().priority());
System.out.println("执行到这 说明没问题");
}
}
class Entry implements Priority{
public String value;
public int priority;
public Entry(String value, int priority) {
this.value = value;
this.priority = priority;
}
@Override
public int priority() {
return priority;
}
@Override
public String toString() {
return "Entry{" +
"value='" + value + '\'' +
", priority=" + priority +
'}';
}
}
class PriorityQueue1<E extends Priority> implements Queue<E> {
Priority[] array;
int size; //表示大小
public PriorityQueue1(int capacity) {
array = new Priority[capacity];
}
@Override
public boolean offer(E value) {
if (isFull()) { //满的话直接返回false
return false;
}
//直接向数组尾部加元素
array[size++] = value;
return true;
}
private int selectMax() { //返回优先级最高的索引值
int max = 0; //
for (int i = 1; i < size; i++) {
if (array[i].priority() > array[max].priority()) { //找到优先级更大的
max = i;
}
}
return max;
}
@Override
public E poll() {
if (isEmpty()) { //空数组直接返回null
return null;
}
int max = selectMax();
E value = (E) array[max];
remove(max); //删除数组的此位置 一定要删除 否则selectMax遍历的时候仍然有可能会遍历这个元素
return value;
}
private void remove(int index) {
// 数组大小没变 只是将元素从index+1位置开始向前移动了一位
// 最后一个有效位置元素不再有效 size--
if (index < size - 1) { //如果要删除的元素不是最后一个元素
System.arraycopy(array, index + 1, array, index, size - 1 - index);
}
size--;
}
@Override
public E peek() { //返回优先级最高的元素
if (isEmpty()) { //空数组直接返回null
return null;
}
return (E) array[selectMax()];
}
@Override
public boolean isEmpty() {
return size == 0; //为0则表示为空
}
@Override
public boolean isFull() {
return size == array.length;
}
}
2.3 基于有序数组实现
class PriorityQueue2<E extends Priority> implements Queue<E> {
Priority[] array;
int size; //表示大小
public PriorityQueue2(int capacity) {
array = new Priority[capacity];
}
@Override
public boolean offer(E value) { //将优先级从小到大进行插入
if (isFull()) {
return false;
}
insert(value);
size++;
return true;
}
private void insert(E e){
//利用插入排序
int i = size;
while (i > 0 && array[i - 1].priority() > e.priority()) { //当找到比value优先级小的则跳出循环
array[i] = array[i - 1];
i--;
}
array[i] = e;
}
@Override
public E poll() { //优先级高的先出
if (isEmpty()) {
return null;
}
E e = (E) array[size - 1];
array[--size] = null; //垃圾回收 help GC
return e;
}
@Override
public E peek() {
if (isEmpty()) {
return null;
}
return (E) array[size - 1];
}
@Override
public boolean isEmpty() {
return size == 0; //为0则表示为空
}
@Override
public boolean isFull() {
return size == array.length;
}
}
2.3 堆实现优先级队列
计算机科学中,堆是一种基于树的数据结构,通常用完全二叉树实现。堆的特性如下
- 在大顶堆中,任意节点 C 与它的父节点 P 符合 P . v a l u e ≥ C . v a l u e P.value \geq C.value P.value≥C.value
- 而小顶堆中,任意节点 C 与它的父节点 P 符合 P . v a l u e ≤ C . v a l u e P.value \leq C.value P.value≤C.value
- 最顶层的节点(没有父亲)称之为 root 根节点
完全二叉树(Complete Binary Tree) 特点:最后一层可能未填满,靠左对齐
**满二叉树(Full Binary Tree)**特点:每一层都是填满的
**大顶堆:**父节点比子节点大
**小顶堆:**父节点比子节点小
特征
- 如果从索引 0 开始存储节点数据
- 节点 i i i 的父节点为 f l o o r ( ( i − 1 ) / 2 ) floor((i-1)/2) floor((i−1)/2),当 i > 0 i>0 i>0 时
- 节点 i i i 的左子节点为 2 i + 1 2i+1 2i+1,右子节点为 2 i + 2 2i+2 2i+2,当然它们得 < s i z e < size <size
- 如果从索引 1 开始存储节点数据
- 节点 i i i 的父节点为 f l o o r ( i / 2 ) floor(i/2) floor(i/2),当 i > 1 i > 1 i>1 时
- 节点 i i i 的左子节点为 2 i 2i 2i,右子节点为 2 i + 1 2i+1 2i+1,同样得 < s i z e < size <size
代码如下
class PriorityQueue3<E extends Priority> implements Queue<E> {
Priority[] array;
int size; //表示大小
public PriorityQueue3(int capacity) {
array = new Priority[capacity];
}
/*
1. 入堆新元素 加入到数组末尾(索引位置child)
2. 不断比较新加元素与它父节点(parent)优先级
- 如果父节点优先级低 则向下移动 并找到下一个parent
- 直至父节点优先级更高或chile==0为止
*/
@Override
public boolean offer(E value) {
if (isFull()) {
return false;
}
int child = size++; //size新加元素索引
int parent = (child - 1) / 2; //父节点索引
//直到新节点小于一个父节点或者比较完root节点 退出循环
while (value.priority() > array[parent].priority() && child > 0) {
array[child] = array[parent]; //将父节点撤下来 作为子节点
child = parent; //记录父节点的位置
parent = (parent - 1) / 2; //再找父节点的父节点
}
array[child] = value;
return true;
}
/*
1. 交换元素和尾部元素 让尾部元素出队
2. (下滑)
- 从堆顶开始 将元素与两个子节点较大者交换
- 直到父节点大于两个子节点 或没有子节点了为止
这样就使得又变了大顶堆
*/
@Override
public E poll() {
if (isEmpty()) {
return null;
}
//将优先级最大元素和最后的元素进行交换
swap(0, size - 1);
size--; //数组元素减一 则最大元素不存在索引值中
Priority e = array[size]; //得到优先级最大的元素用于返回
array[size] = null; //便于垃圾回收 help GC
//将首元素下潜
down(0);
return (E) e;
}
private void down(int parent) {
int left = 2 * parent + 1;
int right = left + 1;
int max = parent; //max指向优先级高的 初始化假设父元素优先级最高
if (left < size && array[left].priority() > array[max].priority()) { //如果左节点存在且优先级大于max
max = left; //将max指向left
}
if (right < size && array[right].priority() > array[max].priority()) {//如果右节点存在且优先级大于max
max = right; //将max指向right
}
if (max != parent) { //说明父节点确实不是优先级最高的
swap(max, parent); //那就进行交换
down(max); //递归调用
}
}
private void swap(int i, int j) { //交换 i 和 j 元素
Priority t = array[i];
array[i] = array[j];
array[j] = t;
}
@Override
public E peek() {
if (isEmpty()) {
return null;
}
return (E) array[0];
}
@Override
public boolean isEmpty() {
return size == 0; //为0则表示为空
}
@Override
public boolean isFull() {
return size == array.length;
}
}
2.4 总结
**基于无序数组:**入队时由于直接排在数组最后,时间复杂度为 O ( 1 ) O(1) O(1);出队时由于需要逐个比较优先级,时间复杂度为 O ( n ) O(n) O(n)。
**基于有序数组:**入队时由于使用到了插入排序,最坏时间时间复杂度为 O ( n ) O(n) O(n);出队时由于直接出数组最后一个,时间复杂度为 O ( 1 ) O(1) O(1)。
**堆实现:**不管入队出队,都没有直接取某一个元素或者逐个比较,而是都进行了父节点与子节点的优先级比较,所以时间复杂度都为 O ( l o g ( n ) ) O(log(n)) O(log(n))。
2.5 练习-LeetCode23 合并K个升序链表
使用小顶堆实现的优先级队列解决本题;
小顶堆和大顶堆实现代码雷同,就不再给出,直接给出其它代码
public static void main(String[] args) {
ListNode listNode1 = new ListNode(-2);
ListNode listNode4 = new ListNode(-1);
ListNode listNode5 = new ListNode(-1);
ListNode listNode7 = new ListNode(-1);
listNode1.next = listNode4;
listNode4.next = listNode5;
listNode5.next = listNode7;
ListNode listNode11 = new ListNode(-10);
ListNode listNode3 = new ListNode(-6);
ListNode listNode44 = new ListNode(4);
listNode11.next = listNode3;
listNode3.next = listNode44;
ListNode listNode2 = new ListNode(-10);
ListNode listNode6 = new ListNode(-9);
listNode2.next = listNode6;
ListNode Null = null;
// ListNode[] lists = {listNode1, listNode11, listNode2, Null};
ListNode[] lists = {listNode1, Null};
for (ListNode listNode : lists) {
while (listNode != null) {
System.out.print(listNode.val + " ");
listNode = listNode.next;
}
System.out.println();
}
ListNode listNode = mergeKLists2(lists);
while (listNode != null) {
System.out.print(listNode.val + " ");
listNode = listNode.next;
}
}
public static ListNode mergeKLists2(ListNode[] lists) {
if (lists == null) {
return null;
}
int n = 0;
for (ListNode listNode : lists) {
while (listNode != null) {
n++;
listNode = listNode.next;
}
}
PriorityQueue heap = new PriorityQueue(n);
for (ListNode listNode : lists) {
while (listNode != null) {
heap.offer(listNode); //将每个头节点加入到队列中
listNode = listNode.next;
}
}
ListNode listNode = new ListNode(-1, null); //作为头节点
ListNode p = listNode;
while (!heap.isEmpty()) { //如果不为空一直循环
p.next = heap.poll(); //推出来并放在链表的后面
p = p.next;
}
p.next = null; //!!!!!!!千万不能忘!!!!!
return listNode.next;
}
public static ListNode mergeKLists(ListNode[] lists) {
if (lists == null) {
return null;
}
PriorityQueue heap = new PriorityQueue(lists.length);
for (ListNode listNode : lists) {
if (listNode != null) {
heap.offer(listNode); //将每个头节点加入到队列中
}
}
ListNode listNode = new ListNode(-1, null); //作为头节点
ListNode p = listNode;
while (!heap.isEmpty()) { //如果不为空一直循环
p.next = heap.poll(); //推出来并放在链表的后面
p = p.next; //
if (p.next != null) { //如果此结点所在链表后面结点
heap.offer(p.next); //则把下一个结点放进去
}
}
return listNode.next;
}
其中 mergeKLists 函数中的小顶堆 每次都只包含每个链表中的一个结点进行比较
mergeKLists2 函数是先将每个链表中所有结点都放入堆中,然后再一个个弹出。速度变快了,但是占用内存空间变大了。
3. 阻塞队列
这一节的内容都是直接从黑马那里拷贝过来的,太细也太多了
之前的队列在很多场景下都不能很好地工作,例如
- 大部分场景要求分离向队列放入(生产者)、从队列拿出(消费者)两个角色、它们得由不同的线程来担当,而之前的实现根本没有考虑线程安全问题
- 队列为空,那么在之前的实现里会返回 null,如果就是硬要拿到一个元素呢?只能不断循环尝试
- 队列为满,那么再之前的实现里会返回 false,如果就是硬要塞入一个元素呢?只能不断循环尝试
因此我们需要解决的问题有
- 用锁保证线程安全
- 用条件变量让等待非空线程与等待不满线程进入等待状态,而不是不断循环尝试,让 CPU 空转
有同学对线程安全还没有足够的认识,下面举一个反例,两个线程都要执行入队操作(几乎在同一时刻)
public class TestThreadUnsafe {
private final String[] array = new String[10];
private int tail = 0;
public void offer(String e) {
array[tail] = e;
tail++;
}
@Override
public String toString() {
return Arrays.toString(array);
}
public static void main(String[] args) {
TestThreadUnsafe queue = new TestThreadUnsafe();
new Thread(()-> queue.offer("e1"), "t1").start();
new Thread(()-> queue.offer("e2"), "t2").start();
}
}
执行的时间序列如下,假设初始状态 tail = 0,在执行过程中由于 CPU 在两个线程之间切换,造成了指令交错
线程1 | 线程2 | 说明 |
---|---|---|
array[tail]=e1 | 线程1 向 tail 位置加入 e1 这个元素,但还没来得及执行 tail++ | |
array[tail]=e2 | 线程2 向 tail 位置加入 e2 这个元素,覆盖掉了 e1 | |
tail++ | tail 自增为1 | |
tail++ | tail 自增为2 | |
最后状态 tail 为 2,数组为 [e2, null, null …] |
糟糕的是,由于指令交错的顺序不同,得到的结果不止以上一种,宏观上造成混乱的效果
3.1 单锁实现
Java 中要防止代码段交错执行,需要使用锁,有两种选择
- synchronized 代码块,属于关键字级别提供锁保护,功能少
- ReentrantLock 类,功能丰富
以 ReentrantLock 为例
ReentrantLock lock = new ReentrantLock();
public void offer(String e) {
lock.lockInterruptibly();
try {
array[tail] = e;
tail++;
} finally {
lock.unlock();
}
}
只要两个线程执行上段代码时,锁对象是同一个,就能保证 try 块内的代码的执行不会出现指令交错现象,即执行顺序只可能是下面两种情况之一
线程1 | 线程2 | 说明 |
---|---|---|
lock.lockInterruptibly() | t1对锁对象上锁 | |
array[tail]=e1 | ||
lock.lockInterruptibly() | 即使 CPU 切换到线程2,但由于t1已经对该对象上锁,因此线程2卡在这儿进不去 | |
tail++ | 切换回线程1 执行后续代码 | |
lock.unlock() | 线程1 解锁 | |
array[tail]=e2 | 线程2 此时才能获得锁,执行它的代码 | |
tail++ |
- 另一种情况是线程2 先获得锁,线程1 被挡在外面
- 要明白保护的本质,本例中是保护的是 tail 位置读写的安全
事情还没有完,上面的例子是队列还没有放满的情况,考虑下面的代码(这回锁同时保护了 tail 和 size 的读写安全)
ReentrantLock lock = new ReentrantLock();
int size = 0;
public void offer(String e) {
lock.lockInterruptibly();
try {
if(isFull()) {
// 满了怎么办?
}
array[tail] = e;
tail++;
size++;
} finally {
lock.unlock();
}
}
private boolean isFull() {
return size == array.length;
}
之前是返回 false 表示添加失败,前面分析过想达到这么一种效果:
- 在队列满时,不是立刻返回,而是当前线程进入等待
- 什么时候队列不满了,再唤醒这个等待的线程,从上次的代码处继续向下运行
ReentrantLock 可以配合条件变量来实现,代码进化为
ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition(); // 条件变量
int size = 0;
public void offer(String e) {
lock.lockInterruptibly();
try {
while (isFull()) {
tailWaits.await(); // 当队列满时, 当前线程进入 tailWaits 等待
}
array[tail] = e;
tail++;
size++;
} finally {
lock.unlock();
}
}
private boolean isFull() {
return size == array.length;
}
- 条件变量底层也是个队列,用来存储这些需要等待的线程,当队列满了,就会将 offer 线程加入条件队列,并暂时释放锁
- 将来我们的队列如果不满了(由 poll 线程那边得知)可以调用 tailWaits.signal() 来唤醒 tailWaits 中首个等待的线程,被唤醒的线程会再次抢到锁,从上次 await 处继续向下运行
思考为何要用 while 而不是 if,设队列容量是 3
操作前 | offer(4) | offer(5) | poll() | 操作后 |
---|---|---|---|---|
[1 2 3] | 队列满,进入tailWaits 等待 | [1 2 3] | ||
[1 2 3] | 取走 1,队列不满,唤醒线程 | [2 3] | ||
[2 3] | 抢先获得锁,发现不满,放入 5 | [2 3 5] | ||
[2 3 5] | 从上次等待处直接向下执行 | [2 3 5 ?] |
关键点:
- 从 tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化
- 这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待
最后的实现代码
/**
* 单锁实现
* @param <E> 元素类型
*/
public class BlockingQueue1<E> implements BlockingQueue<E> {
private final E[] array;
private int head = 0;
private int tail = 0;
private int size = 0; // 元素个数
@SuppressWarnings("all")
public BlockingQueue1(int capacity) {
array = (E[]) new Object[capacity];
}
ReentrantLock lock = new ReentrantLock();
Condition tailWaits = lock.newCondition();
Condition headWaits = lock.newCondition();
@Override
public void offer(E e) throws InterruptedException {
lock.lockInterruptibly();
try {
while (isFull()) {
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
size++;
headWaits.signal();
} finally {
lock.unlock();
}
}
@Override
public void offer(E e, long timeout) throws InterruptedException {
lock.lockInterruptibly();
try {
long t = TimeUnit.MILLISECONDS.toNanos(timeout);
while (isFull()) {
if (t <= 0) {
return;
}
t = tailWaits.awaitNanos(t);
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
size++;
headWaits.signal();
} finally {
lock.unlock();
}
}
@Override
public E poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (isEmpty()) {
headWaits.await();
}
E e = array[head];
array[head] = null; // help GC
if (++head == array.length) {
head = 0;
}
size--;
tailWaits.signal();
return e;
} finally {
lock.unlock();
}
}
private boolean isEmpty() {
return size == 0;
}
private boolean isFull() {
return size == array.length;
}
}
- public void offer(E e, long timeout) throws InterruptedException 是带超时的版本,可以只等待一段时间,而不是永久等下去,类似的 poll 也可以做带超时的版本,这个留给大家了
注意
- JDK 中 BlockingQueue 接口的方法命名与我的示例有些差异
- 方法 offer(E e) 是非阻塞的实现,阻塞实现方法为 put(E e)
- 方法 poll() 是非阻塞的实现,阻塞实现方法为 take()
3.2 双锁实现
单锁的缺点在于:
- 生产和消费几乎是不冲突的,唯一冲突的是生产者和消费者它们有可能同时修改 size
- 冲突的主要是生产者之间:多个 offer 线程修改 tail
- 冲突的还有消费者之间:多个 poll 线程修改 head
如果希望进一步提高性能,可以用两把锁
- 一把锁保护 tail
- 另一把锁保护 head
ReentrantLock headLock = new ReentrantLock(); // 保护 head 的锁
Condition headWaits = headLock.newCondition(); // 队列空时,需要等待的线程集合
ReentrantLock tailLock = new ReentrantLock(); // 保护 tail 的锁
Condition tailWaits = tailLock.newCondition(); // 队列满时,需要等待的线程集合
先看看 offer 方法的初步实现
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// 队列满等待
while (isFull()) {
tailWaits.await();
}
// 不满则入队
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
// 修改 size (有问题)
size++;
} finally {
tailLock.unlock();
}
}
上面代码的缺点是 size 并不受 tailLock 保护,tailLock 与 headLock 是两把不同的锁,并不能实现互斥的效果。因此,size 需要用下面的代码保证原子性
AtomicInteger size = new AtomicInteger(0); // 保护 size 的原子变量
size.getAndIncrement(); // 自增
size.getAndDecrement(); // 自减
代码修改为
@Override
public void offer(E e) throws InterruptedException {
tailLock.lockInterruptibly();
try {
// 队列满等待
while (isFull()) {
tailWaits.await();
}
// 不满则入队
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
// 修改 size
size.getAndIncrement();
} finally {
tailLock.unlock();
}
}
对称地,可以写出 poll 方法
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 队列空等待
while (isEmpty()) {
headWaits.await();
}
// 不空则出队
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
} finally {
headLock.unlock();
}
return e;
}
下面来看一个难题,就是如何通知 headWaits 和 tailWaits 中等待的线程,比如 poll 方法拿走一个元素,通知 tailWaits:我拿走一个,不满了噢,你们可以放了,因此代码改为
@Override
public E poll() throws InterruptedException {
E e;
headLock.lockInterruptibly();
try {
// 队列空等待
while (isEmpty()) {
headWaits.await();
}
// 不空则出队
e = array[head];
if (++head == array.length) {
head = 0;
}
// 修改 size
size.getAndDecrement();
// 通知 tailWaits 不满(有问题)
tailWaits.signal();
} finally {
headLock.unlock();
}
return e;
}
问题在于要使用这些条件变量的 await(), signal() 等方法需要先获得与之关联的锁,上面的代码若直接运行会出现以下错误
java.lang.IllegalMonitorStateException
两把锁这么嵌套使用,非常容易出现死锁,因此得避免嵌套,两段加锁的代码变成平级的样子。
性能还可以进一步提升
-
代码调整后 offer 并没有同时获取 tailLock 和 headLock 两把锁,因此两次加锁之间会有空隙,这个空隙内可能有其它的 offer 线程添加了更多的元素,那么这些线程都要执行 signal(),通知 poll 线程队列非空吗?
- 每次调用 signal() 都需要这些 offer 线程先获得 headLock 锁,成本较高,要想法减少 offer 线程获得 headLock 锁的次数
- 可以加一个条件:当 offer 增加前队列为空,即从 0 变化到不空,才由此 offer 线程来通知 headWaits,其它情况不归它管
-
队列从 0 变化到不空,会唤醒一个等待的 poll 线程,这个线程被唤醒后,肯定能拿到 headLock 锁,因此它具备了唤醒 headWaits 上其它 poll 线程的先决条件。如果检查出此时有其它 offer 线程新增了元素(不空,但不是从0变化而来),那么不妨由此 poll 线程来唤醒其它 poll 线程。
这个技巧被称之为级联通知(cascading notifies),类似的原因
- 在 poll 时队列从满变化到不满,才由此 poll 线程来唤醒一个等待的 offer 线程,目的也是为了减少 poll 线程对 tailLock 上锁次数,剩下等待的 offer 线程由这个 offer 线程间接唤醒
最终的代码为
public class BlockingQueue2<E> implements BlockingQueue<E> {
private final E[] array;
private int head = 0;
private int tail = 0;
private final AtomicInteger size = new AtomicInteger(0);
ReentrantLock headLock = new ReentrantLock();
Condition headWaits = headLock.newCondition();
ReentrantLock tailLock = new ReentrantLock();
Condition tailWaits = tailLock.newCondition();
public BlockingQueue2(int capacity) {
this.array = (E[]) new Object[capacity];
}
@Override
public void offer(E e) throws InterruptedException {
int c;
tailLock.lockInterruptibly();
try {
while (isFull()) {
tailWaits.await();
}
array[tail] = e;
if (++tail == array.length) {
tail = 0;
}
c = size.getAndIncrement();
// a. 队列不满, 但不是从满->不满, 由此offer线程唤醒其它offer线程
if (c + 1 < array.length) {
tailWaits.signal();
}
} finally {
tailLock.unlock();
}
// b. 从0->不空, 由此offer线程唤醒等待的poll线程
if (c == 0) {
headLock.lock();
try {
headWaits.signal();
} finally {
headLock.unlock();
}
}
}
@Override
public E poll() throws InterruptedException {
E e;
int c;
headLock.lockInterruptibly();
try {
while (isEmpty()) {
headWaits.await();
}
e = array[head];
if (++head == array.length) {
head = 0;
}
c = size.getAndDecrement();
// b. 队列不空, 但不是从0变化到不空,由此poll线程通知其它poll线程
if (c > 1) {
headWaits.signal();
}
} finally {
headLock.unlock();
}
// a. 从满->不满, 由此poll线程唤醒等待的offer线程
if (c == array.length) {
tailLock.lock();
try {
tailWaits.signal();
} finally {
tailLock.unlock();
}
}
return e;
}
private boolean isEmpty() {
return size.get() == 0;
}
private boolean isFull() {
return size.get() == array.length;
}
}
双锁实现的非常精巧,据说作者 Doug Lea 花了一年的时间才完善了此段代码。