JDK-反应流(响应式流)

归档

  • GitHub: JDK-反应流(响应式流)

使用示例

  • https://github.com/zengxf/small-frame-demo/blob/master/multi-thread/reactive-test/reactor-demo/src/main/java/cn/zxf/reactor_demo/jdk/PubSubTest.java

JDK 版本

openjdk version "17" 2021-09-14
OpenJDK Runtime Environment (build 17+35-2724)
OpenJDK 64-Bit Server VM (build 17+35-2724, mixed mode, sharing)

原理

关键类

  • java.util.concurrent.SubmissionPublisher
// 提交式-发布者
public class SubmissionPublisher<T> implements Publisher<T>, AutoCloseable {
    BufferedSubscription<T> clients;    // 客户端(BufferedSubscriptions)链表
    final ReentrantLock lock;           // 锁定以排除多个源
    volatile boolean closed;            // 运行状态,仅在锁内更新
    boolean subscribed; // 在第一次调用订阅时设置 true,以初始化可能的拥有者
    Thread owner;       // 第一个要订阅的调用者线程,如果线程发生更改则为 null
    volatile Throwable closedException; // closeExceptionally 中的异常
    final Executor executor;            // 用于构造 BufferedSubscriptions 的参数
    final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
    final int maxBufferCapacity;

    // 调用入口 ref: sign_demo_001
    public SubmissionPublisher() {
        this(ASYNC_POOL, Flow.defaultBufferSize(), null);   // ref: sign_cm_002
    }

    // sign_cm_002
    public SubmissionPublisher(
        Executor executor, int maxBufferCapacity,
        BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler
    ) {
        ... // 参数校验
        this.lock = new ReentrantLock();
        this.executor = executor;   // def: ForkJoinPool
        this.onNextHandler = handler;
        this.maxBufferCapacity = roundCapacity(maxBufferCapacity);  // def: 256
    }
}

订阅

  • java.util.concurrent.SubmissionPublisher
    // 订阅(添加订阅者)。调用入口 ref: sign_demo_010
    public void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) throw new NullPointerException();
        ReentrantLock lock = this.lock;
        int max = maxBufferCapacity;
        // INITIAL_CAPACITY = 32, 默认情况下计算完数组长度为 32
        Object[] array = new Object[max < INITIAL_CAPACITY ? max : INITIAL_CAPACITY]; 
        // 初始化订阅关系,ref: sign_c_110 | sign_cm_110
        BufferedSubscription<T> subscription = new BufferedSubscription<T>(
            subscriber, executor, onNextHandler, array, max
        );
        lock.lock();
        try {
            if (!subscribed) {
                subscribed = true;
                owner = Thread.currentThread();
            }
            for (BufferedSubscription<T> b = clients, pred = null;;) {
                if (b == null) {    // 还没有初始化链
                    Throwable ex;
                    subscription.onSubscribe();
                    if ((ex = closedException) != null)
                        subscription.onError(ex);   // 有异常
                    else if (closed)
                        subscription.onComplete();  // 已关闭
                    else if (pred == null)
                        clients = subscription;     // 初始化链
                    else
                        pred.next = subscription;   // 加入链
                    break;
                }
                BufferedSubscription<T> next = b.next;
                if (b.isClosed()) {   // remove
                    b.next = null;    // detach
                    if (pred == null)
                        clients = next;
                    else
                        pred.next = next;
                }
                else if (subscriber.equals(b.subscriber)) {
                    // 不能重复添加
                    b.onError(new IllegalStateException("Duplicate subscribe"));
                    break;
                }
                else
                    pred = b;   // 方便后面的加入链
                b = next;       // 方便遍历链
            }
        } finally {
            lock.unlock();
        }
    }
  • java.util.concurrent.SubmissionPublisher.BufferedSubscription
    // sign_c_110 订阅关系
    static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {
        long timeout;                      // Long.MAX_VALUE if untimed wait
        int head;                          // next position to take
        int tail;                          // next position to put
        final int maxCapacity;             // max buffer size
        volatile int ctl;                  // atomic run state flags
        Object[] array;                    // buffer
        final Subscriber<? super T> subscriber;
        final BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler;
        Executor executor;                 // null on error

        BufferedSubscription<T> next;      // 组装链 

        // sign_cm_110
        BufferedSubscription(
            Subscriber<? super T> subscriber,   // 自定义的订阅者
            Executor executor,      // ForkJoinPool
            BiConsumer<? super Subscriber<? super T>, ? super Throwable> onNextHandler, // null
            Object[] array,         // len: 32
            int maxBufferCapacity   // 256
        ) {
            this.subscriber = subscriber;
            this.executor = executor;
            this.onNextHandler = onNextHandler;
            this.array = array;
            this.maxCapacity = maxBufferCapacity;
        }
    }

提交数据

  • java.util.concurrent.SubmissionPublisher
    // 提交数据。调用入口 ref: sign_demo_020
    public int submit(T item) {
        return doOffer(item, Long.MAX_VALUE, null); // ref: sign_m_210
    }

    // sign_m_210
    private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        if (item == null) throw new NullPointerException();
        int lag = 0;
        boolean complete, unowned;
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Thread t = Thread.currentThread(), o;
            BufferedSubscription<T> b = clients;
            if ((unowned = ((o = owner) != t)) && o != null)
                owner = null;                     // disable bias
            if (b == null)
                complete = closed;
            else {  // 有订阅者才做处理
                complete = false;
                boolean cleanMe = false;
                BufferedSubscription<T> retries = null, rtail = null, next;
                do {
                    next = b.next;
                    int stat = b.offer(item, unowned);  // 依次发给订阅者句柄,ref: sign_m_220
                    ...
                } while ((b = next) != null);   // 遍历链

                ...
            }
        } finally {
            lock.unlock();
        }
        ...
    }
  • java.util.concurrent.SubmissionPublisher.BufferedSubscription
        // sign_m_220 添加到队列
        final int offer(T item, boolean unowned) {
            Object[] a;
            int stat = 0, cap = ((a = array) == null) ? 0 : a.length;
            int t = tail, i = t & (cap - 1), n = t + 1 - head;
            if (cap > 0) {
                boolean added;
                if (n >= cap && cap < maxCapacity)  // resize (扩容)
                    added = growAndOffer(item, a, t);
                else if (n >= cap || unowned)       // need volatile CAS (CAS 替换)
                    added = QA.compareAndSet(a, i, null, item);
                else {                              // can use release mode (设置值)
                    QA.setRelease(a, i, item); 
                    added = true;
                }
                if (added) {        // 添加成功
                    tail = t + 1;   // 改下标(可循环使用数组)
                    stat = n;
                }
            }
            return startOnOffer(stat);  // 尝试启动,ref: sign_m_221
        }

        /**
         * sign_m_221 尝试在添加后启动消费者任务
         */
        final int startOnOffer(int stat) {
            int c; // start or keep alive if requests exist and not active
            if (((c = ctl) & (REQS | ACTIVE)) == REQS &&
                ((c = getAndBitwiseOrCtl(RUN | ACTIVE)) & (RUN | CLOSED)) == 0)
                tryStart(); // 尝试启动,ref: sign_m_222
            ...
            return stat;
        }

        // sign_m_222 尝试启动消费者任务
        final void tryStart() {
            try {
                Executor e;
                ConsumerTask<T> task = new ConsumerTask<T>(this);   // ref: sign_c_230
                if ((e = executor) != null)     // skip if disabled on error
                    e.execute(task);            // 执行体,ref: sign_c_230
            } ... // catch
        }

        // sign_m_225 消费
        final void consume() {
            Subscriber<? super T> s;
            if ((s = subscriber) != null) {          // hoist checks
                subscribeOnOpen(s); // 没开启,则开启并回调 Subscriber #onSubscribe() 方法,ref: sign_demo_110
                long d = demand;
                for (int h = head, t = tail;;) {
                    int c, taken; boolean empty;
                    if (((c = ctl) & ERROR) != 0) {
                        closeOnError(s, null);      // 有异常,回调 Subscriber #onError() 方法,ref: sign_demo_130
                        break;
                    }
                    else if ((taken = takeItems(s, d, h)) > 0) {    // 获取队列元素并处理,ref: sign_m_226
                        head = h += taken;
                        d = subtractDemand(taken);
                    }
                    ...
                    else if (t == (t = tail)) {      // stability check
                        if ((empty = (t == h)) && (c & COMPLETE) != 0) {
                            closeOnComplete(s);      // 已完成,回调 Subscriber #onComplete() 方法,ref: sign_demo_140
                            break;
                        }
                        ...
                    }
                }
            }
        }

        // sign_m_226 获取队列元素并处理
        final int takeItems(Subscriber<? super T> s, long d, int h) {
            Object[] a;
            int k = 0, cap;
            if ((a = array) != null && (cap = a.length) > 0) {
                int m = cap - 1, b = (m >>> 3) + 1;
                int n = (d < (long)b) ? (int)d : b;
                for (; k < n; ++h, ++k) {
                    Object x = QA.getAndSet(a, h & m, null); // 获取元素
                    ...
                    else if (!consumeNext(s, x)) // 通知订阅者,回调 Subscriber #onNext() 方法,ref: sign_demo_120
                        break;
                }
            }
            return k;
        }
  • java.util.concurrent.SubmissionPublisher.ConsumerTask
    // sign_c_230 消费任务(通知订阅者)
    static final class ConsumerTask<T> extends ForkJoinTask<Void>
        implements Runnable, CompletableFuture.AsynchronousCompletionTask 
    {
        final BufferedSubscription<T> consumer;
        ConsumerTask(BufferedSubscription<T> consumer) {
            this.consumer = consumer;
        }
        ... // 其他方法
        // sign_c_230 执行体
        public final void run() { consumer.consume(); } // 消费,ref: sign_m_225
    }

关闭

  • 简单,略

背压

  • 看代码或调试时,没发现 publisher 暂停的代码,可用 JConsole 查看线程栈
...
java.base@17/jdk.internal.misc.Unsafe.park(Native Method)
... .locks.LockSupport.park(LockSupport.java:211)
... .SubmissionPublisher$BufferedSubscription.block(SubmissionPublisher.java:1495) // ref: sign_m_321
... .ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3463)
... .ForkJoinPool.managedBlock(ForkJoinPool.java:3434)
... .SubmissionPublisher$BufferedSubscription.awaitSpace(SubmissionPublisher.java:1462) // ref: sign_m_320
...
  • java.util.concurrent.SubmissionPublisher
    // 提交数据。调用入口 ref: sign_demo_020
    private int doOffer(T item, long nanos, BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
        int lag = 0;
        ...
        try {
            Thread t = Thread.currentThread(), o;
            if ((unowned = ((o = owner) != t)) && o != null)
                ...
                if (retries != null || cleanMe)
                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);   // 背压处理,ref: sign_m_310
        } ... // finally
        ...
    }

    // sign_m_310 背压处理
    private int retryOffer(
        T item, long nanos,
        BiPredicate<Subscriber<? super T>, ? super T> onDrop,
        BufferedSubscription<T> retries, int lag,
        boolean cleanMe
    ) {
        for (BufferedSubscription<T> r = retries; r != null;) {
            BufferedSubscription<T> nextRetry = r.nextRetry;
            r.nextRetry = null;
            if (nanos > 0L)
                r.awaitSpace(nanos);    // 等待,ref: sign_m_320
            ...
        }
        ...
        return lag;
    }
  • java.util.concurrent.SubmissionPublisher.BufferedSubscription
    static final class BufferedSubscription<T> implements Subscription, ForkJoinPool.ManagedBlocker {
        // sign_m_320 帮助或阻止直到超时、关闭或空间可用
        final void awaitSpace(long nanos) {
            if (!isReleasable()) {
                ForkJoinPool.helpAsyncBlocker(executor, this);
                if (!isReleasable()) {
                    timeout = nanos;
                    try {
                        ForkJoinPool.managedBlock(this);    // 最终会调用 block() 进行阻塞,ref: sign_m_321
                    } ... // catch
                }
            }
        }

        // sign_m_321 阻塞实现 (实现 ManagedBlocker 方法)
        @Override
        public final boolean block() {
            ...
            while (!isReleasable()) {
                ...
                else if (waiter == null)
                    waiter = Thread.currentThread();    // 记录当前线程
                ...
                else
                    LockSupport.park(this); // 阻塞
            }
            ...
        }

        // 在订阅者获取元素时,进行通知,继 sign_m_226
        final int takeItems(Subscriber<? super T> s, long d, int h) {
            ...
                    if (waiting != 0)
                        signalWaiter(); // 通知发布者,ref: sign_m_325
            ...
        }

        // sign_m_325 通知发布者
        final void signalWaiter() {
            Thread w;
            waiting = 0;
            if ((w = waiter) != null)
                LockSupport.unpark(w);  // 唤醒发布者线程
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/777564.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HTML5使用<mark>标签:高亮显示文本

1、<mark>标签的使用 mark 标签用于表示页面中需要突出显示或高亮的一段文本&#xff0c;这段文本对于当前用户具有参考作用。它通常在引用原文以引起读者注意时使用。<mark>标签的作用相当于使用一支荧光笔在打印的纸张上标出一些文字。它与强调不同&#xff0c;…

自闭症在生活中的典型表现

自闭症&#xff0c;这个看似遥远却又悄然存在于我们周围的疾病&#xff0c;其影响深远且复杂。在日常生活中&#xff0c;自闭症患者的典型表现往往让人印象深刻&#xff0c;这些表现不仅揭示了他们内心的世界&#xff0c;也提醒我们要以更加包容和理解的心态去面对他们。 首先…

UEC++ 虚幻5第三人称射击游戏(二)

UEC++ 虚幻5第三人称射击游戏(二) 派生榴弹类武器 新建一个继承自Weapon的子类作为派生榴弹类武器 将Weapon类中的Fire函数添加virtual关键字变为虚函数让榴弹类继承重写 在ProjectileWeapon中重写Fire函数,新建生成投射物的模版变量 Fire函数重写逻辑 代码//生成的投射物U…

MySQL中mycat与mha应用

目录 一.Mycat代理服务器 1.Mycat应用场景 2.mycat安装目录结构说明 3.Mycat的常用配置文件 4.Mycat日志 5.mycat 实现读写分离 二.MySQL高可用 1.原理过程 2.MHA软件 3.实现MHA 一.Mycat代理服务器 1.Mycat应用场景 Mycat适用的场景很丰富&#xff0c;以下是几个典型…

大模型的实践应用25-LLama3模型模型的架构原理,以及手把手教你搭建LLama3模型

大家好,我是微学AI,今天给大家介绍一下大模型的实践应用25-LLama3模型模型的架构原理,以及手把手教你搭建LLama3模型。LLaMA 3 是Meta公司开发的最新一代大规模语言模型,其架构在很大程度上继承了LLaMA 2的设计,但对某些关键组件进行了改进和优化。 文章目录 一、LLama3模…

Vue通过Key管理状态

Vue通过Key管理状态 Vue 默认按照“就地更新”的策略来更新&#xff0c;通过 v-for 渲染的元素列表。当数据项的顺序改变时&#xff0c;Vue 不会随之移动 DOM 元素的顺序&#xff0c;而是就地更新每个元素&#xff0c;确保它们在原本指定的索引位置上渲染。为了给 Vue 一个提示…

mupdf加载PDF显示中文乱码

现象 加载PDF显示乱码,提示非嵌入字体 non-embedded font using identity encoding调式 在pdf-font.c中加载字体 调试源码发现pdf文档的字体名字居然是GBK&#xff0c;估计又是哪个windows下写的pdf生成工具生成pdf 字体方法&#xff1a; static pdf_font_desc * load_cid…

STM32利用FreeRTOS实现4个led灯同时以不同的频率闪烁

在没有接触到FreeRTOS时&#xff0c;也没有想过同时叫两个或两个以上的led灯闪烁的想法&#xff0c;接触后&#xff0c;发现如果想叫两个灯同时以不同的频率闪烁&#xff0c;不能说是不可能&#xff0c;就算是做到了也要非常的麻烦。但是学习了FreeRTOS后&#xff0c;发现要想同…

Qt 网络编程实战

一.获取主机的网络信息 需要添加network模块 QT core gui network主要涉及的类分析 QHostInfo类 QHostInfo::localHostName() 获取本地的主机名QHostInfo::fromName(const QString &) 获取指定主机的主机信息 addresses接口 QNetworkInterface类 QNetworkInterfac…

Redis---9---集群(cluster)

将新增的6387节点&#xff08;空槽号&#xff09;作为master节点加入原集群 Redis—9—集群&#xff08;cluster&#xff09; 是什么 定义 ​ 由于数据量过大&#xff0c;单个Master复制集难以承担&#xff0c;因此需要对多个复制集进行集群&#xff0c;形成水平扩展每个复…

电脑f盘的数据回收站清空了能恢复吗

随着信息技术的飞速发展&#xff0c;电脑已成为我们日常生活和工作中不可或缺的设备。然而&#xff0c;数据的丢失或误删往往会给人们带来极大的困扰。尤其是当F盘的数据在回收站被清空后&#xff0c;许多人会陷入绝望&#xff0c;认为这些数据已无法挽回。但事实真的如此吗&am…

【C语言】自定义类型:联合和枚举

前言 前面我们学习了一种自定义类型&#xff0c;结构体&#xff0c;现在我们学习另外两种自定义类型&#xff0c;联合 和 枚举。 目录 一、联合体 1. 联合体类型的声明 2. 联合体的特点 3. 相同成员联合体和结构体对比 4. 联合体大小的计算 5. 用联合体判断当前机…

AI Earth应用—— 在线使用sentinel数据VV和VH波段进行水体提取分析(昆明抚仙湖、滇池为例)

AI Earth 本文的主要目的就是对水体进行提取,这里,具体的操作步骤很简单基本上是通过,首页的数据检索,选择需要研究的区域,然后选择工具箱种的水体提取分析即可,剩下的就交给阿里云去处理,结果如下: 这是我所选取的一景影像: 详情 卫星: Sentinel-1 级别: 1 …

利用redis数据库管理代理库爬取cosplay网站-cnblog

爬取cos猎人 数据库管理主要分为4个模块&#xff0c;代理获取模块&#xff0c;代理储存模块&#xff0c;代理测试模块&#xff0c;爬取模块 cos猎人已经倒闭&#xff0c;所以放出爬虫源码 api.py 为爬虫评分提供接口支持 import requests import concurrent.futures import …

dependencyManagement的作用、nacos的学习

使用SpringCloudAlibaba注意各组件的版本适配 SpringCloudAlibaba已经包含了适配的各组件&#xff08;nacos、MQ等&#xff09;的版本号&#xff0c;也是一个版本仲裁者&#xff0c;但是可能已经有了父项目Spring-Boot-Starter-Parent这个版本仲裁者&#xff0c;又不能加多个父…

Mongodb oplog的作用及如何评估和更改保留时间

作者介绍&#xff1a;老苏&#xff0c;10余年DBA工作运维经验&#xff0c;擅长Oracle、MySQL、PG数据库运维&#xff08;如安装迁移&#xff0c;性能优化、故障应急处理等&#xff09; 公众号&#xff1a;老苏畅谈运维 欢迎关注本人公众号&#xff0c;更多精彩与您分享。oplog …

硅纪元视角 | 国内首款鸿蒙人形机器人“夸父”开启应用新篇章

在数字化浪潮的推动下&#xff0c;人工智能&#xff08;AI&#xff09;正成为塑造未来的关键力量。硅纪元视角栏目紧跟AI科技的最新发展&#xff0c;捕捉行业动态&#xff1b;提供深入的新闻解读&#xff0c;助您洞悉技术背后的逻辑&#xff1b;汇聚行业专家的见解&#xff0c;…

景区气象站:守护旅行安全的智能向导

在繁忙的现代社会&#xff0c;人们越来越渴望逃离城市的喧嚣&#xff0c;寻找一处宁静的自然之地放松身心。景区&#xff0c;作为大自然与人类文明交织的瑰宝&#xff0c;吸引了无数游客前来探访。然而&#xff0c;多变的天气往往给游客的旅行带来不确定性。 景区气象站&#x…

Java跳出循环的四种方式

1、continue,break,return continue&#xff1a;跳出当前层循环的当前语句&#xff0c;执行当前层循环的下一条语句。   continue标签 break&#xff1a;跳出当前层循环。 break标签&#xff1a;多层循环时&#xff0c;跳到具体某层循环。 return&#xff1a;结束所有循环…

微观特征轮廓尺寸测量:光学3D轮廓仪、共焦显微镜与台阶仪的应用

随着科技进步&#xff0c;显微测量仪器以满足日益增长的微观尺寸测量需求而不断发展进步。多种高精度测量仪器被用于微观尺寸的测量&#xff0c;其中包括光学3D表面轮廓仪&#xff08;白光干涉仪&#xff09;、共聚焦显微镜和台阶仪。有效评估材料表面的微观结构和形貌&#xf…