进行 “最佳价格查询器” 的开发

前置条件

public class Shop {
    private final String name;
    private final Random random;
    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

采用顺序查询所有商店的方式

// 采用顺序查询所有商店的方式
public List<String> findPricesSequential(String product) {
    return shops.stream()
            .map(shop -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product))
            .collect(Collectors.toList());
}

使用并行流对请求进行并行操作

// 使用并行流对请求进行并行操作
public List<String> findPricesParallel(String product) {
    return shops.parallelStream()
            .map(shop -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product))
            .collect(Collectors.toList());
}

使用CompletableFuture发起异步请求(使用内部通用线程池)

// 使用CompletableFuture发起异步请求
public List<String> findPricesFuture(String product) {
    List<CompletableFuture<String>> priceFutures =
            shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product)))// 内部采用的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。
                    
                    .collect(Collectors.toList());
    List<String> prices = priceFutures.stream()
            .map(CompletableFuture::join) // 对List中的所有future对象执行join操作,一个接一个地等待它们运行结束
            .collect(Collectors.toList());
    return prices;
}

使用CompletableFuture发起异步请求(使用定制的执行器)

CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。

private final Executor executor = Executors.newFixedThreadPool(shops.size(), ExecuterThreadFactoryBuilder.build("searcher-thread-%d"));

// 使用CompletableFuture发起异步请求+使用定制的执行器
public List<String> findPricesFutureCustom(String product) {
    List<CompletableFuture<String>> priceFutures =
            shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + shop.getName() + "-" + shop.getPrice(product), executor))
                    .collect(Collectors.toList());

    List<String> prices = priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    return prices;
}

性能比较

笔者电脑是16线程,所以构造测试数据时16个线程任务是个门槛
在这里插入图片描述

private List<Shop> shops = new ArrayList<>();
{
    for (int i = 0; i < 64; i++) {
        shops.add(new Shop("LetsSaveBig3" + i));
    }
    System.out.println(shops.size());
}

StopWatch stopWatch = new StopWatch("性能比较");
execute("sequential", () -> bestPriceFinder.findPricesSequential("myPhone27S"), stopWatch);
execute("parallelStream", () -> bestPriceFinder.findPricesParallel("myPhone27S"), stopWatch);
execute("CompletableFuture", () -> bestPriceFinder.findPricesFuture("myPhone27S"), stopWatch);
execute("CompletableFutureExecuter", () -> bestPriceFinder.findPricesFutureCustom("myPhone27S"), stopWatch);
StopWatchUtils.logStopWatch(stopWatch);

private static void execute(String msg, Supplier<List<String>> s, StopWatch stopWatch) {
    stopWatch.start(msg);
    System.out.println(s.get());
    stopWatch.stop();
}
availableProcessors() = 164线程任务8线程任务16线程任务20线程任务24线程任务28线程任务32线程任务64线程任务
Sequential4035 ms8057 ms16108 ms20154 ms24131 ms28106 ms32196 ms64325 ms
parallelStream1005 ms1021 ms1022 ms2022 ms2013 ms2008 ms2012 ms4017 ms
CompletableFuture1008 ms1019 ms2022 ms2027 ms2016 ms2006 ms3017 ms5043 ms
CompletableFutureExecuter1012 ms1007 ms1019 ms1023 ms10191012 ms1020 ms1025 ms

线程池如何选择合适的线程数目

线程池中线程的数目取决于你预计你的应用需要处理的负荷,但是你该如何选择合适的线程数目呢?

如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。
如果线程的数目过少,处理器的一些核可能就无法充分利用。

《Java并发编程实战》作者 Brian Goetz 建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
N(threads) = N(CPU) * U(CPU) * (1 + W/C)
其中:
·N(CPU)是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
·U(CPU)是期望的CPU利用率(该值应该介于0和1之间)
·W/C是等待时间与计算时间的比率

公式理解:
C / (C+W) = N(CPU) * U(CPU) / N(threads) → 计算时间占比 = 有效CPU在线程数中的占比

线程极限阈值数计算
你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。且CPU利用率是100%,则根据公式极限阈值为16*1*100=1600 ,即创建一个拥有1600个线程的线程池。

你的应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。这意味着如果你期望的CPU利用率是100%,你需要创建一个拥有1600个线程的线程池。实际操作中,如果你创建的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,你线程池中的有些线程根本没有机会被使用。出于这种考虑,我们建议你将执行器使用的线程数,与你需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,你还是需要设置一个上限,比如100个线程。代码清单如下所示。

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size()100), ExecuterThreadFactoryBuilder.build("searcher-thread-%d"));

public List<String> findPricesFutureCustom(String product) {
    List<CompletableFuture<String>> priceFutures =
            shops.stream()
                    .map(shop -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + "-" + shop.getName() + "-" + shop.getPrice(product), executor))
                    .collect(Collectors.toList());

    List<String> prices = priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    return prices;
}
Processors=164线程任务(ms)816202428326410050010001600320040008000
Sequential40358057161082015424131281063219664325
parallelStream10051021102220222013200820124017711032240
CompletableFuture10081019202220272016200630175043705834177
newFixedThreadPool shops.size()101210071019102310191012102010251029108113651330240716623129
newFixedThreadPool min(shops.size(),100)109310435116101923243480658

public static ThreadFactory build(String nameFormat) {
    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
}

注意,当前创建的是一个由守护线程构成的线程池。Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。

综上比较可知,CompletableFuture + Executer方式最高效。一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的 阈值 1600。这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFutures向其提交任务执行是个不错的主意。处理需大量使用异步操作的情况时,这几乎是最有效的策略。


并行——使用流还是CompletableFutures?
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
我们对使用这些API的建议如下。
1、如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
2、如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

参考

《Java8 实战》第11章 CompletableFuture:组合式异步编程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/125235.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot自动配置的原理篇,剖析自动配置原理;实现自定义启动类!附有代码及截图详细讲解

SpringBoot 自动配置 Condition Condition 是在Spring 4.0 增加的条件判断功能&#xff0c;通过这个可以功能可以实现选择性的创建 Bean 操作 思考&#xff1a;SpringBoot是如何知道要创建哪个Bean的&#xff1f;比如SpringBoot是如何知道要创建RedisTemplate的&#xff1f;…

剖析WPF模板机制的内部实现

剖析WPF模板机制的内部实现 众所周知&#xff0c;在WPF框架中&#xff0c;Visual类是可以提供渲染&#xff08;render&#xff09;支持的最顶层的类&#xff0c;所有可视化元素&#xff08;包括UIElement、FrameworkElment、Control等&#xff09;都直接或间接继承自Visual类。…

单例模式 rust和java的实现

文章目录 单例模式介绍应用实例&#xff1a;优点使用场景 架构图JAVA 实现单例模式的几种实现方式 rust实现 rust代码仓库 单例模式 单例模式&#xff08;Singleton Pattern&#xff09;是最简单的设计模式之一。这种类型的设计模式属于创建型模式&#xff0c;它提供了一种创建…

【第2章 Node.js基础】2.4 Node.js 全局对象...持续更新

什么是Node.js 全局对象 对于浏览器引擎来说&#xff0c;JavaScript 脚本中的 window 是全局对象&#xff0c;而Node.js程序中的全局对象是 global&#xff0c;所有全局变量(除global本身外)都是global 对象的属性。全局变量和全局对象是所有模块都可以调用的。Node.is 的全局…

docker部署mongodb

1&#xff1a;拉去momgodb镜像 2&#xff1a;拉去成功后&#xff0c;通过docker-compose.yml配置文件启动mongodb&#xff0c;docker-compose.yml配置如下 version: 3.8 services:mongodb-1:container_name: mongodbimage: mongo ports:- "27017:27017"volumes:- G:…

TCP/IP详解

TCP/IP详解 一、网络基础1.TCP/IP网络分层2.IP地址和端口号3.封装与分用4.客户-服务端模型 二、链路层1.以太网IEEE802封装2.环回接口 Loopback Interface3.最大传输单元MTU和路径MTU 三、网络层 - IP1.IP首部的关键信息2.IP路由选择3.子网寻址和子网掩码4.ICMP和IGMP 四、传输…

python爬虫hook定位技巧、反调试技巧、常用辅助工具

一、浏览器调试面板介绍 二、hook定位、反调试 Hook 是一种钩子技术&#xff0c;在系统没有调用函数之前&#xff0c;钩子程序就先得到控制权&#xff0c;这时钩子函数既可以加工处理&#xff08;改变&#xff09;该函数的执行行为&#xff0c;也可以强制结束消息的传递。简单…

Postgresql数据类型-布尔类型

前面介绍了PostgreSQL支持的数字类型、字符类型、时间日期类型&#xff0c;这些数据类型是关系型数据库的常规数据类型&#xff0c;此外PostgreSQL还支持很多非常规数据类型&#xff0c;比如布尔类型、网络地址类型、数组类型、范围类型、json/jsonb类型等&#xff0c;从这一节…

python poetry的教程

Poetry Python世界中&#xff0c;Poetry是一个近年来备受瞩目的工具&#xff0c;它为开发者提供了一个灵活且强大的依赖管理解决方案。Poetry可以帮助开发者管理项目的依赖关系&#xff0c;同时提供了一系列的工具和功能&#xff0c;使开发者能够更轻松地创建和管理复杂的项目。…

简单的小调度器

收集小资源下的简单调度器 https://github.com/sigma318/TOS/tree/master https://github.com/smset028/xxddq

Go cobra简介

当你需要为你的 Go 项目创建一个强大的命令行工具时&#xff0c;你可能会遇到许多挑战&#xff0c;比如如何定义命令、标志和参数&#xff0c;如何生成详细的帮助文档&#xff0c;如何支持子命令等等。为了解决这些问题&#xff0c;github.com/spf13/cobra 就可以派上用场。 g…

说说对高阶组件的理解?应用场景?

一、是什么 高阶函数&#xff08;Higher-order function&#xff09;&#xff0c;至少满足下列一个条件的函数 接受一个或多个函数作为输入输出一个函数 在React中&#xff0c;高阶组件即接受一个或多个组件作为参数并且返回一个组件&#xff0c;本质也就是一个函数&#xf…

Linux环境配置(云服务器)

目录 1.第一步&#xff1a;购买云服务器 2.第二步&#xff1a;下载Xshell 7 3.第三步&#xff1a;打开Xshell&#xff0c;登录云服务器 4.第四步&#xff1a;更加便捷的云服务器登录方式 1.第一步&#xff1a;购买云服务器 &#xff08;推荐&#xff1a;阿里云、华为云、腾…

k8s----27、云原生存储StorageClass、CSI、Rook

1、动态存储介绍 StorageClass:存储类&#xff0c;由K8s管理员创建&#xff0c;用于动态PV的管理&#xff0c;可以链接至不同的后端存储&#xff0c; 比如Ceph、GlusterFS等。之后对存储的请求可以指向StorageClass&#xff0c;然后StorageClass会自动 的创建、删除PV。 实现方…

数据结构(c语言版) 栈

顺序栈 要求&#xff1a;实现顺序栈的入栈&#xff0c;出栈&#xff0c;显示栈 代码 #include <stdio.h> #define MAXSIZE 100struct liststack{int data[MAXSIZE];int top; };//初始化栈 void init(struct liststack * LS){LS->top -1; }//入栈操作 void instack…

Git详解及常用命令

前言 Git 是一个分布式版本控制系统&#xff0c;用于跟踪和管理项目的代码变化。它由Linus Torvalds在2005年创建&#xff0c;现在是开源社区中最流行的版本控制工具之一。 国内码云地址&#xff1a;工作台 - Gitee.com 版本控制系统 (VCS)&#xff1a;Git 用于跟踪文件和目录…

yolov8+多算法多目标追踪+实例分割+目标检测+姿态估计(代码+教程)

多目标追踪实例分割目标检测 YOLO (You Only Look Once) 是一个流行的目标检测算法&#xff0c;它能够在图像中准确地定位和识别多个物体。 本项目是基于 YOLO 算法的目标跟踪系统&#xff0c;它将 YOLO 的目标检测功能与目标跟踪技术相结合&#xff0c;实现了实时的多目标跟…

MUYUCMS v2.1:一款开源、轻量级的内容管理系统基于Thinkphp开发

MuYuCMS&#xff1a;一款基于Thinkphp开发的轻量级开源内容管理系统&#xff0c;为企业、个人站长提供快速建站解决方案。它具有以下的环境要求&#xff1a; 支持系统&#xff1a;Windows/Linux/Mac WEB服务器&#xff1a;Apache/Nginx/ISS PHP版本&#xff1a;php > 5.6 (…