【Java并发编程】信号量Semaphore详解

一、简介

Semaphore(信号量):是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore 一般用于流量的控制,特别是公共资源有限的应用场景。例如数据库的连接,假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。

Semaphore 是一个有效的流量控制工具,它基于 AQS 共享锁实现。我们常常用它来控制对有限资源的访问。

  • 每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;
  • 每次释放资源后,就释放一个信号量。

二、源码

2.1 类总览

通过上面的类图可以看到,SemaphoreReentrantLock 的内部类的结构相同,类内部总共存在 SyncNonfairSyncFairSync 三个类, NonfairSync 与 FairSync 类继承自 Sync 类,其只有一个 tryAcquireShared() 方法,重写了AQS的该方法。Sync 类继承自 AbstractQueuedSynchronizer 抽象类。

CountDownLatch 类似,Semaphore 主要是通过 AQS 的共享锁机制实现的,因此它的核心属性只有一个 Sync。总体源码如下:

public class Semaphore implements java.io.Serializable {
    //序列化版本号
    private static final long serialVersionUID = -3222578661600680210L;
    //同步队列
    private final Sync sync;
    
    //构造方法
    //指定许可数,默认为非公平策略
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    //指定许可数和是否公平策略
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    //Semaphore提供了acquire方法来获取一个许可,会阻塞线程(有重载方法,可以指定获取许可的个数)
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1); //调用AQS的acquireSharedInterruptibly方法, 即共享式获取响应中断
    }
    //tryAcquire的意思是尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    
    //Semaphore提供release来释放许可
    public void release() {
        sync.releaseShared(1); //调用AQS的releaseShared方法,即释放共享式同步状态
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
    
        Sync(int permits) {
            setState(permits);
        }

        //获取许可数目    
        final int getPermits() {
            return getState();
        }
    
        //共享模式下非公平策略获取
        //本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性,该方法返回的情况有如下两种情况:
        //   信号量不够,直接返回,返回值为负数,表示获取失败;
        //   信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) { //自旋
                int available = getState(); //获取可用许可值
                int remaining = available - acquires; //计算剩余的许可值
                //如果剩余许可值小于0,说明许可不够用了,直接返回,否则CAS更新许可值,更新成功返回,否则继续自旋
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    
        //共享模式下进行释放
        //该方法也是一个自旋方法,通过自旋+CAS原子性地修改许可值
        protected final boolean tryReleaseShared(int releases) {
            for (;;) { //自旋
                int current = getState(); //获取许可值
                int next = current + releases; //计算释放后的许可值
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) //CAS修改许可值,成功则返回,失败则继续自旋
                    return true;
            }
        }
    
        //根据指定的缩减量减小可用许可的数目
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        
        //获取并返回立即可用的所有许可数目
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }
    
    //采用非公平策略获取资源
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
    
        NonfairSync(int permits) {
            super(permits);
        }
        
        //获取许可
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires); //共享模式下非公平策略获取
        }
    }
    
    //采用公平策略获取资源
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;
    
        FairSync(int permits) {
            super(permits);
        }
    
        //获取许可
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //获取共享锁之前,先调用hasQueuedPredecessors方法来判断队列中是否存在其他正在排队的节点,
                // 如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

2.2 核心方法

获取信号量的方法总共有四个:

释放信号量的方法有两个:

获取信号量四个方法中后面三个方法原理同 acquire() ,我们这里来分析一下 acquire() 和 release() 方法。

2.2.1 acquire() 方法

获取许可,会阻塞线程,响应中断。

// Semaphore
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

内部调用的是 AQSacquireSharedInterruptibly() 方法, 即共享式获取响应中断,代码如下:

// AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

除了 tryAcquireShared() 方法由 AQS 子类实现,其他方法在 《AQS实现原理》中有讲解过,这里不再赘述。我们来分析一下子类实现的 tryAcquireShared() 方法,这里就要分公平和非公平策略两种情况了。

2.2.1.1 非公平策略下

非公平策略下的 tryAcquireShared() 方法:

// Semaphore#NonfairSync
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

内部调用 Sync#nonfairTryAcquireShared() 方法:

// Sync
final int nonfairTryAcquireShared(int acquires) {
    //自旋
    for (;;) {
        //获取可用许可值
        int available = getState();
        //计算剩余的许可值
        int remaining = available - acquires;
        //如果剩余许可值小于0,说明许可不够用了,直接返回,否则CAS更新同步状态,更新成功返回,否则继续自旋
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

该方法本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性。方法返回的情况有如下两种情况

  • 信号量不够,直接返回,返回值为负数,表示获取失败;
  • 信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。

2.2.1.2 公平策略下

公平策略下的 tryAcquireShared() 方法如下:

// Semaphore#FairSync
protected int tryAcquireShared(int acquires) {
    //自旋
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

我们看到它与非公平策略的唯一区别就是多了下面这个 if 代码:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        ......
    }
}

// AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

即在获取共享锁之前,先调用 hasQueuedPredecessors() 方法来判断队列中是否存在其他正在排队的节点,如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。

2.2.2 release() 方法

Semaphore 提供 release() 方法来释放许可。我们继续分析 release() 方法,源码如下:

// Semaphore
public void release() {
    sync.releaseShared(1);
}

//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        //如果释放锁成功,唤醒正在排队的节点
        doReleaseShared();
        return true;
    }
    return false;
}

//Semaphore#Sync
protected final boolean tryReleaseShared(int releases) {
    //自旋
    for (;;) {
        //获取许可值
        int current = getState();
        //计算释放后的许可值
        int next = current + releases;
        //如果释放后比释放前的许可值还小,直接报Error
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS修改许可值,成功则返回,失败则继续自旋
        if (compareAndSetState(current, next))
            return true;
    }
}

tryReleaseShared() 方法是一个自旋方法,通过自旋+CAS原子性地修改同步状态,逻辑很简单。

2.2.3 其余方法

获取信号量的方法有四个:

释放信号量的方法有两个:

其余获取和释放信号量的方法原理同上问,不再赘述。接下来看看其余的工具方法。

2.2.3.1 tryAcquire() 尝试获取许可

该方法一共有四种重载形式:

  • tryAcquire() :尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断。
  • tryAcquire(int permits) :同上的基础上,可以指定获取许可的个数。
  • tryAcquire(long timeout, TimeUnit unit) :指定超时时间,它调用AQS的tryAcquireSharedNanos() 方法,即共享式超时获取。
  • tryAcquire(int permits, long timeout, TimeUnit unit) :可以指定获取许可的个数和超时时间。
//Semaphore
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
2.2.3.2 availablePermits() 获取可用许可数

源码如下:

//Semaphore
public int availablePermits() {
    //获取可用许可数
    return sync.getPermits();
}

//Sync
//获取可用许可数
final int getPermits() {
    return getState();
}

2.2.3.3 drainPermits() 耗光信号量

将剩下的信号量一次性消耗光,并且返回所消耗的信号量。

//Semaphore
public int drainPermits() {
    return sync.drainPermits();
}

//Sync
final int drainPermits() {
    //自旋操作
    for (;;) {
        //获取信号量值
        int current = getState();
        //如果信号量为0,直接返回
        //否则CAS修改为0,成功则返回,否则继续自旋
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}
2.2.3.4 reducePermits() 减少信号量

reducePermits() 和 acquire() 方法相比都是减少信号量的值,但是 reducePermits() 不会导致任何线程阻塞,即只要传递的参数 reductions(减少的信号量的数量)大于0,操作就会成功。所以调用该方法可能会导致信号量最终为负数

//Semaphore
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

//Sync
final void reducePermits(int reductions) {
    //自旋
    for (;;) {
        //获取当前信号量值
        int current = getState();
        //计算剩余许可值
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        //CAS修改同步状态,成功则返回,失败则继续自旋
        if (compareAndSetState(current, next))
            return;
    }
}

三、使用案例

这里以经典的停车作为案例。假设停车场有3个停车位,此时有5辆汽车需要进入停车场停车。

public static void main(String[] args) {
    //定义semaphore实例,设置许可数为3,即停车位为3个
    Semaphore semaphore = new Semaphore(3);
    //创建五个线程,即有5辆汽车准备进入停车场停车
    for (int i = 1; i <= 5; i++) {
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "尝试进入停车场...");
                //尝试获取许可
                semaphore.acquire();
                //模拟停车
                long time = (long) (Math.random() * 10 + 1);
                System.out.println(Thread.currentThread().getName() + "进入了停车场,停车" + time +
                                   "秒...");
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(Thread.currentThread().getName() + "开始驶离停车场...");
                //释放许可
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + "离开了停车场!");
            }
        }, i + "号汽车").start();
    }
}
//执行结果
1号汽车尝试进入停车场...
5号汽车尝试进入停车场...
4号汽车尝试进入停车场...
3号汽车尝试进入停车场...
2号汽车尝试进入停车场...
5号汽车进入了停车场,停车5秒...
1号汽车进入了停车场,停车8秒...
4号汽车进入了停车场,停车9秒...
5号汽车开始驶离停车场...
5号汽车离开了停车场!
3号汽车进入了停车场,停车10秒...
1号汽车开始驶离停车场...
1号汽车离开了停车场!
2号汽车进入了停车场,停车2秒...
4号汽车开始驶离停车场...
4号汽车离开了停车场!
2号汽车开始驶离停车场...
2号汽车离开了停车场!
3号汽车开始驶离停车场...
3号汽车离开了停车场!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/902346.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python | Leetcode Python题解之第516题最长回文子序列

题目&#xff1a; 题解&#xff1a; class Solution:def longestPalindromeSubseq(self, s: str) -> int:n len(s)dp [[0] * n for _ in range(n)]for i in range(n - 1, -1, -1):dp[i][i] 1for j in range(i 1, n):if s[i] s[j]:dp[i][j] dp[i 1][j - 1] 2else:dp…

从病理AI的基础模型发展历程,看未来的医学AI发展趋势|个人观点·24-10-23

小罗碎碎念 在临床相关的人工智能&#xff08;AI&#xff09;模型发展方面&#xff0c;传统上需要大量标注数据集&#xff0c;这使得AI的进步主要围绕大型中心和私营企业展开。所以&#xff0c;在这期推文中&#xff0c;我会介绍一些已经商用的模型&#xff0c;并且为计划进军…

逻辑推理学习笔记

目的 立场辩护整理思绪 基本框架 论题 &#xff08;变化&#xff09; 我要证明&#xff08;讨论对象 变化&#xff09; 论据 &#xff08;变化&#xff09; 拿什么证明&#xff1f;也就是证据呈现。 论证 &#xff08;不变&#xff09; 要如何证明&#xff1f;逻辑框架…

通过conda install -c nvidia cuda=“11.3.0“ 安装低版本的cuda,但是却安装了高版本的12.4.0

问题 直接通过 conda install -c nvidia cuda"11.3.0"安装得到的却是高版本的 不清楚原理 解决方法 不过我们可以分个安装 runtime toolkit 和 nvcc 安装指定版本的 cudatoolkit 和 nvcc conda install -c nvidia cuda-cudart"11.3.58" conda instal…

【Linux系统编程】——Linux入门指南:从零开始掌握操作系统的核心(指令篇)

文章目录 查看 Linux 主机 ip以及登录主机Linux基础文件操作指令man&#xff1a;查看命令的手册页&#xff0c;了解命令的详细用法。pwd&#xff1a;显示当前目录路径。cd&#xff1a;切换目录。ls&#xff1a;列出当前目录下的文件和文件夹。mkdir&#xff1a;创建新目录。 文…

第三讲、C的运算符和表达式

一、运算符分类&#xff1a; &#xff08;1&#xff09;按运算对象的数目&#xff1a; 单目运算符 双目运算符 三目运算符 &#xff08;2&#xff09;按运算对象的数目&#xff1a; 算术运算符、赋值运算符、关系运算符、逻辑运算符、位运算符、自增自减运算符、…

菜叶子芯酸笔记3:GPU、GPGPU、CUDA之间的关系;CUDA之外;Tensor Core

我今天看到B站一个up主很好的资料【云计算科普研究所的个人空间-云计算科普研究所个人主页-哔哩哔哩视频】&#xff0c;结合我这周的积累整理了这份我觉得相比之前逻辑更加完善的笔记。 先是GPU到GPGPU 到CUDA之间进化关系部分&#xff0c;然后CUDA之外的友商竞品部分&#xf…

orbslam安装

1.linux操作命令 pwd&#xff1a;查看终端所在路径 cd&#xff1a;切换路径 cd ..&#xff1a;跳回到上级目录 ls: 列出当前路径下的所有文件夹 touch&#xff1a;创建新的文件 mv &#xff1a;移动文件(在该文件所在目录的路径下执行此操作) 例如&#xff1a;mv test_file /ho…

vue3中mitt和pinia的区别和主要用途,是否有可重合的部分?

在 Vue 中&#xff0c;Mitt 和 Pinia 是两个不同的工具&#xff0c;它们的主要用途和功能有所不同&#xff0c;但在某些方面也存在重合的部分。 区别 Mitt&#xff1a; Mitt 是一个简单而强大的事件总线库&#xff0c;用于在组件之间进行事件的发布和订阅。 它提供了一种简洁…

讲一讲 kafka 的 ack 的三种机制?

大家好&#xff0c;我是锋哥。今天分享关于【K讲一讲 kafka 的 ack 的三种机制&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; 讲一讲 kafka 的 ack 的三种机制&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Kafka的消息确认机制&…

python实战项目46:selenium爬取百度新闻

python实战项目46:selenium爬取百度新闻 一、项目简介二、完整代码一、项目简介 思路是首先使用selenium打开百度新闻页面,然后实现翻页操作,获取每条新闻的标题和链接。接下来的问题是,在遍历标题和链接,对每一个链接发送请求时,发现会弹出百度安全验证,本文的思路是使…

远程root用户访问服务器中的MySQL8

一、Ubuntu下的MySQL8安装 在Ubuntu系统中安装MySQL 8.0可以通过以下步骤进行1. 更新包管理工具的仓库列表&#xff1a; sudo apt update 2. 安装MySQL 8.0&#xff0c;root用户默认没有密码&#xff1a; sudo apt install mysql-server sudo apt install mysql-client 【…

动态规划 - 背包问题 - 01背包

01背包问题 二维数组 1. 确定dp数组&#xff08;dp table&#xff09;以及下标的含义&#xff1a;dp[i][j]-下标为[0,i]的物品&#xff0c;任取放容量为j的背包中的最大价值 2. 确定递推公式&#xff1a;dp[i][j] max(dp[i-1][j]&#xff08;不放物品i), dp[i-1][j-weight[i]]…

研发效能DevOps: Vite 使用 Vue Router

目录 一、实验 1.环境 2.初始化前端项目 3.安装vue-router 4.Vite 使用 Vue Router 二、问题 1.运行出现空页面 2.Vue Router如何禁止页面回退 一、实验 1.环境 &#xff08;1&#xff09;主机 表1 主机 系统 软件版本备注Windows11VS Code1.94.2Node.jsv18.20.4(LT…

Redis 篇-深入了解在 Linux 的 Redis 网络模型结构及其流程(阻塞 IO、非阻塞 IO、IO 多路复用、异步 IO、信号驱动 IO)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 用户空间与内核空间概述 2.0 Redis 网络模型 2.1 Redis 网络模型 - 阻塞 IO 2.2 Redis 网络模型 - 非阻塞 IO 2.3 Redis 网络模型 - IO 多路复用 2.3.1 IO 多路复…

WPF LiveChart控件基础属性介绍

WPF LiveChart控件基础属性介绍 在Nuget添加方法如下&#xff1a; 然后在xaml中添加引用&#xff1a; xmlns:lvc"clr-namespace:LiveCharts.Wpf;assemblyLiveCharts.Wpf"调用控件&#xff1a; <lvc:CartesianChart Name"chart" Margin"40"…

Java应用程序的服务器有哪些

1.Tomcat、Jetty 和 JBoss 区别&#xff1f; Apache Tomcat、Jetty 和 JBoss都是用于部署Java应用程序的服务器&#xff0c;它们都支持Servlet、JSP和其他Java EE&#xff08;现在称为Jakarta EE&#xff09;技术。尽管它们有一些相似的功能&#xff0c;但它们之间还是存在一些…

DownUnderCTF web sniffy

题目中给了源码 在index.php中将flag的值赋给了session[flag] session[theme]接收GET传入的theme参数。。。??是PHP中的空合并运算符它的作用是检查左侧的值是否存在且不为null。如果存在&#xff0c;则返回左侧的值&#xff1b;如果不存在&#xff0c;则返回右侧的值。 …

用友U8接口-采购管理(8)

概括 本文的操作需要正确部署U8API主要讲述采购管理接口的使用&#xff0c;以采购订单为例&#xff0c;其他单据接口都是大同小异的&#xff01;许多时候先在ERP做个单&#xff0c;然后仿造ERP单据参数&#xff0c;构造接口JSON参数是不错的做法哦 ERP单据金额计算 在ERP的许…

3DCAT亮相2024中国国际消费电子博览会,引领AI潮流

2024年10月18日-20日&#xff0c;备受瞩目的2024中国国际消费电子博览会&#xff08;以下简称“电博会”&#xff09;在青岛国际会展中心&#xff08;红岛馆&#xff09;盛大开幕。作为消费电子领域的盛会&#xff0c;本次电博会吸引了国内外300多家企业参展&#xff0c;展示了…