RxJS 提供了一套完整的异步解決方案,让我们在面对各种异步行为(Event, AJAX, Animation 等)都可以使用相同的 API 做开发。
前置知识点
- 同步(Synchronous)就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。是一种线性执行的方式,执行的流程不能跨越。一般用于流程性比较强的程序,比如用户登录,需要对用户验证完成后才能登录系统。
- 异步(Asynchronous)则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。是一种并行处理的方式,不必等待一个程序执行完,可以执行其它的任务,比如页面数据加载过程,不需要等所有数据获取后再显示页面。
- 观察者模式又叫发布订阅模式(Publish/Subscribe),它是一种一对多的关系,让多个观察者(Obesver)同时监听一个主题(Subject),这个主题也就是被观察者(Observable),被观察者的状态发生变化时就会通知所有的观察者,使得它们能够接收到更新的内容。观察者模式主题和观察者是分离的,不是主动触发而是被动监听。
- 迭代器模式又叫游标(Sursor)模式,迭代器具有 next 方法,可以顺序访问一个聚合对象中的各个元素,而不需要暴露该对象的内部表现。迭代器模式可以把迭代的过程从从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即使不了解对象的内部构造,也可以通过迭代器提供的方法顺序访问其每个元素。
- 响应式编程,即 Reactive Programming,它是一种基于事件模式的模型。简单来说,上一个任务执行结果的反馈就是一个事件,这个事件的到来会触发下一个任务的执行。
- Stream / 流的本质是一个按时间顺序排列的进行中事件的序列集合。
一、RxJS 基本介绍
RxJS 全称 Reactive Extensions for JavaScript,翻译过来是 Javascript 的响应式扩展,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。
它提供了一个核心类 Observable,几个关键的类Observer、 Schedulers、 Subjects 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
简单来说 Rx(JS) = Observables (被观察者)+ Operator (操作符)+ Scheduler(调度器)。
二、RxJS 的核心概念:
- Observable (被观察者): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。可以类比为Promise对象,在其内部可以进行异步操作,并且在异步操作执行完毕后将结果传递到可观察对象的外部。
- Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。类比then方法中的回调函数,用于接收可观察对象传递出来的数据
- Subscribe(订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。类比then方法,当可观察对象发出数据时,订阅者可以接收到数据。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像map、filter、concat、flatMap等这样的操作符来处理集合。
- Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
- Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout 或 requestAnimationFrame或其他。
三、 理解 RxJS 中的 Observer 和 Observable
简单一行代码,就懂了它们的关系:observable.subscribe( observer )——observable是 数据的源头,是生产者,是待订阅者,通过subscribe方法可以被observer所订阅,而observer 是观察者,数据的使用者,数据的消费者。
将买房作为例子套用到观察者模式中:
- 房价即为 Observable 对象;
- 购房者即为 Observer 对象;
- 而购房者观察房价即为 Subscribe(订阅)关系;
结合买房的例子,描述 Observable 和 Observer 的行为:
- Observable ****可被观察,即房价被购房者关注,并且随时间变化发出 (emit) 不同值,表现为房价波动;
- Observer 可以观察 Observable,即购房者关注房价,并在 Observable (房价)发出不同值(房价波动)时做出响应,即买房或观望;
- Observable 和 Observer 之间通过订阅(Subscription)建立观察关系;
- 当 Observable 没有 Observer 的时候,即使发出了值,也不会产生任何影响,即无人关注房价时,房价不管如何波动都不会有响应;
四、Observable (被观察者)
RxJS 的基础就是 Observable,只要弄懂 Observable 就算是学会一半的 RxJS ,剩下的就只是一些方法的练习和熟悉;
Observables 是使用 Rx.Observable.create 或创建类操作符创建的,并使用 观察者来订阅它,然后执行它并发送 next / error / complete 通知给观察者,而且执行可能会被清理。
//调用 Observable.create 方法来创建一个 Observable.入参是 observer(观察者) ——>Rx.Observable.create 是 Observable 构造函数的别名,它接收一个参数
var observable = Rx.Observable.create(function subscribe(observer) {
//function 代码块内容表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。
var id = setInterval(() => {
observer.next('hi')//每隔一秒会向观察者发送字符串 'hi' 。
}, 1000);
});
/*但是运行这段代码后并不会发生任何事情.
我们需要一个 observer 去订阅这个 observable,此后基于 observable 发出的值,observer 才会响应。
因此,在如上代码末尾,我们再加上一行:*/
subscription = observable.subscribe(x => console.log(x));
//当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。
subscription.unsubscribe();
- 创建 :Observables 可以使用
create
来创建, 但通常我们使用所谓的创建操作符, 像of
、from
、interval
、等等。 - 订阅 :订阅 Observable 像是调用函数, 并提供接收数据的回调函数。
subscribe
调用是启动 “Observable 执行”的一种简单方式, 并将值或事件传递给本次执行的观察者。 - 执行 :它是惰性运算,只有在每个观察者订阅后才会执行。Observable 执行可以传递三种类型的值:"Next" 通知: 发送一个值,比如数字、字符串、对象,等等。"Error" 通知: 发送一个 JavaScript 错误 或 异常。"Complete" 通知: 不再发送任何值。"Next" 通知是最重要,也是最常见的类型,表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
- 清理 :每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。
1、创建Observable
可以使用of, from, interval, timer,等操作符来创建Observable,具体取决于你需要使用的数据源类型。
// 从一个静态值创建一个Observable
const myObservable = of('hello', 'world');
// 从数组中创建一个Observable
const myArrayObservable = from([1, 2, 3, 4, 5]);
// 创建一个每秒发送递增整数的Observable
const intervalObservable = interval(1000);
// 延迟三秒后发射单个值的Observable
const timerObservable = timer(3000, 1000);
或者直接通过new Observable()构造函数来创建 ,它接收一个方法作为参数。方法接收一个观察者作为参数,通过这个参数的next()方法可以向外发送数据。
const observable = new Observable((Observer ) => {
Observer.next(1);
})
2、订阅Observable
Observables是惰性的,只有当您订阅它们时,它们才会开始推送值。可以使用.subscribe()方法来订阅Observable,然后处理这些发出的值。
observable.subscribe()方法接受一个观察者对象作为参数,对象中包含可观察者对象发出数据后的回调函数。当Observable对象执行了.next()方法,就会触发observer.next函数,observer.next函数可以接收到Observable对象.next()方法的参数。
Observable对象的.next()方法可以被调用多次,每次调用都会被observer.next监听到。
Observable对象可以订阅多次,每订阅一次就会执行一次其中的代码。
const observer = {
next: (value: any) => {
console.log(value);
}
}
observable.subscribe(observer);
4、退订Observable
一旦订阅Observable,您需要决定何时解除订阅。使用 .subscribe()
返回的订阅对象,可以通过调用 .unsubscribe()
来解除订阅。
const subscription = myArrayObservable.subscribe(value => console.log(value));
// 在3秒后取消订阅
setTimeout(() => {
subscription.unsubscribe();
}, 3000);
Observable 与 Promise关键性的不同点:
Observable | Promise | |
---|---|---|
使用场景 | 同步、异步均可使用 | 用 Promise 包裹的多数是异步场景 |
执行时机 | 声明式惰性执行,只有在订阅后才会执行 | 创建时就立即执行 |
执行次数 | 多次调用 subscribe 函数会执行多次 | 只有第一次执行,后续都是取值 |
流程控制 | 相较于 Promise 有更为全面的操作符 | 提供串行、并行的函数 |
错误处理 | subscribe 函数捕获错误 | .catch 捕获 |
总的来说,Promise 可读性更优,Observable 从使用场景更为全面。
五、Observer(观察者)
Observer 是观察者模式中的观察者/消费者,它用来消费/执行 Observable 创建的函数。
//一个典型的观察者对象:
var observer = {//创建包含三个函数的观察者对象—— observer
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
//要使用观察者,需要把它提供给 Observable 的 subscribe 方法:
observable.subscribe(observer);
观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。
三种类型的回调函数都可以直接作为参数来提供:
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
RxJS 中的观察者的回调函数是可选的。如果你没有提供某个回调函数,Observable 的执行也会正常运行,只是某些通知类型会被忽略。
如果我们只提供了一个回调函数作为参数,subscribe会将我们提供的函数参数作为next的回调处理函数。
- next (传值):每当 Observable 输出新值,next 方法就会被触发。
- error (错误处理):每当Observable 內发生错误时,error 方法就会被触发。
- complete (完成/终止):在 Observable 沒有其它数据可以获取时,complete 方法就会被触发,在 complete 被触发之后,next 方法就不会再起作用。
工作流程:
六、Subscription (订阅对象)
subscribe (订阅)Observable (被观察者)会返回一个 subscription 对象,该对象有一个常用方法—— 停止订阅 且 释放 资源 的 unsubscribe
方法。
unsubcribe
(取消订阅)add
(分组或在取消订阅之前插入一段逻辑)
注意:调用unsubcribe
后(包含add
传入的其它 Subscription)不会再接收到它们的数据。
var source = Rx.Observable.timer(1000, 1000);//通过timer 创建 Observeable 实例 source
var subscription = source.subscribe({//调用 subscribe ,获得订阅对象subscription
next: function(value) {
console.log(value)
},
complete: function() {
console.log('complete!');
},
error: function(error) {
console.log('Throw Error: ' + error)
}
});
setTimeout(() => {
subscription.unsubscribe() // 停止订阅 (退订)
}, 5000);
// 0
// 1
// 2
// 3
// 4
Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe() 方法,可能会有多个 Subscription 取消订阅 。
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// subscription 和 childSubscription 都会取消订阅
subscription.unsubscribe();
}, 1000);
Subscriptions 还有一个 remove(otherSubscription) 方法,用来撤销一个已添加的子 Subscription 。
七、Subject(主体)
多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。
RxJS 中 Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者。
“多播 Observable” 是通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 “单播 Observable” 只发送通知给单个观察者。Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式。
RXJS官方文档说的:Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
从观察者的角度而言,无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。在这里,subscribe 类似于 JavaScript 中的 addEventListener;与此同时,每个 Subject 又都是观察者。Subject 是一个有如下方法的对象: next(v)
、 error(e)
和 complete()
。要给 Subject 提供新值,只要调用 next(theValue)
方法。简单的说,Subject既可以是Observable,也可以是 observer(可以多个)。
//为 Subject 添加了两个观察者,然后给 Subject 提供一些值:
var subject = new Rx.Subject();
subject.subscribe({ //对于 Subject,你提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
//控制台的输出:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
//因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法
var subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = Rx.Observable.from([1, 2, 3]);
observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅
//结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
与 Observable 的区别
Observable | Subject | |
---|---|---|
角色 | 生产者(单向) | 生产者、消费者(双向) |
消费策略 | 单播 | 多播 |
流转方式 | 内部发送/接收数据 | 外部发送/接收数据 |
数据特性 | 冷数据流 | 热数据流 |
消费时机 | 调用 subscribe | 调用 next |
冷数据流:可以订阅任意时间的数据流。
热数据流:只给已订阅的消费者发送消息,定阅之前的消费者,不会收到消息。
单播:每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。
多播:前面说,每个普通的 Observables 实例都只能被一个观察者订阅,但是如果通过 Subject 来代理 Observable 实例的话就能够被多个 observer 所订阅,且无论有没有 observer 订阅,都会发送数据。也就是说无论 observer 什么时候订阅都只会接收到实时的数据。
其他 Subject
种类 | 作用 |
---|---|
BehaviorSubject | 它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。 |
ReplaySubject | ReplaySubject 记录 Observable 执行中的多个值并将其发送给新的订阅者。 |
AsyncSubject | 只有当 Observable 执行完成时(执行 complete() ),它才会将执行的最后一个值发送给观察者。AsyncSubject 和 last() 操作符类似,因为它也是等待 complete 通知,以发送一个单个值。 |
八、Operator (操作符)
Operator 本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出,前面的 Observable 保持不变。
Operator 是允许复杂的异步代码以声明式的方式进行轻松组合的基础代码单元。
操作符是 Observable 类型上的方法,比如 .map(...)
、.filter(...)
、.merge(...)
等,当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。
创建一个自定义操作符函数,它将从输入 Observable 接收的每个值都乘以10:
function multiplyByTen(input) {
var output = Rx.Observable.create(function subscribe(observer) {
input.subscribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
return output;
}
var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));//注意,订阅 output 会导致 input Observable 也被订阅。我们称之为“操作符订阅链”。
//输出
10
20
30
40
注意,订阅 output
会导致 input
Observable 也被订阅。我们称之为“操作符订阅链”。
RxJS 6 及更新版本提供了可链式调用(Pipeable)的 RxJS 操作符,一个简单示例如下:
source.pipe(//假设 source 是一个已定义的 observable
map(x => x + x),
mergeMap(n => of(n + 1, n + 2).pipe(
filter(x => x % 1 == 0),
scan((acc, x) => acc + x, 0),
)),
catchError(err => of('error found')),
).subscribe(console.log);
生命周期:创建流(create、new、创建类操作符)——> 执行流(subscribe) ——> 销毁流(unsubscribe)
操作符分类:
九、Scheduler(调度器)
Scheduler(调度器)控制着何时启动 subscription 和何时发送通知。RxJS 有自己的基准时钟和一套的执行规则,来安排多个任务/数据该如何执行。
种类 | 描述 |
---|---|
null | 不传递或 null 或 undefined,表示同步执行 |
queue | 使用队列的方式执行 |
asap | 全称:as soon as possible ,表示尽快执行 |
async | 使用 setInterval 的调度。 |
使用 subscribeOn
来调度 subscribe()
调用在什么样的上下文中执行。 默认情况下,Observable 的 subscribe()
调用会立即同步地执行。然而,你可能会延迟或安排在给定的调度器上执行实际的 subscription ,使用实例操作符 subscribeOn(scheduler)
,其中 scheduler 是你提供的参数。我们来看一个同步变异步执行的例子:
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
.observeOn(Rx.Scheduler.async);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
//结果 输出
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done
使用原则/策略
RxJS Scheduler 的原则是:尽量减少并发运行。
- 对于返回有限和少量消息的 observable 的操作符,RxJS 不使用调度器,即
null
或undefined
。 - 对于返回潜在大量的或无限数量的消息的操作符,使用
queue
调度器。 - 对于使用定时器的操作符,使用
aysnc
调度器。
支持调度器的操作符
of 、 from 、 timer 、 interval 、 concat 、 merge 、 combineLatest ,
bufferTime 、 debounceTime 、 delay 、 auditTime 、 sampleTime 、 throttleTime 、 timeInterval 、 timeout 、 timeoutWith 、 windowTime 这样时间相关的操作符全部接收调度器作为最后的参数,并且默认的操作是在 Scheduler.async 调度器上。
附录:
通过示例来学习 RxJS 操作符