初探 JUC 并发编程:Java 中的并发队列 ConcurrentLinkedQueue 源码级解析

第七部分:Java 并发包中并发队列解析

7.1)ConcurrentLinkedQueue 原理探究

7.1.1)类图结构

ConcurrentLinkedQueue 底层通过单向链表的方式实现,其中有两个 volatile 类型的 Node 节点用来表示队列的首、尾节点。

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

在默认的构造方法中,首和尾指向值为 null 的哨兵节点。新元素会被插入到队列的末尾,出队从队列对头获取第一个元素。

在 Node 节点中,维护着一个使用 volatile 变量修饰的 item 属性来存放节点的值;next 属性存储下一个节点的指针。

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    }

其内部使用 UnSafe 提供的 CAS 方法来保证出队和入队操作的原子性。

        // 通过 cas 操作设置节点的 item
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

				// 通过 cas 操作设置 next 的值
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        
				// 通过 cas 操作修改 next
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

7.1.2) ConcurrentLinkedQueue 原理介绍

💡 offer 操作介绍

offer 操作是在队列的末尾添加一个元素,由于队列是无界队列,所以一定会返回 true ;当传入的值为 null 的时候,会抛出 NPE 异常。

这里先给出完整的代码和注释:

    public boolean offer(E e) {
		    // e 为空则抛出空指针异常
        checkNotNull(e);
        
        // 构造 node 节点
        final Node<E> newNode = new Node<E>(e);

				// 从尾节点开始插入
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // p 的下一个节点为空,说明 p 是最后一个节点
            if (q == null) {
            // 通过 CAS 操作设置 p 节点的 next
                if (p.casNext(null, newNode)) {
                    // 每插入两个节点的时候会执行这个方法
                    if (p != t) 
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
            }
            
            // 当 p 节点(指向的是尾部节点,发生自旋的的时候)
            else if (p == q)
                // 多线程操作,由于 poll 操作移除元素之后可能会引发链表自旋
                // 通过这里找到新的 head
                // 下面这段代码的逻辑是这样的:
                // 如果 tail 在操作之前被改变了,就将其变为新的 tail
                // 反之则将其赋值为 head 节点
                p = (t != (t = tail)) ? t : head;
                
            else
                // 重新寻找尾节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

在进入方法时,首先检测传入的参数是否为 null ,如果是则直接抛出 NPE 异常;检测完节点后就构造一个新的节点 newNode

然后构造了一个无限循环

for (Node<E> t = tail, p = t;;)

这个循环唯一的出口是通过 CAS 操作成功插入节点后,这也就解释了上面说的这个方法一定会返回 true,下面来具体看这三个 if 分支分别处理的是什么情况:

第一个 if 分支

p.next 为 null 的时候,p 在循环的开始被赋值为了 tail 也就是尾节点,通过这个条件就确定了 p 此时为尾节点;但是多线程情况下会出现改变 tail 的情况,前面提到过 tail 是一个被 volatile 关键字修饰的变量,线程对于它的修改对于其他线程是可见的,每次获取 tail 都是获取最新的值。

然后尝试通过 CAS 操作来设置尾节点的 next ,每插入两个节点会重置一次 tail 的值,但是这里对于 tail 的修改没有要求一定要成功,也就是在多线程的环境下不一定保证 tail 就是真正的尾节点,但是在各种方法中都有充分的安全措施来弥补这个问题,继续看下去就能理解。

最后返回一个 true。


第二个 if 分支

在多线程下执行 poll 出队列的操作的时候,有可能会出现链表自旋的状况,也就是这个分支中出现的 p.next = p 情况;此时需要去重新设置头节点;但多线程情况下仍然很多线程同时操作这个链表,所以在修改之前先去判断链表的 tail 节点是否被重新设置,如果被重新设置了则将 p 赋值为这个新的节点。

p = (t != (t = tail)) ? t : head;

t != (t = tail) 的执行顺序是这样的:

  1. 首先,t 的当前值被拿来与新的 tail 进行比较。
  2. 接着,赋值操作 t = tail 执行。这会将 t 更新为新的 tail 的值。
  3. 最后,将更新后的 t 的值与原来的 t 进行比较。

第三个 if 分支

当链表状态正常且没有插入成功的情况下执行这个分支:

p = (p != t && t != (t = tail)) ? t : q;

在这里会重新去寻找尾节点,当 tail 节点在执行操作的时候被修改了则将 p 赋值为这个新的 tail,反之则将 p 赋值为 q,也就是 p.next


最后,让我们来分析一下上面的代码是如何确保线程安全性的:

  • 首先,链表的插入是用来原子操作 CAS 来执行,它是原子性的,多个线程同时尝试进行设置时,只有一个线程会成功。
  • 对于多线程下可能导致的自旋异常,采用了重新找新的 head 节点来解决。
  • 最后,在发现尾节点 t 在操作之前已经被改变时,会将当前节点 p 更新为新的尾节点 **t ,**确保了在多线程环境下链表的正确性。

因为上面的方法比较复杂,这里采用画图的形式来模拟两个节点插入的情况:

刚初始化的时候,head 和 next 都指向 item 为 null 的哨兵节点,此时我们执行一个插入的操作

				// 从尾节点开始插入
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // p 的下一个节点为空,说明 p 是最后一个节点
            if (q == null) {
            // 通过 CAS 操作设置 p 节点的 next
                if (p.casNext(null, newNode)) {
                    // 每插入两个节点的时候会执行这个方法
                    if (p != t) 
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
            }

在这里插入图片描述

此时队列中没有任何元素,所以此时指向的 q == null 是成立的,此时执行插入操作设置下一个节点为新创建的节点

在这里插入图片描述

此时去判断 p 是否等于 t,此时发现是相等的,所以直接返回 true

然后我们来插入第二个节点,此时指针的指向和上面相同,但是此时发现 q 并不是 null,此时执行的就是这个 if 分支:

 						else
                // 重新寻找尾节点
                p = (p != t && t != (t = tail)) ? t : q;

此时 p 会指向 q,在进行下一次循环时,指针的指向是这样的:

在这里插入图片描述

此时执行完和第一次一样的插入逻辑之后,t 就不等于 p,此时就更新 tail 为新的节点,所以每当插入两个节点之后会更新一次 tail。

在这里插入图片描述

💡 add 操作

在链表的末尾添加一个元素,底层仍然是调用的 offer() 方法:

    public boolean add(E e) {
        return offer(e);
    }

💡 poll 操作

这个方法的作用是在队列的头部获取并移除一个元素,如果队列为空则会返回 null。

这里给出完整的代码和注释:

    public E poll() {
		    // goto 标记
        restartFromHead:
      
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
		            // 存储当前循环中节点的值
                E item = p.item;
								
								// 当前节点有值,且通过 CAS 将其变为 null
                if (item != null && p.casItem(item, null)) {
                    if (p != h) // 两次 poll 会更新一次头节点
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }              
                // 队列为空则返回 null
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // 如果当前节点被自引用了,重新寻找头节点
                else if (p == q)
                    continue restartFromHead;
                    
                else
                    p = q;
            }
        }
    }
    
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

在本方法中我们要注意 updateHead() 方法的使用时机,这个方法中会更新头节点 head 为传入的新节点,并且使原来的头节点 自旋。第一个 if 分支中的 unpdateHead 方法和上面的 offer 方法相同,也是在两次 poll 才会更新一次 head 节点;但与 offer 方法不同的是线·

方法最前面的是一个 goto 语句,使用 goto 语句可以跳到指定的位置,**goto**语句在过去的编程语言中曾经被广泛使用,但是它往往导致代码的可读性和可维护性变差,所以在编写代码的时候不建议使用,只需要看懂即可。

这里先来模拟一个线程弹出两个节点的情况

此时将 p 指向的节点的 item 修改为 null,并且返回 p 节点的值

在这里插入图片描述

执行第二次弹出的时候,发现 p 指向的 item 为 null,且队列不为空、没有出现自引用的情况,所以此时后移 p 到 next 的位置,此时正常执行前面的逻辑弹出节点,并且更新 head 的值。

在这里插入图片描述

最终会达到上图的效果,这是单个线程在正常情况下得到的结果,下面来模拟多个线程同时操控这个队列导致的其他情况。

如果一个线程执行下面这一步,也就是第一个 if 的时候,其他线程获取并且弹出了节点 1(将节点 2的 item 设置为了 null),假设这个线程此时因为某些原因(比如被中断)没有执行 updateHead 方法,那此时的结果如右图,此时其他线程遍历到第二个 null 节点的时候会将头节点放到其正确的位置上,也就是执行 updateHead 并且返回 null。

在这里插入图片描述

而当在执行 poll 操作的时候其他线程执行了 updateHead 方法可能会使得此节点指向的节点变为自旋节点,此时需要重新寻找 head 节点:

                // 如果当前节点被自引用了,重新寻找头节点
                else if (p == q)
                    continue restartFromHead;

上面的方法保证了 head 每两次弹出就会更新一次(即使在多线程的情况下),同时采用节点自旋的方法防止节点被重复的获取。

💡 peek 操作

peek 操作是在不移除元素的情况下获取队列头部的一个元素,如果队列为空则返回 null。

    public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // 可以获取到元素或者这是队列中最后一个位置(可以为 null)
                if (item != null || (q = p.next) == null) {
                // 更新头节点
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

看完上面的 poll 方法,本方法还是比较好懂的

方法返回的条件为 当前节点的 item 有值 或者 当前节点为队列中最后一个节点,此时更新头节点的值并且返回 item。

如果出现 p == q 也就是自旋的时候,重新寻找头节点。

如果没有遍历到队列中有值的节点,且还有后续的节点就后移 p 指针继续寻找。

💡 size 操作

通过 size 方法计算队列中的元素个数,这个方法在并发环境下并不是很有用,因为 CAS 没有加锁,所以调用 size 函数的期间可能增加或删除元素,导致统计结果的不准确。

    public int size() {
        int count = 0;
        // 通过 first 方法来获取队列中的第一个元素,排除哨兵节点
        // succ 方法获取当前节点的 next 元素,如果自旋的话就返回头节点
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }
    
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

方法中首先调用 first 方法排除哨兵节点,获取真正的第一个节点,然后不断后移节点去计算 count 的值;此方法只能统计到 Integer.MAX_VALUE 即使队列是个无界队列;当统计完成后返回 count。

size 方法的实现逻辑比较简单,来看一下里面调用的 first 方法

    Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

方法中会返回第一个不为 null 的节点,如果没有就返回 null。

到这里可以我们已经见到了 updateHead 的所有使用位置,来总结一下它的使用时机

  • 当执行 poll 方法的时候调用了两次
    • 第一次是头节点没有指向此时弹出节点的时候
    • 第二次是发现队列为空的时候
  • 在执行 peek 操作的时候调用了一次
    • 每次获取元素或者发现队列为空的时候会更新头节点
  • 最后就是在上面的 first 方法中调用了一次
    • 当找到第一个 item 不为 null 的节点或者发现队列为空的时候会执行一次

通过上面的方法,在多线程的情况下,也能保证 head 时刻在执行读操作的时候处于正确的位置。

💡 remove 操作

删除队列中的元素,如果存在多个则删除第一个,并且返回 true,没有找到则返回 false。

    public boolean remove(Object o) {
    // o 为空直接返回 false
        if (o != null) {
            Node<E> next, pred = null;
            for (Node<E> p = first(); p != null; pred = p, p = next) {
                boolean removed = false;
                E item = p.item;
                if (item != null) {
                // 如果相等则使用 CAS 设置为 null
                // 多线程情况下只有一个线程会成功,其他线程会继续查找
                // 是否有匹配的其他元素
                    if (!o.equals(item)) {
                        next = succ(p); // 获取 next 元素
                        continue;
                    }
                    removed = p.casItem(item, null);
                }

                next = succ(p);
                
                // 如果有后续的节点的话,前驱节点链接
                // p 节点指向的位置会因为无法到达而被销毁
                if (pred != null && next != null) // unlink
                    pred.casNext(p, next);
                if (removed)
                    return true;
            }
        }
        return false;
    }

💡 contains 操作

这个方法回去判断队列中是否有指定的对象,由于和 size 方法一样是遍历整个队列,所以结果不是那么精确,比如调用的时候元素在队列中,但是在遍历途中元素被删除,会返回 false。

    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

方法的执行逻辑和 size 相同,这里不过多赘述。

7.1.3)总结

ConcurrentLinkedQueue 底层使用单线填表数据结构来保存队列元素,每个元素被封装成一个 Node 节点;队列通过 head 和 tail 来维护的,创建队列的时候头尾节点会指向哨兵节点,第一次执行 peek 或者 first 的时候才会把 head 指向第一个真正的队列元素。

由于没有加锁,所以 size 或者 contains 会导致结果不准确。

出队和入队的操作都是操作 tail 和 head 节点,保证在多线程的情况下的线程安全,只需要保证操作的可见性和原子性即可,由于两个属性都是 volatile 修饰的,保证了可见性,同时方法中使用 CAS 保证了原子性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/624422.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

市场对节能高效电机需求不断增长 变频器具有广阔发展空间

市场对节能高效电机需求不断增长 变频器具有广阔发展空间 变频器是利用变频技术与微电子技术&#xff0c;通过改变电机工作电源频率方式来控制交流电动机的电力控制设备&#xff0c;主要由制动单元、检测单元、微处理单元等部分构成。变频器能够根据需要调整电机的转速&#xf…

如何基于可靠事件模式实现最终一致性?

今天我们一起来探讨一个分布式环境下的常见问题,这个问题与数据的一致性有关。那么,什么是数据一致性呢?要回答这个问题,需要我们回顾一下单块系统和分布式系统中对于数据处理的不同需求。 我们知道,传统的单块系统通常都只与一个数据库进行交互,所有的数据处理过程都位于…

混淆矩阵实战

2.实战 1.加载数据 #加载数据 import pandas as pd import numpy as np data pd.read_csv(data_class_raw.csv) data.head()2.data.loc得到样本属性&#xff0c;并进行样本数据可视化 #可视化数据 %matplotlib inline from matplotlib import pyplot as plt#define X and y…

政安晨:【Keras机器学习示例演绎】(四十一)—— 使用预先训练的词嵌入

目录 设置 简介 下载新闻组 20 数据 让我们来看看这些数据 清洗数据并将数据分成训练集和验证集 创建词汇索引 加载预训练的词嵌入 建立模型 训练模型 导出端到端模型 政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与…

【Unity】为小球添加爆发力往前移动的代码

代码里的几个变量都需要在场景中提前创建好并赋值 using System.Collections; using System.Collections.Generic; using UnityEngine;public class Shotobjt : MonoBehaviour {// 点击按钮&#xff0c;克隆一个prefab&#xff0c;然后给这个克隆后的对象添加往前方的力publi…

TCP/UDP通信中的部分函数

UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;和TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是互联网协议套件中最常用的两种传输层协议&#xff0c;它们负责在互联网中端到端地传输数据。尽管它们服务…

链表常见OJ题

目录 题目一&#xff1a;移除链表元素 &#xff08;1&#xff09;题目链接 &#xff08;2&#xff09;题目要求 &#xff08;3&#xff09;题解 题目二&#xff1a;反转链表 &#xff08;1&#xff09;题目链接 &#xff08;2&#xff09;题目要求​编辑 &#xff08;3…

【PostgreSQL里的子查询解析】

什么是子查询&#xff1f; 子查询是一种嵌套在其他SQL查询中的查询方式&#xff0c;也可以称作内查询或嵌套查询。当一个查询是另一个查询的条件时&#xff0c;就称之为子查询。子查询的语法格式与普通查询相同&#xff0c;但其在查询过程中起着临时结果集的作用&#xff0c;为…

冰川秘境:全球冰川可视化大屏带你穿越冰原

在浩瀚无垠的宇宙中&#xff0c;地球以其独特的蓝色光环吸引着人们的目光。而在这颗蓝色星球上&#xff0c;冰川这一大自然的杰作&#xff0c;更是以其壮美与神秘&#xff0c;让人们心驰神往。 从阿尔卑斯山脉的冰川到南极洲的冰盖&#xff0c;从格陵兰岛的冰山到喜马拉雅山脉的…

Visual Studio生成C++的DLL文件(最简单版)

前言 当你在使用C编写一些可重用的代码时&#xff0c;将其打包成一个动态链接库&#xff08;DLL&#xff09;可以使其更容易地被其他项目或者程序调用和使用。Visual Studio提供了一种简单的方式来生成C的DLL文件。下面是一个关于如何在Visual Studio中生成C的DLL文件的简单教…

基于ChatGPT 和 OpenAI 模型的现代生成式 AI

书籍&#xff1a;Modern Generative AI with ChatGPT and OpenAI Models: Leverage the capabilities of OpenAIs LLM for productivity and innovation with GPT3 and GPT4 作者&#xff1a;Valentina Alto 出版&#xff1a;Packt Publishing 书籍下载-《基于ChatGPT 和 Op…

【Unity】 HTFramework框架(四十八)使用Location设置Transform位置、旋转、缩放

更新日期&#xff1a;2024年5月14日。 Github源码&#xff1a;[点我获取源码] Gitee源码&#xff1a;[点我获取源码] 索引 Location定义Location复制Location变量的值复制Transform组件的Location值粘贴Location变量的值粘贴Location值到Transform组件在代码中使用Location Loc…

2.3 应用集成技术

第2章 信息技术知识 2.3 应用集成技术 2.3.1 数据库与数据仓库技术 数据库 以单一的数据源即数据库为中心进行事务处理、批处理、决策分析等各种数据处理工作操作型处理也称事务处理&#xff0c;指的是对联机数据库的日常操作&#xff0c;通常是对数据库中记录的查询和修改…

微信小程序主体变更的操作教程

小程序迁移变更主体有什么作用&#xff1f;进行小程序主体迁移变更&#xff0c;那可是益处多多呀&#xff01;比方说&#xff0c;能够解锁更多权限功能&#xff1b;在公司变更或注销时&#xff0c;还能保障账号的正常使用&#xff1b;此外&#xff0c;收购账号后&#xff0c;也…

Nat Plants | 植物抽核单细胞!多组学探究大豆根瘤成熟过程

发表时间&#xff1a;2023-04 发表期刊&#xff1a;Nature Plants 影响因子&#xff1a;17.352 DOI&#xff1a;10.1038/s41477-023-01387-z 研究背景 根瘤菌是亲和互作寄主植物&#xff0c;感染宿主并在根部形成共生器官根瘤&#xff0c;具有固氮…

vue3中通过自定义指令实现loading加载效果

前言 在现代Web开发中&#xff0c;提升用户体验一直是开发者们追求的目标之一。其中&#xff0c;一个常见的场景就是在用户与应用程序进行交互时&#xff0c;特别是当进行异步操作时&#xff08;如网络请求&#xff09;&#xff0c;为用户提供即时的反馈&#xff0c;避免用户因…

docker实验

1.Docker安装部署 &#xff08;1&#xff09;.关闭防火墙 &#xff08;2&#xff09;.更新源 &#xff08;3&#xff09;设置Docker仓库 &#xff08;4&#xff09;启动docker &#xff08;5&#xff09;查看版本&#xff1a; 2.Docker pull 容器并运行服务&#xff1b; 拉取…

项目9-网页聊天室1(注册+Bycrpt加密)

1.准备工作 1.1.前端页面展示 1.2 数据库的建立 我们通过注册页面&#xff0c;考虑如何设计用户表数据库。 用户id&#xff0c;userId用户名&#xff0c;唯一&#xff0c;username用户密码&#xff0c;password&#xff08;包括密码和确认密码ensurePssword【数据库没有该字段…

PXI/PXIe规格 A429/717 航电总线适配卡

A429是一款标准的PXI/PXIe1规格的多协议总线适配卡。该产品最多支持36个A429通道&#xff0c;或32个A429通道加4个A717通道&#xff0c;每个A429和A717通道可由软件配置成接收或发送&#xff0c;可满足A429总线和A717总线的通讯、测试和数据分析等应用需求。 该产品的每个A429通…

儿童身高成长:关注每一厘米的成长

引言&#xff1a; 儿童的身高发育是家长和教育者普遍关注的问题&#xff0c;它不仅关乎孩子的外貌形象&#xff0c;更与孩子的健康成长密切相关。本文将深入探讨儿童身高的注意事项&#xff0c;为家长和教育者提供科学的指导&#xff0c;帮助孩子健康成长。 1. 身高发育的基本知…