接上一篇:
1.1.2背压
背压是响应式编程的核心概念,这一节也是我们了解响应式编程的重点。
1.背压的机制
在生产者/消费者模型中,我们意识到消费者在消费由生产者生产的数据的同时,也需要有一种能够向上游反馈流量需求的机制,这种能够向上游反馈请求的机制称之为背压。
如下图所示
现在我们从一个具体的角度来说背压的概念。在1.1.1中我们了解了同步消费和异步消费,其中异步消费者会向生产者订阅消费数据,当有新的数据可用时,消费者会通过之前订阅时提供的回调函数激活调用过程。
如果生产者发出的数据比消费者能够处理的数据量大而且快时,消费者可能会被迫一直再获取或处理数据,消耗越来越多的资源,从而埋下潜在的风险。为了防止这一点,需要有一种机制,消费者可以通知生产者降低生产数据的速度。生产者可以通过多种方式来实现这一要求,这时候我们就会用到背压机制。
采用背压机制后,消费者告诉生产者降低生产数据速度并保存元素,知道消费者能够处理更多的元素。使用背压可以有效地避免过快的生产者压制消费者。如果生产者要一直生产和保存元素,使用背压也可能会要求其拥有无限制的缓冲区。生产者也可以实现有界缓冲区来保存有限数量的元素,如果缓冲区已满可以选择放弃。
2.背压的实现方式
背压的实现方式有两种,一种是阻塞式背压另一种是非阻塞式背压。
1、阻塞式背压
阻塞式背压是比较容易实现的,例如:当生产者和消费者在同一个线程中运行时,其中任何一方都将阻塞其他线程的执行。这就意味着,当消费者被执行时,生产者就不能发出任何新的数据元素。因而也需要一中自然地方式来平衡生产数据和消费数据的过程。
在有些情况下,阻塞式背压会出现不良的问题,比如:当生产者有多个消费者时,不是所有消费者都能以同样的速度消费消息。当消费者和生产者在不同环境中运行时,这就达不到降压的目的了。
2、非阻塞式背压
背压机制应该以非阻塞式的方式工作,实现非阻塞式背压的方法是放弃推策略,采用拉策略。生产者发送消息给消费者等操作都可以保存在拉策略当中,消费者会要求生产者生成多少消息量,而且最多只能发送这些量,然后等到更多消息的请求。(关于推策略和拉策略请回顾)
1.1.3 响应式流
响应式编程的另外一个核心概念就是响应式流。响应式流是一种规范,这种规范表现在技术上就是一批被预先定义好的接口。
1.响应式流规范
响应式流规范是提供非阻塞背压的异步流处理标准倡议的。响应式流的目标是定义将数据流从生产者传递到消费者而不需要生产者阻塞。在响应式流模型中,消费者向生产者发送多个元素的异步请求,然后生产者向消费者发送合适数量的数据。
各个响应式开发库都要遵循响应式流规范,采用规范的好处是显而易见的。由于各个响应式都遵循一套规范,因而互相兼容,不同的开发库之间也是可以进行交流的。甚至可以在同一个项目中使用多个开发库。而Spring WebFlux响应式web是采用Reactor框架来实现的。(其他开发库可百度了解这里我们只探讨reactor)。
虽然响应式流规范用来约束响应式开发库的实现方式的,但是作为使用者而言,能够了解这一规范,对我们了解使用开发库的方法和基本原理很有帮助,因为规范内容都是对响应式编程思想的精髓呈现。
2.响应式流接口
Java API响应式流有4个接口,即
Publisher
、
Subscription
、
Subscriber
和
ProcessorT,R
。
1、Publisher``
发布者(Publisher)是潜在的包含无限数量的有序元素的生产者,他根据收到的请求向当前订阅者发送元素。接口定义如下:
123
public interface PublisherT{ public void subscribe (Subscriber? super T s);}
public interface Publisher{
public void subscribe (Subscriber? super s);
}
2、Subscriber``
订阅者从发布这那里订阅并且接收元素。发布者想订阅者发送订阅令牌。使用订阅令牌,订阅者向发布者请求多个元素。当元素准备就绪时,发布者就会向订阅者发送合适数量的元素。然后订阅者可以请求更多的元素,发布者也可能有多个来自订阅者的待处理请求。接口定义如下:
123456
public interface SubscriberT{ public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete();}
public interface Subscriber{
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
当执行发布者的
subscribe()
方法时,发布者会回调订阅者的
onSubscribe()
方法。在这个方法中订阅者通常会借助传入的Subscription 对象向发布者请求n个数据。然后发布者通过不断调用
onNext()
方法想订阅者发出最多n个数据。如果数据全部发送完毕,就会调用
onComplete()
方法告知订阅者所需要的n个数据已经发送完毕。如果有错误发生就会通过调用
onError()
方法发出错误数据,这同样也会终止数据流。
3、Subscription``
订阅(Subscription )表示订阅者订阅的一个发布者的令牌,当订阅情趣成功时,发布者将其传递给订阅者,订阅者使用订阅令牌与发布者进行交互,比如:请求更多的元素或取消订阅。接口定义如下:
1234
public interface Subscription{ public void request(Long n); public void cancel();}
public interface Subscription{
public void request(Long n);
public void cancel();
}
当发布者调用
subscribe()
方法注册订阅者时,会通过订阅者的回调方法
onSubscribe()
传入Subscription对象,之后订阅者就可以使用Subscription对象的
request()
方法向发布者请求数据了。
Publisher
、
Subscription
、
Subscriber
三者的交互如下:
4、ProcessorT,R``
处理器(Processor)充当订阅者和发布者之间的处理媒介,
Processor
接口继承了
Publisher
和
Subscribe
接口,它用于转换发布者/订阅者管道中的元素。
ProcessorT,R
订阅类型T的数据元素,接收并转换为R的数据,然后发布该数据。处理器在发布者/订阅者管道中充当转换器的角色。接口定义如下:
123
public interface ProcessorT,R extends SubscriberT,PublisherR{ }
public interface ProcessorT,R extends Subscriber,PublisherR{
}
Processor
集
Subscriber
和
Publisher
于一身,三者之间的关系如下所示:
这四个接口是实现各个响应式开发库之间互相兼容的桥梁,响应式流规范也仅仅聚焦于此,而对于转换、合并、分组等操作并未做要求。也是一个非常抽象且精简的接口规范。
原文始发于微信公众号(Java知音):