RxJava学习记录

文章目录

  • 1. 总览
    • 1.1 基本原理
    • 1.2 导入包和依赖
  • 2. 操作符
    • 2.1 创建操作符
    • 2.2 转换操作符
    • 2.3 组合操作符
    • 2.4 功能操作符

1. 总览

1.1 基本原理

在这里插入图片描述
参考文献
在这里插入图片描述
构建流:每一步操作都会生成一个新的Observable节点(没错,包括ObserveOn和SubscribeOn线程变换操作),并将新生成的Observable返回,直到最后一步执行subscribe方法。编写Rxjava代码的过程其实就是构建一个一个Observable节点的过程
订阅流:从最后一个N5节点的订阅行为开始,依次执行前面各个节点真正的订阅方法。在每个节点的订阅方法中,都会生成一个新的Observer**,这个Observer会包含“下游”的Observer,这样当每个节点都执行完订阅(subscribeActual)后,也就生成了一串Observer,它们通过downstream,upstream引用连接
回调流: 当订阅流执行到最后,也就是第一个节点N0时,用onNext方法,两个作用,一个是把上个节点返回的数据进行一次map变换,另一个就是将map后的结果传递给下游。
小结:先从上到下把各个变换的Observable连成链(拼装流水线),然后在最后subscribe的时候,又从下到上通过每个Observable的OnSubscribe从最下的Subscriber对象开始连成链(流水线开始工作包装Subscriber),直到顶端,当顶端的Subscriber对象调用了onNext方法的时候,又从上往下调用Subscriber链的onNext(用户一层层拆开包装盒),里面执行了每个操作的变换逻辑。

1.2 导入包和依赖

implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

2. 操作符

添加链接描述

2.1 创建操作符

  • Create
    private void test1() {
    //被观察者Observable;观察者Observer/消费者consumer;通过subsribe订阅
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext("1");
//                emitter.onError(new Throwable("异常模拟"));
                emitter.onComplete();
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("subscribe");
            }
            @Override
            public void onNext(Object o) {
                System.out.println("onNext Observer " + o);
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("erro");
            }
            @Override
            public void onComplete() {
                System.out.println("Complete Observer....");
            }
        });
    }


    private void test2() {
        Disposable d = Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext("2");
                emitter.onError(new Throwable("模拟异常"));
                emitter.onComplete();
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("Accept " + o);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("Accept " + throwable);
            }
        });
    }

Observer:
适合需要完整事件处理的场景,包括处理数据、错误和完成信号。
提供了更灵活的事件处理能力,可以根据需求实现对错误和完成事件的响应。
Consumer:
适合简单的场景,只需处理每个发出的数据项,而不需要关心错误或完成事件。
简化了代码结构,特别是在处理简单流时,使用起来更为便捷和直观。

  • 其他
    just 10个发射源
    from 将一个Iterable、一个Future、 或者一个数组,内部通过代理的方式转换成一个Observable
    interval操作符 创建一个按固定时间间隔发射整数序列的Observable,这个序列为一个无限递增的整数序列
    range操作符 发射一个范围内的有序整数序列,并且我们可以指定范围的起始和长度
    repeat操作符 重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数

2.2 转换操作符

map
将源Observable发送的数据转换为一个新的Observable对象

    private void test3(){
        Observable.just("111")
                .map(new Function<String, Object>() {
                    @Override
                    public Object apply(String s) throws Exception {
                        return "my name is " + s;
                    }
                }).subscribe(ob);
    }
   //subscribe
//onNext Observer my name is 111
//Complete Observer....

flatmap
添加链接描述
在这里插入图片描述
将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里(但是是无序的)

    private void test4(){
        Disposable ob = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer o) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + o);
                }
                return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);//为了无序 加了延迟
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String o) throws Exception {
                System.out.println(o);
            }
        });
    }
    //出现的 1 2 3会随机出现

concatMap
concatMap操作符类似于flatMap操作符,不同的一点是它按次序连接。

2.3 组合操作符

concat
concatArray
合并多个对象,按照一定的顺序
在这里插入图片描述
merge
在这里插入图片描述

2.4 功能操作符

SubscribeOn 改变调用它之前代码的线程,只有第一次有效
ObserveOn 改变调用它之后代码的线程, 可以多次调用

        Observable.create(new ObservableOnSubscribe<Object>() {
                    @Override
                    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                        Log.d(TAG,"加了subscribeOn和observeOn: " + Thread.currentThread().getName());
                        emitter.onNext("1111");
                        emitter.onNext("22222");
                        emitter.onComplete();
                    }
                })
               .subscribeOn(Schedulers.newThread()) //1 进行创建和发射在子线程
                .observeOn(AndroidSchedulers.mainThread())// 2 在主线程消费;由于程序是test里面执行,所以不是main线程;后续改成了main是一样的道理

                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG,"onSubscribe " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Object o) {
                        Log.d(TAG,"onNext " + Thread.currentThread().getName());

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG,"onError " + Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG,"onComplete " + Thread.currentThread().getName());
                    }
                });
    }

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
这一个onSubsribe 一直是在测试线程里


1. **Observable 的创建和订阅**:
   - 在 `subscribe()` 方法中,你创建了一个 `Observer` 对象,并将其订阅到了 `Observable` 对象上。

2. **onSubscribe 方法执行**:
   - 当 `subscribe()` 方法被调用后,`Observer` 对象的 `onSubscribe` 方法会立即执行。这是因为 `onSubscribe` 是 `Observer` 接口的一部分,它负责接收 `Disposable` 对象,表示订阅关系,而不是响应数据流本身。

3. **异步操作执行**:
   - 然后,`Observable` 中的异步操作开始执行。在你的例子中,通过 `Observable.create()` 创建了一个新的数据流,该数据流会在新线程(通过 `subscribeOn(Schedulers.newThread())` 指定的线程)中执行。这意味着 `Observable.create()` 中的代码块会在新线程中运行,而不会阻塞主线程。

4. **数据流发射和消费**:
   - 在新线程中,`ObservableEmitter` 会发射数据项(通过 `emitter.onNext()` 发送数据)并在合适的时机调用 `onComplete()` 或者 `onError()`,表示数据流的结束。

5. **observeOn 切换到主线程**:
   - 通过 `observeOn(AndroidSchedulers.mainThread())`,确保在数据流中的消费者部分(即 `Observer` 的 `onNext()`, `onError()`, `onComplete()` 方法)在主线程中执行。这个切换保证了在主线程更新UI或处理数据,从而避免了在主线程中执行耗时操作而导致的UI阻塞问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/785118.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv10改进 | 主干篇 | 低照度增强网络PE-YOLO改进主干(改进暗光条件下的物体检测模型)

一、本文介绍 本文给大家带来的改进机制是低照度图像增强网络PE-YOLO中的PENet&#xff0c;PENet通过拉普拉斯金字塔将图像分解成多个分辨率的组件&#xff0c;增强图像细节和低频信息。它包括一个细节处理模块&#xff08;DPM&#xff09;&#xff0c;用于通过上下文分支和边…

【安全设备】日志审计

一、什么是日志审计 日志审计是一站式的日志数据管理平台&#xff0c;主要致力于提供事前预警、事后审计的安全能力&#xff0c; 通过对日志数据的全面采集、解析和深度的关联分析&#xff0c;及时发现各种安全威胁和异常行为事件。日志审计是指通过集中采集信息系统中的各类信…

Chain-of-Verification Reduces Hallucination in Lagrge Language Models阅读笔记

来来来&#xff0c;继续读文章了&#xff0c;今天这个是meta的研究员们做的一个关于如何减少LLM得出幻觉信息的工作&#xff0c;23年底发表。文章链接&#xff1a;https://arxiv.org/abs/2309.11495 首先&#xff0c;这个工作所面向的LLM的问答任务&#xff0c;是list-based q…

怎样优化 PostgreSQL 中对日期时间范围的模糊查询?

文章目录 一、问题分析&#xff08;一&#xff09;索引未有效利用&#xff08;二&#xff09;日期时间格式不统一&#xff08;三&#xff09;复杂的查询条件 二、优化策略&#xff08;一&#xff09;使用合适的索引&#xff08;二&#xff09;规范日期时间格式&#xff08;三&a…

前沿重器[53] | 聊聊搜索系统6:精排

前沿重器 栏目主要给大家分享各种大厂、顶会的论文和分享&#xff0c;从中抽取关键精华的部分和大家分享&#xff0c;和大家一起把握前沿技术。具体介绍&#xff1a;仓颉专项&#xff1a;飞机大炮我都会&#xff0c;利器心法我还有。&#xff08;算起来&#xff0c;专项启动已经…

IDEA启动tomcat之后控制台出现中文乱码问题

方法1&#xff1a; 第一步&#xff1a;file--setting--Editor--File Encodings 注意页面中全部改为UTF-8&#xff0c;然后apply再ok 第二步&#xff1a;Run--Edit Configuration&#xff0c;将VM options输入以下值&#xff1a; -Dfile.encodingUTF-8 还是一样先apply再ok …

视频图文理解关联技术与创业团队(二)

上一篇&#xff1a;google gemini1.5 flash视频图文理解能力初探&#xff08;一&#xff09;提到了gemini 1.5 flash 可以对视频进行理解以及分析&#xff0c;但是整体在检索任务上效果不佳。 这几天参加了人工智能大会 网上收集&#xff0c;看一看有相似能力的一些技术点、创…

生物素化果胶粒子包裹药物阿霉素;DOX/Bio-PEC

生物素化果胶粒子包裹药物阿霉素&#xff08;DOX/Bio-PEC&#xff09;是一种新型的药物载体系统&#xff0c;结合了生物素和果胶多糖的优势&#xff0c;旨在提高药物的靶向性和控释性能。以下是对该系统的详细解析&#xff1a; 一、生物素化果胶粒子的制备 原理与步骤&#xff…

独立开发者系列(22)——API调试工具apifox的使用

接口的逻辑已经实现&#xff0c;需要对外发布接口&#xff0c;而发布接口的时候&#xff0c;我们需要能自己简单调试接口。当然&#xff0c;其实自己也可以写简单的代码调试自己的接口&#xff0c;因为其实就是简单的request请求或者curl库读取&#xff0c;调整请求方式get或者…

甄选范文“论区块链技术及应用”,软考高级论文,系统架构设计师论文

论文真题 区块链作为一种分布式记账技术,目前已经被应用到了资产管理、物联网、医疗管理、政务监管等多个领域。从网络层面来讲,区块链是一个对等网络(Peer to Peer, P2P),网络中的节点地位对等,每个节点都保存完整的账本数据,系统的运行不依赖中心化节点,因此避免了中…

MATLAB基础应用精讲-【数模应用】分层聚类(附python代码实现)

目录 前言 知识储备 层次聚类 1. 算法解读: 2. 步骤和细节: 3. 举例: 4. 算法评价: 5. 算法的变体: 算法原理 基本思想 分层聚类网络的原理 分层聚类网络的优势 分层聚类网络的应用领域 SPSSAU 分层聚类案例 1、背景 2、理论 3、操作 4、SPSSAU输出结果…

Johnson Counter

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 参考代码 描述 请用Verilog实现4位约翰逊计数器&#xff08;扭环形计数器&#xff09;&#xff0c;计数器的循环状态如下。 电路的接口如下图所示。 输入描述&#xff1a; input clk , input …

[氮化镓]Kevin J. Chen组新作—肖特基p-GaN HEMTs正栅ESD机理研究

这篇文章是发表在《IEEE Electron Device Letters》上的一篇关于Schottky型p-GaN栅极高电子迁移率晶体管&#xff08;HEMTs&#xff09;的正向栅极静电放电&#xff08;ESD&#xff09;机理研究的论文。文章由Jiahui Sun等人撰写&#xff0c;使用了基于碳化硅&#xff08;SiC&a…

设计模式探索:观察者模式

1. 观察者模式 1.1 什么是观察者模式 观察者模式用于建立一种对象与对象之间的依赖关系&#xff0c;当一个对象发生改变时将自动通知其他对象&#xff0c;其他对象会相应地作出反应。 在观察者模式中有如下角色&#xff1a; Subject&#xff08;抽象主题/被观察者&#xf…

第11章 规划过程组(二)(11.10制订进度计划)

第11章 规划过程组&#xff08;二&#xff09;11.10制订进度计划&#xff0c;在第三版教材第395~397页&#xff1b;文字图片音频方式 第一个知识点&#xff1a;定义及作用 分析活动顺序、持续时间、资源需求和进度制约因素&#xff0c;创建项目进度模型&#xff0c;从而落实项目…

六、数据可视化—Wordcloud词云(爬虫及数据可视化)

六、数据可视化—Wordcloud词云&#xff08;爬虫及数据可视化&#xff09; 也是一个应用程序 http://amueller.github.io/word_cloud/ Wordcloud词云&#xff0c;在一些知乎&#xff0c;论坛等有这样一些东西&#xff0c;要么做封面&#xff0c;要么做讲解&#xff0c;进行分析…

Java并发/多线程CompleteableFuture详解

目录 CompleteableFuture 创建 获得结果的方法 辅助方法 allOf和anyOf的区别 CompletableFuture 里大约有五十种方法&#xff0c;但是可以进行归类: 变换类 thenApply 消费类 thenAccept 执行操作类 thenRun thenApply/thenAccept/thenRun 结合转化类 thenCombine 结…

浅析Nginx技术:开源高性能Web服务器与反向代理

什么是Nginx&#xff1f; Nginx是一款轻量级、高性能的HTTP和反向代理服务器&#xff0c;也可以用作邮件代理服务器。它最初由俄罗斯的程序员Igor Sysoev在2004年开发&#xff0c;并于2004年首次公开发布。Nginx的主要优势在于其非阻塞的事件驱动架构&#xff0c;能够处理大量并…

python-24-零基础自学python while循环+交互+数据的存储

学习内容&#xff1a;《python编程&#xff1a;从入门到实践》第二版 知识点&#xff1a; 文件处理 with open&#xff08;&#xff09;while 练习内容&#xff1a;10章练习题10-3、10-4、10-5 练习10-3&#xff1a;访客 编写一个程序&#xff0c;提示用户输入名字。用户做…

北森锐途人才竞聘盘点管理测评:高管领导力六大评判标准深度解析万达商管中国绿发等

北森锐途人才管理测评&#xff1a;高管领导力评判标准深度解析 在企业高管的盘点与竞聘测评领域&#xff0c;众多管理人才面临评估自身领导力的挑战。面对能力卓越、职级显赫的同僚&#xff0c;许多管理者感到缺乏一套权威且专业的评价体系。然而&#xff0c;无论是天赋异禀的领…