JUC 笔记 8

1. Semaphore

信号量
基本使用
[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。

public static void main(String[] args) {
    // 1. 创建 semaphore 对象
    Semaphore semaphore = new Semaphore(3);
    // 2. 10个线程同时运行
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            // 3. 获取许可
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                log.debug("running...");
                sleep(1);
                log.debug("end...");
            } finally {
                // 4. 释放许可
                semaphore.release();
            }
        }).start();
    }
 }

结果

07:35:15.485 c.TestSemaphore [Thread-2] - running... 
07:35:15.485 c.TestSemaphore [Thread-1] - running... 
07:35:15.485 c.TestSemaphore [Thread-0] - running... 
07:35:16.490 c.TestSemaphore [Thread-2] - end... 
07:35:16.490 c.TestSemaphore [Thread-0] - end... 
07:35:16.490 c.TestSemaphore [Thread-1] - end... 
07:35:16.490 c.TestSemaphore [Thread-3] - running... 
07:35:16.490 c.TestSemaphore [Thread-5] - running... 
07:35:16.490 c.TestSemaphore [Thread-4] - running... 
07:35:17.490 c.TestSemaphore [Thread-5] - end... 
07:35:17.490 c.TestSemaphore [Thread-4] - end... 
07:35:17.490 c.TestSemaphore [Thread-3] - end... 
07:35:17.490 c.TestSemaphore [Thread-6] - running... 
07:35:17.490 c.TestSemaphore [Thread-7] - running... 
07:35:17.490 c.TestSemaphore [Thread-9] - running... 
07:35:18.491 c.TestSemaphore [Thread-6] - end... 
07:35:18.491 c.TestSemaphore [Thread-7] - end... 
07:35:18.491 c.TestSemaphore [Thread-9] - end... 
07:35:18.491 c.TestSemaphore [Thread-8] - running... 
07:35:19.492 c.TestSemaphore [Thread-8] - end...

* Semaphore 应用 (实现简单连接池)

● 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
● 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的

@Slf4j(topic = "c.Pool")
class Pool {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;
    private Semaphore semaphore;
    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i+1));
        }
    }
    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if(states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }
    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }
    
}
    public static void main(String[] args) {
        Pool pool = new Pool(2);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                Connection conn = pool.borrow();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                pool.free(conn);
            }).start();
        }
    }

结果

已连接到地址为 ''127.0.0.1:14924',传输: '套接字'' 的目标虚拟机
10:34:36.915 c.Pool [Thread-0] - borrow MockConnection{name='连接1'}
10:34:36.915 c.Pool [Thread-2] - borrow MockConnection{name='连接2'}
10:34:37.927 c.Pool [Thread-2] - free MockConnection{name='连接2'}
10:34:37.927 c.Pool [Thread-0] - free MockConnection{name='连接1'}
10:34:37.928 c.Pool [Thread-1] - borrow MockConnection{name='连接1'}
10:34:37.928 c.Pool [Thread-3] - borrow MockConnection{name='连接2'}
10:34:38.936 c.Pool [Thread-1] - free MockConnection{name='连接1'}
10:34:38.936 c.Pool [Thread-3] - free MockConnection{name='连接2'}
10:34:38.936 c.Pool [Thread-4] - borrow MockConnection{name='连接1'}
10:34:39.945 c.Pool [Thread-4] - free MockConnection{name='连接1'}
已与地址为 ''127.0.0.1:14924',传输: '套接字'' 的目标虚拟机断开连

* Semaphore 原理

  1. 加锁解锁流程
    Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后 停车场显示空余车位减一

刚开始,permits(state)为 3,这时 5 个线程来获取资源

在这里插入图片描述
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列

park 阻塞

在这里插入图片描述
这时 Thread-4 释放了 permits,状态如下

在这里插入图片描述
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
在这里插入图片描述

2. 源码分析

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        // permits 即 state
        super(permits);
    }
    
    // Semaphore 方法, 方便阅读, 放在此处
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    // 尝试获得共享锁
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
    
    // Sync 继承过来的方法, 方便阅读, 放在此处
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires; 
            if (
                // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
                remaining < 0 ||
                // 如果 cas 重试成功, 返回正数, 表示获取成功
                compareAndSetState(available, remaining)
            ) {
                return remaining;
            }
        }
    }
    
    // AQS 继承过来的方法, 方便阅读, 放在此处
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 再次尝试获取许可
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 成功后本线程出队(AQS), 所在 Node设置为 head
                        // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
                        // 如果 head.waitStatus == 0 ==> Node.PROPAGATE 
                        // r 表示可用资源数, 为 0 则不会继续传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // Semaphore 方法, 方便阅读, 放在此处
    public void release() {
        sync.releaseShared(1);
    }
    
    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    // Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
}

3. 为什么要有 PROPAGATE

早期有 bug
● releaseShared 方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
 return false; 
 }

doAcquireShared 方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 这里会有空档
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

setHeadAndPropagate 方法

private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    // 有空闲资源
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next;
        // 下一个
        if (s == null || s.isShared())
            unparkSuccessor(node);
    }
}
  • 假设存在某次循环中队列里排队的结点情况为head(-1)->t1(-1)->t2(-1)
  • 假设存在将要信号量释放的 T3 和 T4,释放顺序为先 T3 后 T4

在这里插入图片描述
产生bug的情况

在这里插入图片描述
修复前版本执行流程

    1. T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0
    1. T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,假设返回值为 0(获取锁成功,但没有剩余资源 量)
    1. T4 调用 releaseShared(1),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个head),不满足条件,因此不调用 unparkSuccessor(head)
    1. T1 获取信号量成功,调用 setHeadAndPropagate 时,因为不满足 propagate > 0(2 的返回值也就是propagate(剩余资源量) == 0),从而不会唤醒后继结点, T2 线程得不到唤醒

bug 修复后

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    // 设置自己为 head
    setHead(node);
    // propagate 表示有共享资源(例如共享读锁或信号量)
    // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
    // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果是最后一个节点或者是等待共享读锁的节点
        if (s == null || s.isShared()) {
            doReleaseShared();
        }
    }
}

private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    // 如果 head.waitStatus == 0 ==> Node.PROPAGATE 
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

在这里插入图片描述

    1. T3 调用 releaseShared(),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0
    1. T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,假设返回值为 0(获取锁成功,但没有剩余资源量)
    1. T4 调用 releaseShared(),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个 head),调用 doReleaseShared() 将等待状态置为 PROPAGATE(-3)
    1. T1 获取信号量成功,调用 setHeadAndPropagate 时,读到 h.waitStatus < 0,从而调用 doReleaseShared() 唤醒 T2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/677553.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一款史上最强的智能优化算法软件,MATLAB APP Designer开发

很久没有整理干货了&#xff0c;原因是最近一直在精心打磨一款智能优化算法APP&#xff0c;前前后后改了很多次&#xff0c;今天终于完工了&#xff01;接下来跟我一起来看看这款软件吧&#xff01; 目录 引言 01 单个算法测试介绍 02多种算法对比介绍 03软件安装及一些限…

深入解析ETL与ELT架构:数据集成技术的演进与发展

摘要&#xff1a;随着大数据时代的到来&#xff0c;数据集成成为企业信息化建设的重要环节。本文将深入探讨ETL与ELT两种架构&#xff0c;分析它们在数据处理、性能、可扩展性等方面的差异&#xff0c;为企业数据集成提供技术指导。 一、引言 在大数据时代&#xff0c;企业需要…

slf4j等多个jar包冲突绑定的排查方法使用IDEA的maven help解决

1.安装 2.使用maven help解决&#xff0c;找到对应包存在的冲突 使用exclude直接解决即可

HTML跨年烟花

目录 写在前面 关于小编 HTML简介 程序设计 系列文章 写在后面 写在前面 学会了这个html烟花秀&#xff0c;跨年就不缺文案喽~ 关于小编 平易近人&#xff0c;慈眉善目&#xff0c;爱交朋友&#xff0c;舍己为人&#xff0c;和蔼可亲&#xff0c;能说会道&#xff0c;…

解决 Mac Django 连接Mysql 出现 image not found 问题

最近在使用 Django 框架&#xff0c;因为升级到4.2版本了&#xff0c;对应的本机 Mysql 5.7 就不适用了&#xff0c;于是升级到了 Mysql 8.0&#xff0c;写好代码之后出现如下错误&#xff1a; 仔细分析一下错误的描述&#xff1a; ImportError: dlopen(/Library/Frameworks/P…

【安装笔记-20240529-Windows-Electerm 终端工具】

安装笔记-系列文章目录 安装笔记-20240529-Windows-Electerm 终端工具 文章目录 安装笔记-系列文章目录安装笔记-20240529-Windows-Electerm 终端工具 前言一、软件介绍名称&#xff1a;electerm主页官方介绍功能特性 二、安装步骤测试版本&#xff1a;electerm-1.39.35-win-x…

Pixi绘制地图和小车

之前已经用Pixi绘制出了各种图形以及通过图片绘制精灵&#xff0c;这节用pixi绘制网格地图&#xff0c;并通过图片制作一个Sprite&#xff0c;让这个Sprite在网格地图上运动。首先需要在页面中添加一个div用来后期展示canvas的画布&#xff0c;并将此div实例化为PIXI的Applicat…

Doris 少数SQL在Datagrip无法执行,而在DorisUI或程序调用可以执行的问题

问题&#xff1a;Doris 少数SQL在Datagrip无法执行&#xff0c;而在DorisUI或程序调用可以执行 解决&#xff1a;Datagrip 执行SQL切分异常&#xff0c;设置默认执行语句方式&#xff0c;将分句改为整句执行 但是 支持多SQL批量分开执行更好用

茶树三维基因组-文献精读19

The high-resolution three-dimensional (3D) chromatin map of the tea plant (Camellia sinensis) 茶树&#xff08;Camellia sinensis&#xff09;的高分辨率三维染色质图&#xff0c;还记得茶属的转录组分析嘛~ 比较转录组分析揭示了116种山茶属(Camellia)植物的深层系统…

IDEA下项目发送到Gitee

一、首先在Gitee创建一个仓库&#xff08;什么都不选&#xff0c;这是最简单的方式&#xff0c;否则需要 pull push等一些操作&#xff0c;我嫌麻烦&#xff09; 二、按图点击&#xff08;创建存储区&#xff0c;选择你要上传的项目&#xff09; 三、按图点击后正常文件名会变绿…

ChatTTS 如何安装可视化操作

可视化一键安装下载地址&#xff1a; 百度网盘 Download from GitHub 从 GitHub 下载代码。 git clone https://github.com/2noise/ChatTTS 下载地址 Install Dependencies 在开始之前&#xff0c;请确保已安装必要的软件包。如果您尚未安装它们&#xff0c;可以使用 pip …

51种企业应用架构模式详解

01 什么是企业应用 我的职业生涯专注于企业应用&#xff0c;因此&#xff0c;这里所谈及的模式也都是关于企业应用的。&#xff08;企业应用还有一些其他的说法&#xff0c;如“信息系统”或更早期的“数据处理”。&#xff09;那么&#xff0c;这里的“企业应用”具体指的是什…

芯片验证分享1 —— 开篇及名词解释

大家好&#xff0c;我是谷公子的藏经阁&#xff0c;今天和大家很高兴能和大家分享的是芯片验证中的一些内容&#xff0c;希望对大家的日常工作有所帮助&#xff0c;如果这些内容有帮助到大家的话&#xff0c;那么此次的分享就很值得。另外&#xff0c;对于这个课题&#xff0c;…

Vue2里CSS动画实际应用之transform属性和animation属性的使用

最近项目需要做个简单的动画&#xff0c;如上图&#xff0c;框出来的图片需要上下浮动在Y轴上来回循环的移动&#xff0c;这个要用到如下css代码&#xff1a; .active-image-7 {animation: 5s float7 linear infinite normal;}keyframes float7{0% {transform: translateY(0px…

毫米波雷达阵列天线设计综合1(MATLAB仿真)

1 天线设计目标 毫米波雷达探测目标的距离、速度和角度&#xff0c;其中距离和角度和天线设计相关性较强。天线增益越高&#xff0c;则根据雷达方程可知探测距离越远&#xff1b;天线波束越窄&#xff0c;则角度分辨率越高&#xff1b;天线副瓣/旁瓣越低&#xff0c;则干扰越少…

C++第二十三弹---深入理解STL中list的使用

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】 目录 1、list的介绍 2、list的使用 2.1、构造函数 2.2、赋值操作符重载 2.3、迭代器使用 2.4、容量操作 2.5、元素访问 2.6、修改操作 2.7、其…

从0-1实现大模型

github: LLMs-from-scratch/ch02/01_main-chapter-code 数据 Data sampling with a sliding window We train LLMs to generate one word at a time, so we want to prepare the training data accordingly where the next word in a sequence represents the target to predi…

【设计模式】JAVA Design Patterns——Observer(观察者模式)

&#x1f50d;目的 定义一种一对多的对象依赖关系这样当一个对象改变状态时&#xff0c;所有依赖它的对象都将自动通知或更新。 &#x1f50d;解释 真实世界例子 在遥远的土地上生活着霍比特人和兽人的种族。他们都是户外生活的人所以他们密切关注天气的变化。可以说他们不断地…

Baidu Comate帮开发者“代码搬砖”,2天搞定原先3周工作量

日常项目基础工作耗费大量时间、紧急任务一连“肝”几个大夜……对于一个计算机相关专业研究生来说&#xff0c;几乎是家常便饭。随着大模型能力赋能编码工具&#xff0c;被开发者们戏称的“代码搬砖”生活有了起色。 从去年开始&#xff0c;PPDE 飞桨开发者技术专家、澳门理工…

Quantlab 4.1:基于Deap遗传算法多股票因子挖掘

原创文章第549篇&#xff0c;专注“AI量化投资、世界运行的规律、个人成长与财富自由"。 遗传算法本身并不复杂&#xff0c;但gplearn的实现&#xff0c;把问题复杂化了&#xff0c;尤其在因子挖掘这个场景。 使用deap进行因子挖掘的代码在如下位置&#xff1a; import …