初见-响应式编程-002

 🤗 ApiHug × {Postman|Swagger|Api...} = 快↑ 准√ 省↓

  1. GitHub - apihug/apihug.com: All abou the Apihug   
  2. apihug.com: 有爱,有温度,有质量,有信任
  3. ApiHug - API design Copilot - IntelliJ IDEs Plugin | Marketplace

#Reactive

The Reactive Manifestoopen in new window:

Systems built as Reactive Systems are more flexible, loosely-coupled and scalable. This makes them easier to develop and amenable to change. They are significantly more tolerant of failure and when failure does occur they meet it with elegance rather than disaster. Reactive Systems are highly responsive, giving users effective interactive feedback.

  1. flexible,
  2. loosely-coupled
  3. scalable

  1. Responsive, 响应时间, 服务质量
  2. Resilient, 容错, 恢复能力
  3. Elastic, 弹性,动态扩容
  4. Message Driven, 事件驱动,低耦合

#Reactor

Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s). — https://en.wikipedia.org/wiki/Reactive_programmingopen in new window

最初 微软创建了 .NET 里的 Reactive Extensions (Rx); RxJava 实现了 JVM 上的 Reactive编程模式, 最终 JAVA 9 融入了 Flowopen in new window -- java.util.concurrent.Flowopen in new window

在 OO 编程里面 reactive 常被当做 Observer 观察者设计模式, 当然你可把 Reactive Stream 和你熟悉的 Iterator 设计模式做对比;两种实现里面都有Iterable-Iterator 两个概念, 主要的不一样在, Iterator 是一种 拉 pull 模式, 而 reactive 是一种push 推模式。

在两个流程中都有 next(), 在 reactive stream 更类似于 Publisher-Subscriber 模式,由Publishr 来控制新到的value 给 Subscriber; push 是 reactive 里面非常重要的一面;

程序实现着注重收到 value 后的计算逻辑 Operation, 而不是整个控制流程。

整个流程里面喂入数据 push 自然是整个响应流程里面最核心的流程, onNext 用来完成此动作, 还包含 错误 onErro() 和最终的结束处理 onComplete(), 整个流程可以被抽象为:

onNext x 0..N [onError | onComplete]

整个流程的处理可以说非常灵活, 可以有 0 个, 1个, N 个, 或者无限多的数据, 比如一个定时器。

回到问题的本质, WHY 我们为什么需要 异步的 reactive 模式呢?

#Asynchronicity 能解决

并行, 多核已经是常态来增加吞吐量和响应时间。

多线程用来最大化利用资源; 但是多线程异步可以解决问题通知, 带来了很大的挑战。 JVM 解决此引入两个概念:

  1. callback, 异步方法用来通知结果, 一般是一个内部类,或者一个 lamdba 表达式
  2. future,异步调用立即返回一个 Future<T>, 但是结果 T 尚不能立即获得, 结果获得后才能通过 poll 获得。 ExecutorService 跑 Callable<T> 时返回 Future。
#Callback地狱

callback 贯穿整个链路的调用过程:

userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

如果换成 reactor:

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup); 

reactor 不仅仅让整个流程更精简, 通知提供服务质量控制(类似熔断), 比如我们保证整个服务质量在 800ms内返回,超时后从 fallback cache 或者其他获取:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
#Future

Future 避免了回调地狱, 但是依然不太容易进行组装, 虽然在 Java 8 里面引入了 CompletableFuture, 编制多个 Future 在一起虽然可以操作,但是Future 还是有其他问题:

  1. Future 的 get() 依然是阻塞的
  2. 不支持延迟计算
  3. 缺乏对多结果的支持, 更好的错误处理

这样一个业务场景; 从一个 ID 列表, 去查询他们的name + 统计, 所有的都是异步 CompletableFuture 例子:

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 

	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { 
				CompletableFuture<String> nameTask = ifhName(i); 
				CompletableFuture<Integer> statTask = ifhStat(i); 

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
			});

	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
	
  CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) 
			.collect(Collectors.toList()));

});

List<String> results = result.join(); 
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",

reactor 更紧凑解决方案, 更精炼容易理解:

Flux<String> ids = ifhrIds(); 

Flux<String> combinations =
		ids.flatMap(id -> { 
			Mono<String> nameTask = ifhrName(id); 
			Mono<Integer> statTask = ifhrStat(id); 

			return nameTask.zipWith(statTask, 
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); 
assertThat(results).containsExactly( 
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

#响应编程

除非上面我们看到, 让编码更清晰直观, reactor 等还在这些方面上花了很多心思:

  1. Composability and readability; 组件化, 可读性好。
  2. Data as a flow manipulated with a rich vocabulary of operators, 数据管道铺好, 自由搭配运算逻辑。
  3. Nothing happens until you subscribe, 延迟计算 subscribe 触发计算。
  4. Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high, 背压控制, consumer 和 producer配合,消费端和生产端。
  5. High level but high value abstraction that is concurrency-agnostic; 润物细无声,抽象的让你感知不到并发。

Composability and Readability 包括可以自由的组建编制任务, 任务之间的依赖关系, 上下关系, 或者同步运行的 fork-join 风格, 高度抽象层使用异步任务。

流水线一样的操作, Reactor 既是传送带也是工作站, 原料从 Publisher 最终成品发送到消费端 Subscriber

Operators, 相当于流水线上的工作站, 整个流水线上就是上一个 Publisher 的产物, 然后包装发送到下一个 Publisher, 最终到一个 Subsccriber 里面。

延迟计算, 在Reactor 里当你写一个 Publisher 链条, 数据默认是不会启动起来, 你只是创建了一个异步处理的抽象流程(Spark 里的RDD, 或者像一个流程的 DSL)。

当你触发 subscribing 时候, 将 Publisher 和一个 Subscriber 绑定, 同事触发数据流, 流入到整个链中, 内部通过 Subscriber 触发一个 request 信号传递到上游, 最终到源的 Publisher

背压, 下游将信号传递给上游是用来实现 backpressure背压的一种方式, 依然用流水线的比方, 当下游的工作站赶不上上游的速度的时候需要反馈一个信号到上游去。

A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

可以 push-pull 方式混合, 下游容量自由控制上游的推送速度, 或者懒式的拉取。

Cold流和Hot流

  1. Cold流不论订阅者在何时订阅该数据流,总是能收到数据流中产生的全部消息。
  2. Hot流则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

冷例子:

@Test
  public void cold_example() {
    Flux<String> source =
        Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
            .map(String::toUpperCase);

    source.subscribe(d -> System.out.println("Subscriber 1: " + d));
    source.subscribe(d -> System.out.println("Subscriber 2: " + d));
}

输出结果:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

所有的 subscriber 都能得到结果。

热例子:

@Test
  public void hot_example() {

    Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();

    Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);

    hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));

    hotSource.emitNext("blue", FAIL_FAST);
    hotSource.tryEmitNext("green").orThrow();

    hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));

    hotSource.emitNext("orange", FAIL_FAST);
    hotSource.emitNext("purple", FAIL_FAST);
    hotSource.emitComplete(FAIL_FAST);
  }

得到结果:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

#结论

从 reactive 宣言我们看到响应式编程的 '理想', 初探 reactor, 我们看到 reactor 的强大表达力, 这些还只是管中窥豹, 更多的等待我们下面章节去探索和挖掘。

测试项目 Reactor_001_testopen in new window

#参考

  1. The Reactive Manifestoopen in new window
  2. reactive-streams-jvmopen in new window
  3. reactive-streamsopen in new window
  4. java 9 flowopen in new window
  5. Cold流和Hot流open in new window
  6. Flux 详实的流程图open in new window
  7. Mono 详实的流程图

我们

api-hug-contact

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/568972.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

lnmp架构

目录 环境 步骤 下载nginx源码包&#xff0c;并解压 安装依赖包 进行预编译 、编译安装 安装php、设置开机自启 配置nginx让其支持php服务 浏览器测试 安装mariadb 部署discuz论坛 简介 LNMP架构是一种常见的Web服务器架构&#xff0c;由Linux、Nginx、MySQL和PHP组成。它…

高级数据结构—线段树(一)

学线段树的原因是因为cf的一道题目始终想不出来怎么优化&#xff0c;后来知道区间查询和修改要用到线段树。。。 原题&#xff1a;Iva & Pav 线段树的作用 区间最值查询&#xff1a;可以高效地找到给定区间内的最大值、最小值等。 区间和查询&#xff1a;可以高效地计算…

Leetcode算法训练日记 | day34

专题九 贪心算法 一、K次取反后最大化的数组和 1.题目 Leetcode&#xff1a;第 1005 题 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标 i 并将 nums[i] 替换为 -nums[i] 。 重复这个过程恰好 k 次。可以多次选择同一个…

关于Spring事务管理之默认事务间调用问题

由事务的传播行为我们知道, 如果将方法配置为默认事务REQUIRED在执行过程中Spring会为其新启事务REQUIRES_NEW, 作为一个独立事务来执行. 由此存在一个问题。 如果使用不慎, 会引发org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back bec…

ACE框架学习

目录 ACE库编译 ACE Reactor框架 ACE_Time_Value类 ACE_Event_Handler类 ACE定时器队列类 ACE_Reator类 ACE Reactor实现 ACE_Select_Reactor类 ACE_TP_Reactor类 ACE_WFMO_Reactor类 ACE库编译 首先去ACE官网下载安装包&#xff0c;通过vs2017或者2019进行编译&#x…

【洛谷 P8605】[蓝桥杯 2013 国 AC] 网络寻路 题解(图论+无向图+组合数学)

[蓝桥杯 2013 国 AC] 网络寻路 题目描述 X X X 国的一个网络使用若干条线路连接若干个节点。节点间的通信是双向的。某重要数据包&#xff0c;为了安全起见&#xff0c;必须恰好被转发两次到达目的地。该包可能在任意一个节点产生&#xff0c;我们需要知道该网络中一共有多少种…

10.接口自动化测试学习-Pytest框架(2)

1.mark标签 如果在每一个模块&#xff0c;每一个类&#xff0c;每一个方法和用例之前都加上mark标签&#xff0c;那么在pytest运行时就可以只运行带有该mark标签的模块、类、接口。 这样可以方便我们执行自动化时&#xff0c;自主选择执行全部用例、某个模块用例、某个流程用…

数据分析专家能力模型

招式&#xff1a;懂商业&#xff08;业务能力&#xff09; 外功更偏重于技能&#xff0c;首先需要懂招式&#xff0c;即懂商业&#xff0c;数据分析最终是为业务服务的&#xff0c;无论是互联网企业准求的用户增长和UJM分解&#xff0c;还是传统企业追求的降本增效和精细化运营…

appium相关的知识

>adb shell dumpsys window | findstr mCurrentFocus adb devices # 实例化字典 desired_caps = dict() desired_caps[platformName] = Android desired_caps[platformVersion] = 9 # devices desired_caps[deviceName] = emulator-5554 # 包名 desired_caps[appPackage] …

重建大师出现“密集匹配失败”的情况是什么原因?

答&#xff1a;一般出现密集匹配失败的情况&#xff0c;就是瓦块连接点过少&#xff0c;空瓦块边缘瓦块等原因导致。遇见这种情况&#xff0c;确定是边缘瓦块导致后&#xff0c;就可以不用管&#xff0c;不是模型主体&#xff0c;不影响成果。 重建大师是一款专为超大规模实景三…

MySQL__索引

文章目录 &#x1f60a; 作者&#xff1a;Lion J &#x1f496; 主页&#xff1a; https://blog.csdn.net/weixin_69252724 &#x1f389; 主题&#xff1a; MySQL__索引&#xff09; ⏱️ 创作时间&#xff1a;2024年04月23日 ———————————————— 索引介绍…

消消乐算法总结

前言 最近在工作中遇到一个问题&#xff0c;做一个消消乐的demo项目&#xff0c;连续相同数目超过四个后就要消除。我在网上看了很多解决方案&#xff0c;有十字形&#xff0c;横向&#xff0c;纵向&#xff0c;梯形搜索。越看越迷糊。这不是用一个BFS就能解决的问题吗&#x…

MySQL数据库进阶篇一(存储引擎、索引)

目录 一、存储引擎1.1、MySQL体系结构&#xff1a;连接层&#xff0c;Server层&#xff0c;引擎层&#xff0c;存储层1.2、存储引擎1.2.1、存储引擎&#xff1a;InnoDB(MySQL 5.5后默认的存储引擎)1.2.2、存储引擎&#xff1a;MyISAM (MySQL早期默认存储引擎)1.2.3、存储引擎&a…

数据可视化———Tableau

基本认识&#xff1a; 维度&#xff1a;定性—字符串文本&#xff0c;日期和日期时间等等 度量&#xff1a;定量—连续值&#xff0c;一般属于数值 数据类型&#xff1a; 数值 日期/日期时间 字符串 布尔值 地理值 运算符 算数运算符&#xff1a;加减乘除,%取余&#xff0c;…

【Flask】Flask中HTTP请求与接收

一、接收http请求与返回响应 在Flask中&#xff0c;可以通过app.route装饰器来定义路由函数。 app.route(/BringGoods,methods [POST, GET]) GET请求&#xff1a;使用request.args.get(key)或者request.values.get(key)来获取URL中的参数。 POST请求&#xff1a; 使用req…

Python自学之路--001:Python + PyCharm安装图文详解教程

目录 1、概述 2、Python解释器 2.1、下载 2.2、Python安装 2.3、Python环境变量配置&#xff0c;必选项 3、PyCharm安装 3.1、PyCharm下载 3.2、PyCharm安装 4、建一个Hello World 5、Phcarm设置 5.1、Phcarm汉化 5.2、Phcarm工具栏显示在顶部 5.3、Phcarm通过pip安…

【QT学习】9.绘图,三种贴图,贴图的转换,不规则贴图(透明泡泡)

一。绘图的解释 Qt 中提供了强大的 2D 绘图系统&#xff0c;可以使用相同的 API 在屏幕和绘图设备上进行绘制&#xff0c;它主要基于QPainter、QPaintDevice 和 QPaintEngine 这三个类。 QPainter 用于执行绘图操作&#xff0c;其提供的 API 在 GUI 或 QImage、QOpenGLPaintDev…

linux18:进程等待

进程等待的必要性 1&#xff1a;子进程创建的目的是要完成父进程指派的某个任务&#xff0c;当子进程运行完毕退出时&#xff0c;父进程需要通过进程等待的方式&#xff0c;回收子进程资源&#xff0c;获取子进程退出信息&#xff08;子进程有无异常&#xff1f;没有异常结果是…

研究发现:提示中加入数百个示例显著提升大型语言模型的性能

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

人工智能时代的关键技术:深入探索向量数据库及其在AI中的应用

文章目录 1. 理解向量数据库&#xff1a;二维模型示例2. 向量数据库中的数据存储与检索3. 向量数据库如何工作&#xff1f;4. 向量数据库如何知道哪些向量相似&#xff1f; 在人工智能技术日益成熟的当下&#xff0c;向量数据库作为处理和检索高维数据的关键工具&#xff0c;对…