利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

  • 一、前言
  • 二、Flink 代码优化
    • 2.0 问题发现
    • 2.1 原有代码
    • 2.2 CompletableFuture 优化
  • 三、avatorscript 使用的简单介绍
    • 3.1 自定义函数
    • 3.2 从 Map 中取值
    • 3.3 使用 Java 的工具类
    • 3.4 AviatorScript 函数
  • 四、总结


一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

图片.png
通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();

CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
            });
executeFutures.add(executeFuture);

for (int i = 0; i < executeFutures.size(); i++) {
    executeFutures.get(i).get()
    xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

class AddFunction extends AbstractFunction {
    @Override
    public AviatorObject call(Map<String, Object> env,

                              AviatorObject arg1, AviatorObject arg2) {
        Number left = FunctionUtils.getNumberValue(arg1, env);
        Number right = FunctionUtils.getNumberValue(arg2, env);
        return new AviatorDouble(left.intValue() + right.intValue());
    }

    public String getName() {

        return "add" ;
    }
}


public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
   AviatorEvaluator.addFunction(new AddFunction());
    System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    //注册函数
     AviatorEvaluator.addFunction(new AddFunction());
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "testId1" , 1);
    stringObjectHashMap.put( "testId2" , 2);
    Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
 HashMap<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put( "ip" , "a1111" );
    // stringObjectHashMap.put("result", "a&B&C&d");
 stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
    stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
    stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
    stringObjectHashMap.put( "testId1" , "sku" );
    stringObjectHashMap.put( "a" , "123" );
    stringObjectHashMap.put( "a1" , "null" );
    stringObjectHashMap.put( "b1" , 123);
 
    AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
    AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)

 execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
    System.out.println(execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
    System.out.println( "###" + execute2);
    execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

## examples/function.av
fn add(x, y) {
  return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
    String function = "## examples/function.av\n" +
            "\n" +
            "fn add(x, y) {\n" +
            "  return x + y;\n" +
            "}" ;
    AviatorEvaluator.defineFunction( "add" , function);
    System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/640921.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】线性表习题 |顺序表 |链表 |栈和队列

&#x1f4d6;专栏文章&#xff1a;数据结构学习笔记 &#x1faaa;作者主页&#xff1a;格乐斯 前言 线性表习题 |顺序表 |链表 |栈和队列 顺序表和链表 1、 选B 1002(5-1)108* 第i个元素地址X&#xff0c;元素长度Len&#xff0c;第j个元素地址Y 公式&#xff1a;YXL…

Docker进入容器查看内容并从容器里拷贝文件到宿主机

工作中需要从docker正在运行的镜像中复制文件到宿主机&#xff0c;于是便将这个过程记录了下来。 &#xff08;1&#xff09;查看正在运行的容器 通过以下命令&#xff0c;可以查看正在运行的容器&#xff1a; docker ps &#xff08;2&#xff09;进入某个容器执行脚本 我…

备考AMC8和AMC10竞赛,吃透2000-2024年1850道真题和解析(持续)

多做真题&#xff0c;吃透真题和背后的知识点是备考AMC8、AMC10有效的方法之一&#xff0c;通过做真题&#xff0c;可以帮助孩子找到真实竞赛的感觉&#xff0c;而且更加贴近比赛的内容&#xff0c;可以通过真题查漏补缺&#xff0c;更有针对性的补齐知识的短板。 今天我们继续…

Android Audio基础——AudioFlinger回放录制线程(七)

AndioFlinger 作为 Android 的音频系统引擎,重任之一是负责输入输出流设备的管理及音频流数据的处理传输,这是由回放线程 PlaybackThread 及其派生的子类和录制线程 RecordThread 进行的。 一、基础介绍 1、关系图 ThreadBase:PlaybackThread 和 RecordThread 的基类。 Re…

群晖NAS使用Docker部署WPS Office结 合内网穿透实现远程编辑本地文档

文章目录 1. 拉取WPS Office镜像2. 运行WPS Office镜像容器3. 本地访问WPS Office4. 群晖安装Cpolar5. 配置WPS Office远程地址6. 远程访问WPS Office小结 7. 固定公网地址 wps-office是一个在Linux服务器上部署WPS Office的镜像。它基于WPS Office的Linux版本&#xff0c;通过…

redis--redis Cluster

简介 解决了redis单机写入的瓶颈问题&#xff0c;即单机的redis写入性能受限于单机的内存大小、并发数量、网卡速率等因素无中心架构的redis cluster机制&#xff0c;在无中心的redis集群当中&#xff0c;其每个节点保存当前节点数据和整个集群状态,每个节点都和其他所有节点连…

Redis机制-Redis互斥锁、分布式锁

目录 一 互斥锁 二 分布式锁 Redis实现分布式锁 redisson实现分布式锁 可重入性&#xff1a; 主从一致性&#xff08;性能差&#xff09;&#xff1a; 一 互斥锁 假设我们现在有一个业务要实现秒杀优惠券的功能&#xff0c;如果是一个正常的流程&#xff0c;线程之间应该…

ThreadLocal为什么会导致内存泄漏?

问题引出&#xff1a; ThreadLocal是为了解决什么问题而产生的&#xff1f; ThreadLocal发生内存泄漏的根本原因是什么&#xff1f; 如何避免内存泄漏的发生&#xff1f;定义 为了解决多个线程同时操作程序中的同一个变量而导致的数据不一致性的问题。   假设现在有两个线程A…

【C++题解】1696. 请输出1~n之间所有的整数

问题:1696. 请输出1~n之间所有的整数 类型&#xff1a;循环 题目描述&#xff1a; 从键盘读入一个整数 &#x1d45b;n &#xff0c;请循环输出 1∼n 之间所有的整数&#xff0c;每行输出 1 个。 比如&#xff0c;假设 n5 &#xff0c;那么输出结果如下&#xff1a; 1 2 3 4 …

微调Llama3实现在线搜索引擎和RAG检索增强生成功能

视频中所出现的代码 Tavily SearchRAG 微调Llama3实现在线搜索引擎和RAG检索增强生成功能&#xff01;打造自己的perplexity和GPTs&#xff01;用PDF实现本地知识库_哔哩哔哩_bilibili 一.准备工作 1.安装环境 conda create --name unsloth_env python3.10 conda activate …

我用 Midjourney 的这种风格治愈了强迫症

在 Midjourney 能够实现的各种布局之中&#xff0c;有两种风格因其简洁、有序而独居魅力&#xff0c;它们就是平铺 (Flat Lay) 和 Knolling (Knolling 就是 Knolling, 无法翻译&#x1f923;)。要在现实生活中实现这样的美学效果并不容易&#xff0c;你需要精心挑选各种小物件&…

【JAVA WEB实用与优化技巧】如何自己封装一个自定义UI的Swagger组件,包含Swagger如何处理JWT无状态鉴权自动TOKEN获取

目录 一、Swagger 简介1. 什么是 Swagger&#xff1f;2. 如何使用 Swagger3. Springboot 中swagger的使用示例1. maven 引入安装2. java配置 二、Swagger UI存在的缺点1.不够方便直观2.请求的参数没有缓存3.不够美观4.如果是JWT 无状态登录&#xff0c;Swagger使用起来就没有那…

MVCC相关

文章目录 前情要点基于什么引擎并发事务产生的问题不可重复读和幻读区别Next-Key Lock的示例解决并发事务采用的隔离级别当前读(Current Read)快照读(Snapshot Read)参考 MVCC定义表里面的隐藏字段由db_roll_ptr串成的版本链ReadView可见性算法mvcc的可见性算法为什么要以提交的…

编译器 编译过程 compiling 动态链接库 Linking 接口ABI LTO PGO inline bazel增量编译

编译器 编译过程 compiling 动态链接库 Linking 接口ABI LTO PGO Theory Shared Library Symbol Conflicts (on Linux) 从左往右查找:Note that the linker only looks further down the line when looking for symbols used by but not defined in the current lib.Linux 下…

【C++题解】1697. 请输出n~1之间所有的整数

问题:1697. 请输出n~1之间所有的整数 类型&#xff1a;循环 题目描述&#xff1a; 从键盘读入一个整数 n &#xff0c;请输出 n∼1 之间所有的整数&#xff0c;每行输出 1 个。 比如&#xff0c;假设读入 n5 &#xff0c;输出结果如下&#xff1a; 5 4 3 2 1 输入&#xff1…

第199题|关于函数的周期性问题|函数强化训练(六)|武忠祥老师每日一题 5月24日

解题思路&#xff1a;解这道题我们要用到下面这个结论 f(x)连续&#xff0c;以T为周期时&#xff0c;原函数以T为周期的充分必要条件是&#xff1a; (A) sin x显然是以π为周期的&#xff0c;我们可以看到并不等于0,根据结论&#xff0c;A的原函数显然不是周期函数。 (B) 的…

移动端仪表盘,支持更多组件

05/22 主要更新模块概览 定位函数 快捷筛选 轨迹图表 时间组件 01 表单管理 1.1 【表单组件】- 表单关联新增支持自定义按钮样式 说明&#xff1a; 表单关联-关联数据按钮&#xff0c;原仅支持默认按钮样式&#xff0c;现增加关联数据按钮自定义功能&#xff0c;满…

【传知代码】掩码自回归编码器法(论文复现)

前言&#xff1a;在探索现代数据科学的前沿领域时&#xff0c;掩码自回归编码器法&#xff08;Masked Autoencoder&#xff0c;简称MAE&#xff09;无疑是一个引人注目的亮点。这一技术&#xff0c;凭借其独特的训练机制和卓越的性能&#xff0c;已经在图像识别、自然语言处理以…

《我的阿勒泰》观后感(二、返璞归真也是一种美)

看了李娟的小说《我的阿勒泰》逐渐悟到一个道理&#xff0c;返璞归真也是一种美&#xff0c;没必要每个人的人生三十年的年华&#xff0c;都去追求房子&#xff0c;车子等逐渐贬值的东西。人究竟应该追求怎样的一种活法&#xff1f; 什么是城市化&#xff1f;这是我听到的最好…

osgearth 3.5 vs 2019编译

下载源码 git clone --recurse-submodules https://github.com/gwaldron/osgearth.git 修改配置文件 主要是修改bootstrap_vcpkg.bat&#xff0c;一处是vs的版本&#xff0c;第二处是-DCMAKE_BUILD_TYPERELEASE 构建 执行bootstrap_vcpkg.bat vs中生成安装 vs2019打开bu…