记 Rxjava zip操作符遇到的问题

在项目中遇到了类似下面这样的代码
本意是希望当zip操作符中三个Observable执行完毕之后,将他们返回的数据统一进行处理

        Observable.zip(startFirst(), startSecond(), startThird(),
                (first, second, third) -> {
                    Log.i("Rxjava", "handle all data");
                    return 1;
                }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

            }

            @Override
            public void onError(Throwable e) {
                Log.e("Rxjava", e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });


    private Observable<Boolean> startFirst() {
        return new Observable<Boolean>() {
            @Override
            protected void subscribeActual(Observer<? super Boolean> observer) {
                Log.i("Rxjava", "startFirst");
            }
        };
    }


    private Observable<Boolean> startSecond() {
        return Observable
                .timer(300, TimeUnit.MILLISECONDS)
                .map(aLong -> {
                    Log.i("Rxjava", "startSecond");
                    return true;
                });
    }


       private Observable<Boolean> startThird() {
        return Observable
                .timer(360, TimeUnit.MILLISECONDS)
                .map(aLong -> {
                    Log.i("Rxjava", "startThird");
                    return true;
                });
    }

但打印日志后,实际结果如下:
在这里插入图片描述
发现他只走了三个Observable流, 并没有去处理这些Observable返回的数据
带着这个问题我去看了zip实现原理‘
下面我把自定义传入的类称为zipper 就是框中的这一段
在这里插入图片描述

来看zip方法实现

    public static <T1, T2, T3, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
            Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
        ObjectHelper.requireNonNull(source1, "source1 is null");
        ObjectHelper.requireNonNull(source2, "source2 is null");
        ObjectHelper.requireNonNull(source3, "source3 is null");
        
        return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2, source3);
    }
    
    //上面将我们传入zipper封装 Functions.toFunction(zipper),封装成了一个Array3Func类
        public static <T1, T2, T3, R> Function<Object[], R> toFunction(final Function3<T1, T2, T3, R> f) {
        ObjectHelper.requireNonNull(f, "f is null");
        return new Array3Func<T1, T2, T3, R>(f);
    }

   //最终zipper被封装在这个类里面
   static final class Array3Func<T1, T2, T3, R> implements Function<Object[], R> {
        final Function3<T1, T2, T3, R> f;

        Array3Func(Function3<T1, T2, T3, R> f) {
            this.f = f;  //这个f就是我们传的zipper
        }

        @SuppressWarnings("unchecked")
        @Override
        public R apply(Object[] a) throws Exception {
            if (a.length != 3) {
                throw new IllegalArgumentException("Array of size 3 expected but got " + a.length);
            }
            return f.apply((T1)a[0], (T2)a[1], (T3)a[2]);  //这里调用zipper的apply方法 参数对应的就是三个Observable发送的值  至于这个值怎么得来的  继续往下看
        }
    }
    

zip调用了zipArray,而zipArray果然不出意外返回了一个ObservableZip ,参数sources就是我们传递的那些Observables

    public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
            boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
        if (sources.length == 0) {
            return empty();
        }
        ObjectHelper.requireNonNull(zipper, "zipper is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
    }

现在需要去看ObservableZip的subscribeActual(),了解Rxjava原理的应该都知道 当调用ObservableZip的subscribe()时,他一定会去调subscribeActual()

    public ObservableZip(ObservableSource<? extends T>[] sources,
            Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
            Function<? super Object[], ? extends R> zipper,
            int bufferSize,
            boolean delayError) {
        this.sources = sources;  //Observable列表
        this.sourcesIterable = sourcesIterable;
        this.zipper = zipper;  //自定义的zipper
        this.bufferSize = bufferSize;
        this.delayError = delayError;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void subscribeActual(Observer<? super R> observer) {
        ObservableSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new ObservableSource[8];
            for (ObservableSource<? extends T> p : sourcesIterable) {
                if (count == sources.length) {
                    ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
                    System.arraycopy(sources, 0, b, 0, count);
                    sources = b;
                }
                sources[count++] = p;
            }
        } else {
            count = sources.length;  //获取到Observable的数量
        }

        if (count == 0) {
            EmptyDisposable.complete(observer);
            return;
        } 

       //将observer zipper count 等参数封装
        ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
        zc.subscribe(sources, bufferSize);
    }

当调用subscribe时,会调用subscribeActual,在subscribeActual中 获取到我们传入的Observable数量 ,将zipper ,observer (这个observer是subscribe zip操作符时传入的 ),Observable数量等封装成一个ZipCoordinator类 然后调用ZipCoordinator类的subscribe

ZipCoordinator类:

 ZipCoordinator(Observer<? super R> actual,
                Function<? super Object[], ? extends R> zipper,
                int count, boolean delayError) {
            this.downstream = actual;  //observer
            this.zipper = zipper;
            this.observers = new ZipObserver[count];  //创建了一个zipObserver数组
            this.row = (T[])new Object[count];  //这个数组用来存储那三个Observable发射的数据
            this.delayError = delayError;
        }

        public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
            ZipObserver<T, R>[] s = observers;
            int len = s.length;
            for (int i = 0; i < len; i++) {
                s[i] = new ZipObserver<T, R>(this, bufferSize);  
            }
            // this makes sure the contents of the observers array is visible
            this.lazySet(0);
            downstream.onSubscribe(this);
            for (int i = 0; i < len; i++) {
                if (cancelled) {
                    return;
                }
                sources[i].subscribe(s[i]);  //给传入的那几个Observable注册观察者
            }
        }

ZipCoordinator的subscribe()主要做的事情就是实例化了count个ZipObserver,count代表zip中传入的Observable数量,然后用这个zipObserver分别作为观察者注册给我们传入的那几个Observable,这样的话我们传入的那几个Observable操作流就可以执行了
针对本例就是startFirst() startSecond() startThird()这几个方法开始执行,因为Rxjava流是从下到上,由上而下调用 所以他最终去调用Observer的onNext onError或者onComplete

所以现在去看ZipObserver里面这几个方法的实现


        ZipObserver(ZipCoordinator<T, R> parent, int bufferSize) {
            this.parent = parent;  //parent指的是上面的ZipCoordinator
            this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            queue.offer(t); 
            parent.drain();
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            done = true;
            parent.drain();
        }

        @Override
        public void onComplete() {
            done = true;
            parent.drain();
        }

ZipObserver的onNext onError或者onComplete都是将Observable发射的数据存在了一个queue里面 然后调用ZipCoordinator的drain()

 public void drain() {
            if (getAndIncrement() != 0) {
                return;
            }

            int missing = 1;

            final ZipObserver<T, R>[] zs = observers;
            final Observer<? super R> a = downstream;
            final T[] os = row; //这个row上面提到过  是一个Object[]  长度为count
            final boolean delayError = this.delayError;

            for (;;) {

                for (;;) {
                    int i = 0;
                    int emptyCount = 0;
                    for (ZipObserver<T, R> z : zs) {
                        if (os[i] == null) {  
                            boolean d = z.done;
                            T v = z.queue.poll();  //从queue中取出Observable发射的数据
                            boolean empty = v == null; 

                            if (checkTerminated(d, empty, a, delayError, z)) {
                                return;
                            }
                            if (!empty) {  //如果不为空 将数据保存在os[]中
                                os[i] = v;
                            } else {
                                emptyCount++;  // 记录空值数量
                            }
                        } else {
                            if (z.done && !delayError) {
                                Throwable ex = z.error;
                                if (ex != null) {
                                    cancelled = true;
                                    cancel();
                                    a.onError(ex);
                                    return;
                                }
                            }
                        }
                        i++;
                    }

                    // 在这里判断  如果Observale没有发射数据  就不会再继续往下了
                    if (emptyCount != 0) {
                        break;
                    }

                    R v;
                    try {
                      //调用zipper的apply  这个os就是发射的数据集合
                        v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        cancel();
                        a.onError(ex);
                        return;
                    }

                    a.onNext(v);

                    Arrays.fill(os, null);
                }

                missing = addAndGet(-missing);
                if (missing == 0) {
                    return;
                }
            }
        }

好了 这下明白了
zip实现就是给传入的那些Observablef分别注册了一个zipObserver,然后在zipObserver里面是将发射的数据存在了一个queue里面,接着for循环去取queue里面的数据 如果不为空 将把数据存在了一个object[]里面,只要有一个Observable没有发射数据 就不会去分发这个object[] 如果都不为空 才去调用zipper的apply,参数就是这个object[]
还记得吗 之前zipper是被封装成了一个Array3Func

  public R apply(Object[] a) throws Exception {
            if (a.length != 3) {
                throw new IllegalArgumentException("Array of size 3 expected but got " + a.length);
            }
            return f.apply((T1)a[0], (T2)a[1], (T3)a[2]);  //
        }

三个Observable发射的数据调用f.apply去处理 f就是自己传的那个zipper啦
在这里插入图片描述
至于我遇到的问题再来看,发现在startFirst()里面我没有发送任何数据
在这里插入图片描述
修改一下startFirst
在这里插入图片描述
果然就可以走到处理数据的地方了 log如下
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/349497.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

外包干了一个月,技术退步明显。。。。。

先说一下自己的情况&#xff0c;本科生&#xff0c;19年通过校招进入南京某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

基于springboot+vue旅游网站

摘要 旅游网站的开发是一个综合性的项目&#xff0c;涉及到前端和后端的技术&#xff0c;而基于Spring Boot和Vue.js的组合是一种常见的选择&#xff0c;因为它们可以很好地配合&#xff0c;提供高效且现代化的开发体验。首先&#xff0c;我们使用Spring Boot作为后端框架。Spr…

编码神仙插件Machinet AI GPT-4 Chat and Unit Tests

最近发现一个神仙插件Machinet AI GPT-4 Chat and Unit Tests&#xff0c;支持多个编译器安装使用。 我下载安装到Android Studio上&#xff0c;不需要登录直接可以使用。 可以直接提问&#xff0c;支持中文。

Ansible自动化运维(三)Playbook 模式详解

&#x1f468;‍&#x1f393;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; &#x1f40b; 希望大家多多支…

H.264与H.265的主要差异

H.265仍然采用混合编解码&#xff0c;编解码结构域H.264基本一致&#xff0c; H.265与H.264的主要不同 编码块划分结构&#xff1a;采用CU (CodingUnit)、PU(PredictionUnit)和TU(TransformUnit)的递归结构。 并行工具&#xff1a;增加了Tile以及WPP等并行工具集以提高编码速…

【数据结构和算法】--- 二叉树(4)--二叉树链式结构的实现(2)

目录 一、二叉树剩余函数1.1二叉树的层序遍历1.2判断二叉树是否为完全二叉树1.3二叉树销毁 二、二叉树的构建及遍历OJ题 一、二叉树剩余函数 1.1二叉树的层序遍历 层序遍历&#xff1a; 除了先序遍历、中序遍历、后序遍历外&#xff0c;还可以对二叉树进行层序遍历。设二叉树…

GoZero微服务个人探究之路(九)api文件编写总结

参考来源go-zero官方文档https://go-zero.dev/docs/tutorials 前言 go-zero是目前star最多的go语言微服务框架&#xff0c;api 是 go-zero特殊的语言&#xff0c;类型文件&#xff0c;go-zero自带的goctl可以通过.api文件生成http服务代码 api文件内容编写 不可使用关键字 …

webug存在的越权漏洞-水平越权以及垂直越权的漏洞复现(超详解)

越权漏洞-webug、 1.登录 账号&#xff1a;admin 密码&#xff1a;admin 2.进入逻辑漏洞 3.进入越权修改密码靶场 &#xff08;1&#xff09;输入账号密码 进入进去会发现没有权限进入 方法一&#xff1a; 这里我们只需要将 127.0.0.1:8080/control/a/auth_cross/cross_a…

小迪安全23WEB 攻防-Python 考点CTF 与 CMS-SSTI 模版注入PYC 反编译

#知识点&#xff1a; 1、PYC 文件反编译 2、Python-Web-SSTI 3、SSTI 模版注入利用分析 各语言的SSIT漏洞情况&#xff1a; SSIT漏洞过程&#xff1a; https://xz.aliyun.com/t/12181?page1&time__1311n4fxni0Qnr0%3DD%2FD0Dx2BmDkfDCDgmrYgBxYwD&alichlgrefhtt…

Angular响应式表单表单验证触发另一个字段校验

Angular响应式表单校验联动 前言表单字段日期校验函数效果 前言 在某些业务场景中&#xff0c;校验某表单字段的同时也需要校验另外一个与之相关的字段&#xff0c;例如开始时间和结束时间&#xff0c;要求结束时间必须晚于开始时间。在angular 响应式表单中改如何实现该需求呢…

云组态监控平台:开启智能监控新时代

在数字化浪潮中&#xff0c;物联网技术正逐渐成为各行业转型升级的核心驱动力。而云组态监控平台作为物联网技术的重要组成部分&#xff0c;正在开启智能监控的新时代。HiWoo Cloud的云组态监控平台&#xff0c;凭借其强大的功能和创新能力&#xff0c;致力于推动智能监控技术的…

uniapp app更新

uniapp app更新 这个版本要随之增加&#xff0c;不然刚更新时直接用app, 新包增加的那些页面跳转会有问题&#xff0c;不能跳新的页面 //app更新检测 updataApp(){const that this;uni.showLoading({title:加载中...})plus.runtime.getProperty(plus.runtime.appid, functio…

【微信小程序】图片违法违规内容鉴别(云函数)

微信小程序通过云调用校验一张图片是否含有违法违规内容。 选择图片: wx.chooseImage({count: 6,sizeType: [compressed], // 可以指定是原图还是压缩图&#xff0c;默认二者都有sourceType: [album, camera], // 可以指定来源是相册还是相机&#xff0c;默认二者都有success: …

用于医学分割的实时Test-time adaption

机构&#xff1a;约翰霍普金斯 论文&#xff1a;https://arxiv.org/abs/2203.05574 代码&#xff1a;https://github.com/jeya-maria-jose/On-The-Fly-Adaptation 摘要 基于深度学习的医学成像解决方案的一个主要问题是&#xff0c;当模型在不同于其训练的数据分布上进行测…

紫光展锐T760_芯片性能介绍_展锐T760安卓核心板定制

展锐T760核心板是一款基于国产5G芯片的智能模块&#xff0c;采用紫光展锐T760制程工艺为台积电6nm工艺&#xff0c;支持工艺具有出色的能效表现。其采用主流的44架构的八核设计&#xff0c;包括4颗2.2GHz A76核心和4颗A55核心设计&#xff0c;内存单元板载可达8GB Ram256GB ROM…

HCIA——29HTTP、万维网、HTML、PPP、ICMP;万维网的工作过程;HTTP 的特点HTTP 的报文结构的选择、解答

学习目标&#xff1a; 计算机网络 1.掌握计算机网络的基本概念、基本原理和基本方法。 2.掌握计算机网络的体系结构和典型网络协议&#xff0c;了解典型网络设备的组成和特点&#xff0c;理解典型网络设备的工作原理。 3.能够运用计算机网络的基本概念、基本原理和基本方法进行…

【python爬虫】爬虫编程技术的解密与实战

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a; 爬虫】网络爬虫探秘⏰诗赋清音&#xff1a;云生高巅梦远游&#xff0c; 星光点缀碧海愁。 山川深邃情难晤&#xff0c; 剑气凌云志自修。 目录 &#x1f33c;实验目的 &#x1f…

Android开发修炼之路——(一)Android App开发基础-2

本专栏文章 上一篇 Android开发修炼之路——&#xff08;一&#xff09;Android App开发基础-1 2 App的工程结构 本节介绍App工程的基本结构及其常用配置&#xff0c;首先描述项目和模块的区别&#xff0c;以及工程内部各目录与配置文件的用途说明&#xff1b;其次阐述两种级别…

BabylonJS 6.0文档 Deep Dive 摄像机(六):遮罩层和多相机纹理

1. 使用遮罩层来处理多个摄影机和多网格物体 LayerMask是分配给每个网格&#xff08;Mesh&#xff09;和摄像机&#xff08;Camera&#xff09;的一个数。它用于位&#xff08;bit&#xff09;级别用来指示灯光和摄影机是否应照射或显示网格物体。默认值为0x0FFFFFFF&#xff…

SpringBoot使用druid

SpringBoot使用druid 一、前言二、配置1、pom依赖2、配置文件yml3、配置类 一、前言 Java程序很大一部分要操作数据库&#xff0c;为了提高性能操作数据库的时候&#xff0c;又不得不使用数据库连接池。 Druid 是阿里巴巴开源平台上一个数据库连接池实现&#xff0c;结合了 C…