在互联网应用构建过程中,我们知道可以采用异步非阻塞的编程模型来提高服务的响应能力。而为了实现异步非阻塞,我们可以引入数据流,并对数据的流量进行控制。我们来考虑一个场景,如果数据消费的速度跟不上数据发出的速度,会发生什么现象呢?让我们来看一下。
显然,如果我们不对生产者生产的数据流量进行控制,那么消费者中的数据就会不断的积压,从而导致出现服务不可用等异常情况。那么如何解决这一问题呢?这就需要引入响应式编程中的一个核心概念,即背压。
背压和响应式流
在数据流中,我们注意到当消费者无法及时处理超过其承受能力的数据量时,需要有一个反馈机制,告知生产者调整生产数据的速度。从数据的流转方向而言,这种反馈机制由位于数据流下游的消费者进行发起,代表着消费者处理数据的压力,所以被称为Back Pressure,翻译成中文就是背压。
一旦有了背压机制,位于下游的消费者就可以通知位于上游的生产者合理控制数据生产的速度,从而确保消费者能够正常处理数据流中的数据。这样,消费者就不会因为数据流量过大而出现问题。
明白了背压的概念,那么问题又来了,如何实现背压呢?这就需要引入一套完整的规范,基于这套规范,开发人员可以获取所有实现背压机制所需要的编程组件,这套规范就是响应式流(Reactive Stream)规范。
响应式流规范为如何实现基于背压的数据流提供了一种事实上的标准。在该规范中,我们通过这样一种机制来实现背压:消费者发送一种异步请求向生产者反馈自己所能处理的数据量,然后生产者同样通过一种异步响应的方式向消费者发送对应的数据量。
响应式流规范
从表现形式上讲,响应式流规范为开发人员提供了一批事先约定好的接口定义。通过这些接口中所包含的各个操作方法,数据流就能从生产者异步传递到消费者。同时,生产者所产生的数据也不会导致消费者无法正常消费。
响应式流是一个非常简洁的规范,只包含了Publisher、Subscriber、Subscription和Processor这四个核心接口。其中Publisher和Subscriber分别充当了生产者和消费者的角色。
我们先来看代表生产者的Publisher接口,该接口的作用就是生成一定数量的数据并进行发送,而发送数据的前提是接收到来自订阅者的请求。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
可以看到在Publisher接口的subscribe方法中传入了一个Subscriber,这个Subscriber代表的就是订阅者。另一方面,订阅者想要发送订阅请求的前提是需要明确该次请求的数据量,这部分内部被封装在一个订阅令牌中,Subscriber接口的定义如下所示。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
可以看到在Subscriber接口中包含了一个onSubscribe方法,该方法需要传入代表订阅令牌的Subscription参数。结合Publisher接口和Subscriber接口,我们注意到当执行Publisher接口的subscribe方法时,Subscriber接口的onSubscribe方法就会被执行,这是一种典型的回调处理机制。
而从方法命名上看,Subscriber接口的所有方法都是以on为前缀,代表了对数据流处理过程都是采用了回调处理机制。除了onSubscribe方法之外,onNext回调方法会根据Subscription参数中所包含的请求数量逐一发出数据。数据的发送过程会有两种情况,一种情况是所有数据都成功发送,这时候onComplete回调方法就会被触发;或者是数据发送过程出现了错误,那么就会执行onError回调方法中的处理流程。
接下来,让我们来到前面已经介绍到的Subscription接口。Subscription接口的作用就是一种令牌,相当于在Publisher和Subscriber之间建立了一种请求响应的桥梁,它的定义如下所示。
public interface Subscription {
public void request(long n);
public void cancel();
}
显然,Subscription接口的request方法用于向Subscriber请求n个数据,而cancel方法则可以用来对请求过程执行取消操作。
现在,我们已经掌握了响应式流规范中生产者和消费者之间的数据处理流程,让我们回到背压机制,看看在这种处理流程下如何实现背压。在整个数据流处理过程中,处于数据流下游的Subscriber通过Subscription接口的request方法向Publisher请求数据,这就是一种向上反馈的机制,也是实现背压的关键所在。Publisher、Subscriber和Subscription三者之间的交互关系如图所示。
至于响应式流规范中的最后一个接口Processor,它同时具备Publisher和Subscriber接口的所有能力,并提供了对数据流中数据进行转换和处理的能力,定义如下。
public interface Processor<T,R> extends Subscriber<T>,
Publisher<R> {
}
可以看到,Processor可以将来自Subscriber接口的数据类型从T转换为R并返回给Subscriber,这种转换关系如下图所示。
以上四个接口构成了响应式流规范的主体。虽然接口的定义并不复杂,但围绕数据流所展开的交互过程值得我们做进一步的总结。
上图中所示的交互方式共包含7个步骤。
- 当Publisher需要执行数据发布操作时,首先需要明确所发布数据的数量,这时候就应该创建一个Subscription接口的实例。
- 然后Publisher通过Subscriber的onSubscribe回调方法发送数据,这个过程中需要用到前面所创建的Subscription。
- 我们知道Subscription中包含了一个request方法,执行该方法将发起真正的数据请求。
- 一旦成功发起请求,Subscriber中的onNext回调方法就会执行,该方法会对发送的数据进行业务处理。
- 每当处理完一个数据之后,Subscription的request方法将再次被执行,直到所有数据都发送完毕。
- 同样,Publisher继续发送数据,而Subscriber中的onNext回调方法继续处理数据。
- 取决于数据发送过程是否顺利结束,系统会触发Subscriber的onComplete或onError回调方法,代表数据流是正常结束还是异常结束。
响应式流规范实现框架
响应式流只是一种规范,而不是一种实现框架或工具。围绕响应式流规范,业界也诞生了一批响应式编程框架,包括Spring 5中所引入的Project Reactor,以及RxJava、Akka和Vert.x等经典框架。
规范的作用就是为所有技术框架提供了一种能够相互协作的兼容模式。基于响应式流规范,开发人员可以在同一个应用程序中综合使用一组响应式编程框架。以Spring 5为例,默认使用了Project Reactor框架,但我们也可以引入RxJava来开发业务代码。下图展示了基于响应式流规范的几种代表性具体技术框架以及它们之间的一种可能交互过程。
对于开发人员而言,掌握如何使用这些响应式编程框架是日常开发过程中的一个方面。更重要的是,我们需要理解和掌握这些框架背后的设计思想和理念,而响应式流规范正是响应式编程思想和理念的精髓所在。
总结
在今天的内容中,我们对响应式流规范的方方面面进行了阐述。我们首先需要明确,数据流处理过程中的流量控制和背压机制是促使响应式流规范诞生的原因,响应式流规范的目的就是为了更好的处理消费者和生产者之间的数据交互过程。同时,我们也注意到想要实现背压机制,需要数据消费者具备一种向上反馈的能力,响应式流规范通过提供一组接口定义了这种能力,包括Publisher、Subscriber、Subscription和Processor。这些接口中的方法并不复杂,但却完整的描绘了整个数据流的高效处理过程。