深入理解CyclicBarrier

文章目录

      • 1. 概念
      • 2. CylicBarier使用简单案例
      • 3. 源码

1. 概念

CyclicBarrier 字面意思回环栅栏(循环屏障),通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。CyclicBarrier 作用是让一组线程相互等待,当达到一个共同点时,所有之前等待的线程再继续执行,且 CyclicBarrier 功能可重复使用。
在这里插入图片描述

2. CylicBarier使用简单案例

public class Main {
    public static void main(String[] args) throws  InterruptedException{
      CyclicBarrier cyclicBarrier=new CyclicBarrier(3);
        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                try{
                    System.out.println(Thread.currentThread().getName()+"开始等待其它线程");
                    //阻塞直到指定方法的数量调用这个方法就会停止阻塞
                    cyclicBarrier.await();
                    System.out.println(Thread.currentThread().getName()+"开始执行");
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName()+"执行完毕");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

在这里插入图片描述

可以发现只有3个线程继续执行,剩余两个线程被阻塞

3. 源码

  • 构造方法
//这个构造方法有两个参数,分别是parties和一个任务,parties代表着屏障拦截的线程数量,每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。当阻塞的线程达到parties的数量时,就会执行barrieAction这个任务
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        //使用两个变量存储parties,这也是parties可以复用的根本原因
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

 public CyclicBarrier(int parties) {
        this(parties, null);
    }
  • 重要方法
 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

源码分析要点

1. 一组现场在触发屏障之前互相等待,最后一个线程到达屏障后唤醒逻辑是如何实现的
2. 栅栏循环是如何实现的
3. 条件队列到同步队列的转换实现逻辑

await()方法

   public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

发现里面实际逻辑调用的是dowait(false, 0L)方法

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
               //定义了一个ReentrantLock
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //更新count方法
            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                    //进入条件队列trip进行阻塞
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                     Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

上面方法最核心的就是更新count,然后判断count是否为0,如果为0就开始执行唤醒逻辑(这里先不考虑),如果不为0就会进入trip这个条件队列进行阻塞,下面分析线程是如何进行条件队列阻塞的。

//这是AQS类的一个方法
  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
               
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断当亲线程是不是同步队列,不是直接调用park进行阻塞
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

往条件等待队列中添加节点就是下面这句代码

 Node node = addConditionWaiter();
 private Node addConditionWaiter() {
           //获得条件队列的最后一个结点
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //如果为空就新创建一个节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
            //如果当前单向队列为空,直接让新创建的节点成为头节点
                firstWaiter = node;
            else
            //否则就放到尾节点的后面
                t.nextWaiter = node;
             //让尾指针指向当前节点
            lastWaiter = node;
            //返回当前节点
            return node;
        }

addConditionWaiter实际是AQS的内部类ConditionObject中实现的

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        //条件队列的第一个节点
        private transient Node firstWaiter;
        //条件队列的最后一个节点
        private transient Node lastWaiter;
        public ConditionObject() { }
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //如果条件队列为空,创建一个新的节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
            //让新创建的节点成为头节点和尾节点
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
         private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

节点入队后就继续执行 public final void await() throws InterruptedException方法,当调用await()方法,我们需要释放持有的锁,也就是执行下面这句代码:

int savedState = fullyRelease(node);
 final int fullyRelease(Node node) {
        boolean failed = true;
        try {
        //获取state标记(独占锁如果state从0-1表示释放锁,从1-0表示占用锁
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

释放锁后回到await()方法,调用下面代码进行实际阻塞

 while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

上面就队线程阻塞以及入队的原理分析,下面分析count减到0,后是如何执行线程唤醒的,核心代码是:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    if (index == 0) {  // tripped
        boolean ranAction = false;
        try {
               final Runnable command = barrierCommand;
               if (command != null)
                    command.run();
                    ranAction = true;
                    //开始下一轮屏障
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

nextGeneration的代码如下:

    private void nextGeneration() {
        //唤醒条件队列的所有节点
        trip.signalAll();
        // 恢复count值
        count = parties;
        generation = new Generation();
    }

signalAll()唤醒条件队列中所有的节点

public class ConditionObject implements Condition, java.io.Serializable {
......
	private void doSignalAll(Node first) {
	//首尾节点置为null
            lastWaiter = firstWaiter = null;
            do {
            //获取首节点的下一个节点
                Node next = first.nextWaiter;
                //然后将first的nextWaiter指针置为空
                first.nextWaiter = null;
                //实现头部出队的节点怎么进入同步队列
                transferForSignal(first);
                //然后开始迭代处理下一个节点
                first = next;
            } while (first != null);
        }
......
}

下面分析头部出队的节点进入同步队列的逻辑

final boolean transferForSignal(Node node) {
		//使用CAS操作修改节点的状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //节点入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //p节点的前驱节点置换为-1,这样就可以唤醒node节点,然后调用park进行阻塞
            LockSupport.unpark(node.thread);
        return true;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/230664.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

智能优化算法应用:基于法医调查算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于法医调查算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于法医调查算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.法医调查算法4.实验参数设定5.算法结果6.参考…

Unity 状态系统

状态系统 原理食用方法Demo 原理 #mermaid-svg-lUbxJ8eMP3KqrEhY {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-lUbxJ8eMP3KqrEhY .error-icon{fill:#552222;}#mermaid-svg-lUbxJ8eMP3KqrEhY .error-text{fill:#55…

使用RSA工具进行对信息加解密

我们在开发中需要对用户敏感数据进行加解密&#xff0c;比如密码 这边科普一下RSA算法 RSA是非对称加密算法&#xff0c;与对称加密算法不同;在对称加密中&#xff0c;相同的密钥用于加密和解密数据,因此密钥的安全性至关重要;而在RSA非对称加密中&#xff0c;有两个密钥&…

一文说清google最新大模型Gemini

随着AI技术的快速发展&#xff0c;谷歌和其他科技巨头在研究和部署上的竞争也越来越激烈。本月12月6号谷歌CEO哈萨比斯在谷歌官网发文&#xff0c;宣布推出万众瞩目的多模态大模型Gemini。标题明晃晃写着“最大”、“最强”&#xff0c;主打的就是一个干爆GPT-4。 一、Gemini的…

unity 2d 入门 飞翔小鸟 场景延续(八)

1、新建c#脚本如下 代码&#xff0c;在前方生成生成自身图片并3s后销毁自身&#xff0c;在碰撞物体后小鸟死亡后不删除自身 using System.Collections; using System.Collections.Generic; using UnityEngine;public class CopyScene : MonoBehaviour { //要复制的对象public…

银行卡二要素API的应用案例:从在线购物到金融投资

引言 随着互联网技术的不断发展&#xff0c;人们的金融需求也在不断增加。随之而来的是各种新型金融服务的涌现&#xff0c;让用户的金融体验更加便利快捷。其中&#xff0c;银行卡二要素API的应用&#xff0c;则为用户的金融体验和安全性提供了极大的保障。 银行卡二要素API…

计算机存储单位 + 程序编译过程

C语言的编译过程 计算机存储单位 头文件包含的两种方式 使用 C/C 程序常用的IDE 常用的C语言编译器&#xff1a; 在选择编译器时&#xff0c;需考虑平台兼容性、性能优化、调试工具和开发人员的个人偏好等因素。 详细教程可转 爱编程的大丙

Python---random库

目录 基本随机数函数(): rand.seed() random() 扩展随机数函数(): random库包含两类函数&#xff1a;基本随机数函数&#xff0c;扩展随机数函数 基本随机数函数:seed(),random() 扩展随机数函数&#xff1a;randint,getrandbits(),uniform(),randrange(),choice(),shuff…

JumpServer初探

JumpServer资产&#xff0c;用户关系如图所示。 资产管理下有资产列表和系统用户&#xff0c;系统用户分为特权用户和普通用户。资产列表下管理的是服务器&#xff0c;而特权用户就是JumpServer用来登录服务器的账号&#xff0c;因此特权用户需要拥有较高的权限&#xff0c;比…

AWS攻略——Peering连接VPC

文章目录 创建IP/CIDR不覆盖的VPC创建VPC创建子网创建密钥对创建EC2 创建Peering接受Peering邀请修改各个VPC的路由表修改美东us-east-1 pulic subnet的路由修改悉尼ap-southeast-2路由 测试知识点 我们回顾下《AWS攻略——VPC初识》中的知识&#xff1a; 一个VPC只能设置在一…

五、HotSpot细节实现

一、并发标记与三色标记 问题&#xff1a;三色标记到底发生在什么阶段&#xff0c;替代了什么。并发标记 1、并发标记( Concurrent Marking) 从 GC Root 开始对堆中对象进行可达性分析&#xff0c;递归扫描整个堆里的对象图&#xff0c;找出要回收的对象&#xff0c;这阶段耗…

Vue学习计划-Vue2--VueCLi(二)vuecli脚手架创建的项目内部主要文件分析

1. 文件分析 1. 补充&#xff1a; 什么叫单文件组件&#xff1f; 一个文件中只有一个组件 vue-cli创建的项目中&#xff0c;.vue的文件都是单文件组件&#xff0c;例如App.vue 2. 进入分析 1. package.json: 项目依赖配置文件&#xff1a; 如图&#xff0c;我们说主要的属性…

C++学习之路(十九)C++ 用Qt5实现一个工具箱(用SQLite数据库来管理粘贴板数据)- 示例代码拆分讲解

上篇文章&#xff0c;我们用 Qt5 实现了在小工具箱中添加了《点击按钮以新窗口打开功能面板》功能。今天我们把粘贴板功能用SQLite数据库来管理&#xff0c;用SQLite来实现增删改查。下面我们就来看看如何来规划开发这样的小功能并且添加到我们的工具箱中吧。 老规矩&#xff…

实现:切换页面切换标题,扩展 vue-router 的类型

布局容器-页面标题 网址&#xff1a;https://router.vuejs.org/zh/guide/advanced/meta 给每一个路由添加 元信息 数据 router/index.ts const router createRouter({history: createWebHistory(import.meta.env.BASE_URL),routes: [{ path: /login, component: () > im…

LeetCode(52)最小栈【栈】【中等】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 最小栈 1.题目 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void…

PandoraFMS 监控软件 任意文件上传漏洞复现

0x01 产品简介 Pandora FMS 是用于监控计算机网络的软件。 Pandora FMS 允许以可视化方式监控来自不同操作系统、服务器、应用程序和硬件系统(例如防火墙、代理、数据库、Web 服务器或路由器)的多个参数的状态和性能。 0x02 漏洞概述 PandoraFMS upload_head_image.php 接…

web前端开发html/css练习

目标图&#xff1a; 素材&#xff1a; 代码&#xff1a; <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns"http://www.w3.org/1999/xhtml"…

通过误差改变控制的两种策略

如果反馈误差越来越大&#xff0c;需要改变调节方向以减小误差并实现更好的控制。以下是两种常见的调节方向改变的方法&#xff1a; PID控制器中的积分限制&#xff1a;在PID控制中&#xff0c;积分项可以用来减小稳态误差。然而&#xff0c;当反馈误差持续增大时&#xff0c;积…

8、操作符重载

友元 可以通过friend关键字&#xff0c;把一个全局函数、另一个类的成员函数或者另一个类整体&#xff0c;声明为授权类的友元友元拥有访问授权类任何非公有成员的特权友元声明可以出现在授权类的公有、私有或者保护等任何区域且不受访问控制限定符的约束友元不是成员&#xf…

计算机概论第十三章

Answers are in blue. Computer Science Illuminated, Seventh Edition Nell Dale, PhD; John Lewis, PhD CHAPTER 13 EXERCISES AND ANSWERS For Exercises 1–5, match the type of ambiguity with an example. Lexical词汇歧义Referential指代歧义Syntactic句法 “Stand up…