Flink实时电商数仓之旁路缓存

撤回流的处理

撤回流是指流式处理过程中,两表join过程中的数据是一条一条跑过来的,即原本可以join到一起的数据在刚开始可能并没有join上。

  • 撤回流的格式:
    在这里插入图片描述
  • 解决方案
    • 定时器:使用定时器定时10s(数据最大的时间差值),定时器触发时将状态中的数据发送过来
    • 如果重复计算这些数据,如何保持结果正确即可;通过每次度量值修改为当次度量值 - 上次度量值即可

异步IO

在这里插入图片描述

  • 减少等待的时间,充分利用已有的资源
  • 使用异步IO时,必须保证从头到尾都是异步的操作;即使用异步的连接器
/**
     * 获取到 redis 的异步连接
     *
     * @return 异步链接对象
     */
    public static StatefulRedisConnection<String, String> getRedisAsyncConnection() {
        RedisClient redisClient = RedisClient.create("redis://hadoop102:6379/2");
        return redisClient.connect();
    }



    /**
     * 关闭 redis 的异步连接
     *
     * @param redisAsyncConn
     */
    public static void closeRedisAsyncConnection(StatefulRedisConnection<String, String> redisAsyncConn) {
        if (redisAsyncConn != null) {
            redisAsyncConn.close();
        }
    }

 /**
     * 获取到 Hbase 的异步连接
     *
     * @return 得到异步连接对象
     */
    public static AsyncConnection getHBaseAsyncConnection() {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "hadoop102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            return ConnectionFactory.createAsyncConnection(conf).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 关闭 hbase 异步连接
     *
     * @param asyncConn 异步连接
     */
    public static void closeAsyncHbaseConnection(AsyncConnection asyncConn) {
        if (asyncConn != null) {
            try {
                asyncConn.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

异步IO关联

  1. AsyncDataStream.unorderedWait(异步核心逻辑, 60, TimeUnit.SECONDS) 异步关联维度表
  2. CompletableFuture.supplyAsync(new Supplier<>(){ 异步访问读取Redis中的数据 }),返回的数据类型是Future类型
    • 先拼写访问的redisKey
    • 获取到dimSkuInfoFuture期货
    • 使用dimSkuInfoFuture.get()获取异步结果
  3. thenApplyAsync(new Function<>()), 旁路缓存判断,判断是否在redis中读取到相关数据,如果没有读取到,需要访问HBase.
  4. 需要重写HBase的getCells方法,改为getAsyncCells方法
    • 连接更换为异步连接
    • Future类型数据需要再get()方法获取具体的值
    • 无需关闭连接
  5. 将从HBase读取的数据保存到redis, redisAsyncConnection.async().setex(redisKey,24*60*60,dimJsonObj.toJSONString());

异步维度关联封装

  1. 继承RichAsyncFunction<TradeSkuOrderBean, TradeSkuOrderBean>接口
  2. 将表名和rowkey拼接的方法抽象化,让方法调用者自己传进来
  3. 封装join方法, join(TradeSkuOrderBean input, JSONOjbect dim); join方法里面填写度量值的聚合逻辑
  4. 将抽象方法和具体方法分离,把抽象方法放到接口中,在实现该接口
  5. TradeSkuOrderBean类改为泛型方法T
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T>
implements DimJoinFunction<T>{

    StatefulRedisConnection<String, String> redisAsyncConnection;
    AsyncConnection hBaseAsyncConnection;
    String tableName;


    @Override
    public void open(Configuration parameters) throws Exception {
        redisAsyncConnection = RedisUtil.getRedisAsyncConnection();
        hBaseAsyncConnection = HBaseUtil.getHBaseAsyncConnection();
    }

    @Override
    public void close() throws Exception {
        RedisUtil.closeRedisAsyncConnection(redisAsyncConnection);
        HBaseUtil.closeAsyncHbaseConnection(hBaseAsyncConnection);
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
        //java的异步编程方式
        String tableName = getTableName();
        String rowKey = getId(input);
        String redisKey = RedisUtil.getRedisKey(tableName, rowKey);
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {

                //第一步异步访问得到的数据

                RedisFuture<String> dimSkuInfoFuture = redisAsyncConnection.async().get(redisKey);
                String dimInfo = null;
                try {
                    dimInfo = dimSkuInfoFuture.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return dimInfo;
            }
        }).thenApplyAsync(new Function<String, JSONObject>() {
            @Override
            public JSONObject apply(String dimInfo) {
                JSONObject dimJsonObj = null;
                //旁路缓存判断
                if (dimInfo == null || dimInfo.isEmpty()) {
                    try {
                        //需要访问HBase
                        dimJsonObj = HBaseUtil.getAsyncCells(hBaseAsyncConnection, Constant.HBASE_NAMESPACE, tableName, rowKey);
                        //将读取的数据保存到redis
                        redisAsyncConnection.async().setex(redisKey, 24 * 60 * 60, dimJsonObj.toJSONString());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    //redis中存在缓存数据
                    dimJsonObj = JSONObject.parseObject(dimInfo);
                }
                return dimJsonObj;
            }
        }).thenAccept(new Consumer<JSONObject>() {
            public void accept(JSONObject dim) {
                //合并维度信息
                if (dim == null) {
                    //无法关联到维度信息
                    System.out.println("无法关联到当前的维度信息:" + tableName + ":" + rowKey);
                } else {
                    join(input,dim);
                }

                //返回结果
                resultFuture.complete(Collections.singletonList(input));
            }
        });
    }



}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/293027.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构—二叉树的链式结构实现】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言 一、二叉树的存储结构 二、二叉树链式结构的实现 2.1手动构建一课树 2.2二叉树的遍历 三、二叉树链式结构的实现 3.1前序遍历(递归) 3.2中序遍历(递归) 3.3后序…

如何成为ChatGPT 优质Prompt创作者

如何提问&#xff1f; 我想让你成为我的Prompt创作者。你的目标是帮助我创作最佳的Prompt&#xff0c;这个Prompt将由你ChatGPT使用。你将遵循 以下过程&#xff1a;1.首先&#xff0c;你会问我Prompt是关于什么&#xff1f;我会告诉你&#xff0c;但我们需要 通过不断的重复来…

LCR 176. 判断是否为平衡二叉树

解题思路&#xff1a; class Solution {public boolean isBalanced(TreeNode root) {return recur(root) ! -1;}private int recur(TreeNode root) {if (root null) return 0;int left recur(root.left);if(left -1) return -1;int right recur(root.right);if(right -1) …

使用通用MCU实现无人机飞行任务的快速二次开发

使用通用MCU实现无人机飞行任务的快速二次开发 ---TIDronePilot外部控制offboard模式介绍 无名小哥 2024年1月1日 传统飞控二次开发方法和主要存在的问题简介 通过对前面几讲中《零基础竞赛无人机积木式编程指南》系列开发教程的学习可知&#xff0c;在以往TI电赛真题的学习…

2023APMCM亚太数学建模C题 - 中国新能源汽车的发展趋势(2)

五&#xff0e;问题二模型建立和求解 5.1 问题二模型建立和求解 针对题目二&#xff0c;题目要求收集中国新能源电动汽车行业发展数据&#xff0c;建立数学模型描述&#xff0c;并预测未来十年的发展。由于在第一文中&#xff0c;我们已经收集了一定的新能源行业发展数据&…

Quartus II 13.1的安装及使用

Quartus II 13.1的安装及使用_quartus13.1-CSDN博客1.3 Verilog 环境搭建 | 菜鸟教程 学习 Verilog 做仿真时&#xff0c;可选择不同仿真环境。FPGA 开发环境有 Xilinx 公司的 ISE&#xff08;目前已停止更新&#xff09;&#xff0c;VIVADO&#xff1b;因特尔公司的 Quartu…

【React系列】JSX核心语法和原理

本文来自#React系列教程&#xff1a;https://mp.weixin.qq.com/mp/appmsgalbum?__bizMzg5MDAzNzkwNA&actiongetalbum&album_id1566025152667107329) 一. ES6 的 class 虽然目前React开发模式中更加流行hooks&#xff0c;但是依然有很多的项目依然是使用类组件&#x…

imgaug库指南(三):从入门到精通的【图像增强】之旅

引言 在深度学习和计算机视觉的世界里&#xff0c;数据是模型训练的基石&#xff0c;其质量与数量直接影响着模型的性能。然而&#xff0c;获取大量高质量的标注数据往往需要耗费大量的时间和资源。正因如此&#xff0c;数据增强技术应运而生&#xff0c;成为了解决这一问题的…

14.7-时序反馈移位寄存器建模

时序反馈移位寄存器建模 1&#xff0c;阻塞赋值实现的LFSR&#xff0c;实际上并不具有LFSR功能1.1.1&#xff0c;RTL设计&#xff0c;阻塞赋值1.1.2&#xff0c;tb测试代码1.1.3&#xff0c;波形仿真输出&#xff0c;SIM输出&#xff0c;没实现LFSR1.2.1&#xff0c;RTL设计&am…

【导出与导入Virtualbox虚拟机和启动连接openGauss数据库】

【导出与导入Virtualbox虚拟机和启动连接openGauss数据库】 一、导出虚拟机二、导入虚拟机三、启动数据库四、使用Data Studio连接数据库 一、导出虚拟机 选择关机状态的虚拟机 -> 管理菜单 -> 导出虚拟电脑 点击完成后&#xff0c;需要等待一小段时间&#xff0c;如…

神经网络的核心:帮助新手理解 PyTorch 非线性激活函数

目录 torch.nn子函数非线性激活详解 nn.Softmin Softmin 函数简介 函数工作原理 参数详解 使用技巧与注意事项 示例代码 nn.Softmax Softmax 函数简介 函数工作原理 参数详解 使用技巧与注意事项 示例代码 nn.Softmax2d Softmax2d 函数简介 函数工作原理 输入…

Java异常简单介绍

文章目录 1. 异常分类和关键字1.1 分类1.2 关键字 2. Error2.1 Error定义2.2 常见的Error2.2.1 VirtualMachineError2.2.2 ThreadDeath2.2.3 LinkageError2.2.4 AssertionError2.2.5 InternalError2.2.6 OutOfMemoryError2.2.6.1 OOM原因2.2.6.2 OutOfMemoryError会导致宕机吗 …

Python(wordcloud):根据文本数据(.txt文件)绘制词云图

一、前言 本文将介绍如何利用python来根据文本数据&#xff08;.txt文件&#xff09;绘制词云图&#xff0c;除了绘制常规形状的词云图&#xff08;比如长方形&#xff09;&#xff0c;还可以指定词云图的形状。 二、相关库的介绍 1、安装相关的库 pip install jieba pip i…

c语言-指针进阶

文章目录 前言一、字符指针二、数组指针2.1 数组指针基础2.2 数组指针作函数参数 总结 前言 在c语言基础已经介绍过关于指针的概念和基本使用&#xff0c;本篇文章进一步介绍c语言中关于指针的应用。 一、字符指针 字符指针是指向字符的指针。 结果分析&#xff1a; "ab…

【常用排序算法】快速排序

##快速排序 快速排序由于排序效率在同为O(N*logN)的几种排序方法中效率较高&#xff0c;因此经常被采用&#xff0c;再加上快速排序思想----分治法 先从数列中取出一个数作为基准数pivot。分区过程&#xff0c;将比这个数大的数全放到它的右边&#xff0c;小于或等于它的数全放…

索引类型-哈希索引

一. 前言 前面我们简单介绍了数据库的B-Tree索引&#xff0c;下面我们介绍另一种索引类型-哈希索引。 二. 哈希索引的简介 哈希索引(hash index) 基于哈希表实现&#xff0c;只有精确匹配索引所有列的查询才有效。对于每一行数据&#xff0c;存储引擎都会对所有索引列计算一个…

智能穿戴时代 | 米客方德SD NAND的崭新优势

SD NAND在智能穿戴上的优势 SD NAND是一种可以直接焊接在智能穿戴设备主板上的存储芯片&#xff0c;其小型化设计有助于紧凑设备尺寸&#xff0c;同时提供可靠的嵌入式存储解决方案。 这种集成设计减少了空间占用&#xff0c;同时确保设备在高度活动的环境中更为稳定。SD NAND…

【数据库系统概论】数据库恢复机制

系统文章目录 数据库的四个基本概念&#xff1a;数据、数据库、数据库管理系统和数据库系统 数据库系统的三级模式和二级映射 数据库系统外部的体系结构 数据模型 关系数据库中的关系操作 SQL是什么&#xff1f;它有什么特点&#xff1f; 数据定义之基本表的定义/创建、修改和…

[论文笔记] Megtron_LM 0、报错:vscode调试无法传进去参数 launch.json文件获取args参数

解决方法&#xff1a; 配置好launch.json文件后&#xff0c;应该点运行和调试里面的运行按钮。而不是直接点文件右上角的debug。 可以看到terminal中&#xff0c;如果没有正常加载launch.json&#xff0c;则参数中没有args的参数。 如果正常加载&#xff0c;可以看到args的很多…

Mysql 重要知识点1(含面试题1)

Mysql 知识点较多&#xff0c;这里涵盖了基本知识点&#xff0c;包括SQL语句 、重要面试题等。后面还有几章Mysql的知识点&#xff0c;分别是刷题总结与进阶优化SQL 面试题等。 目录 Mysql 安装Mysql 重要知识点SQL 重要语句面试题精选 Mysql 安装 1.官网下载mysql5.7版本压缩…