如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步

AWS Managed Apache Flink (以下以 MSF 代指)是 AWS 提供的一款 Serverless 的 Flink 服务。

1. 问题

大家在使用 MSF 的时候,可能遇到最大的一个问题就是 MSF 的依赖管理,很多时候在 Flink 上运行的代码,托管到 MAF 上之后发现有很多依赖问题需要解决,大体上感觉就是 MSF 一定需要一个纯洁的环境,纯洁的 Flink 代码包。
而我们在使用 MSF 向 Iceberg 表写入数据时候更是如此。在使用 MSF 向 Iceberg 写入数据时,使用 Glue Data Catalog,会遇到如下报错:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:211)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:406)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1356)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1111)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:701)

分析上面的错误,发现是在执行 Craete catalog 的时候,调用了 clusterHadoopConf 方法。我们在继续分析源码,在Iceberg 的源码 FlinkCatalogFactory 中,找到报错的代码位置,如下:

public static Configuration clusterHadoopConf() {
  return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}

而 HadoopUtils 这个类是来自于 org.apache.flink.runtime.util.HadoopUtils,我怀疑可能是 MAF 的环境是依赖于 EKS,因此镜像中并没有包含和 hadoop 相关的依赖,导致这里方法加载默认配置的时候,找不到 org/apache/hadoop/conf/Configuration 类,但是当我尝试在 maven 中加入 hadoop-client 依赖后,仍然存在这个问题。

2. 解决方案

通过上面的分析,我们知道了问题是出在了 org.apache.flink.runtime.util.HadoopUtils这个类,查找了很多资料,终于在 github 的 issue 中发现也有人遇到过这样的问题【#3044】,并且给出了一个绕行的方法,就是在自己的代码工程中重写 org.apache.flink.runtime.util.HadoopUtils这个类,不得不承认这是一个高明的方法。

重写HadoopUtils
在我们的代码工程中创建一个 package,并且添加一个名为 HadoopUtils 的 class,填入如下代码:

package org.apache.flink.runtime.util;

import org.apache.hadoop.conf.Configuration;

public class HadoopUtils {

    public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {
        return new Configuration(false);
    }
}

然后重新打包代码。
也可以参考 github 上的代码,链接🔗 github code
然后我们就可以编译打包代码。

3. Demo

下面我们通过一个完整的 Demo 来了解如何在 MSF 上实现 Iceberg 表的实时摄入。Demo 中会使用一个数据生成工具 Datafaker ,生成数据并且写入 MSK(kafka)中。

3.1 编译代码

获取 Demo代码,直接编译打包。

3.2 创建 MSF Application

  1. 将打包的 jar 上传至S3
  2. 进入 MSF 控制台,创建 Application,版本选择 Flink 1.18。
  3. 在 Application code location 部份填写在第1步上传的 jar 位置。
  4. MAF 会自动创建一个 IAM Role,在完成 Application 创建之后,请记得给这个 IAM Role 添加 Glue 读和写 Data Catalog 的权限,因为 Demo 代码工程会使用 Glue data catalog 作为 Iceberg catalog。
  5. 创建完 Application 就可以直接点击 Run 运行了。

3.3 生成数据

export MYBROKERS=<kafka-server>
export KAFKA_HOME=/home/ec2-user/environment/kafka_2.12-2.8.1
export TOPIC=datafaker_user_order_list_01
export IMPORT_ROWS=100000
#写入一条记录的间隔时间,也可以不设置
export INTERVAL=0.01
datafaker kafka $MYBROKERS $TOPIC $IMPORT_ROWS --meta dataformat_01.txt --interval $INTERVAL

这里就不详细介绍 datafaker 的使用了,如果想了解 datafaker 的参数配置可以从这个 github datafaker 获取。

3.4 在 Athena 中查询数据写入的结果

注意,如果 Athena 开启了 Reuse query results,可能会导致 count(*) 查询的不是最新的结果。
在这里插入图片描述

  1. 运维监控
    4.1 Metrics
    由于写入 Iceberg 表,不会在 Flink UI 看到 Records Recevied 以及 Records Send 等指标,因此如果想查看 Iceberg Sink 写入的数据量,需要进入Flink UI Sink 算子中,查看 Metrics 的 committedDataFilesRecordCount 指标。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/623338.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CV每日论文--2024.5.10

1、Attention-Driven Training-Free Efficiency Enhancement of Diffusion Models 中文标题&#xff1a;扩散模型的注意力驱动的训练免费效率增强 简介&#xff1a;扩散模型在生成高质量和多样化图像方面取得了出色的表现,但其卓越性能是以昂贵的架构设计为代价的,特别是广泛使…

1727jsp思想政治活动Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 思想政治活动管理系统 是一套完善的web设计系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统采用web模式&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff…

【VTKExamples::Rendering】第十期 TestStippledLine

很高兴在雪易的CSDN遇见你 VTK技术爱好者 QQ:870202403 公众号:VTK忠粉 前言 本文分享VTK样例TestStippledLine,希望对各位小伙伴有所帮助! 感谢各位小伙伴的点赞+关注,小易会继续努力分享,一起进步! 你的点赞就是我的动力(^U^)ノ~YO 1. TestStippledLin…

【机器学习】机器学习与人工智能融合新篇章:自适应智能代理在多元化复杂环境中的创新应用与演进趋势

&#x1f512;文章目录&#xff1a; &#x1f4a5;1.引言 &#x1f68b;1.1 机器学习与人工智能的发展背景 &#x1f68c;1.2 自适应智能代理的概念与重要性 &#x1f690;1.3 研究目的与意义 ☔2.自适应智能代理的关键技术 &#x1f6e3;️2.1 环境感知与信息处理技术 …

JavaEE技术之SpringCloud(Nacos注册中心、Nacos配置中心、Sentinel实现熔断与限流)

文章目录 SpringCloud Alibaba1、简介1.1 背景1.2 Nacos主要功能1.3 Nacos和SpringBoot、SpringCloud版本选择 2、Nacos注册中心2.1 案例准备2.2 Nacos注册中心下载启动2.2.1 下载2.2.2 解压启动2.2.3 nacos-server访问测试 2.3 nacos注册中心客户端整合2.3.1 订单服务整合naco…

英伟达解码性能NVDEC

如果你能打开官网&#xff0c;请看这里&#xff1a; NVDEC Application Note 下面是摘录&#xff1a;

python数据可视化:从n个点中挑选m组3个点绘制m个三角形matplotlib.pyplot.triplot()

【小白从小学Python、C、Java】 【考研初试复试毕业设计】 【Python基础AI数据分析】 python数据可视化&#xff1a; 从n个点中挑选m组3个点 绘制m个三角形 matplotlib.pyplot.triplot() [太阳]选择题 以下关于matplotlib.pyplot.triplot()函数说法正确的是&#xff1f; impor…

Python环境变量的访问:从入门到精通

环境变量是操作系统中的一种特殊变量&#xff0c;它允许用户在不修改程序的情况下&#xff0c;通过配置文件来改变程序的行为。在Python中&#xff0c;访问环境变量是一个常见的需求&#xff0c;无论是为了读取配置信息、获取系统信息&#xff0c;还是进行程序调试。本文将详细…

draw.io 网页版二次开发(1):源码下载和环境搭建

目录 一 说明 二 源码地址以及下载 三 开发环境搭建 1. 前端工程地址 2. 配置开发环境 &#xff08;1&#xff09;安装 node.js &#xff08;2&#xff09;安装 serve 服务器 3. 运行 四 最后 一 说明 应公司项目要求&#xff0c;需要对draw.io进行二次开发&…

python练习题(编程)

目录 7-1 输入列表&#xff0c;求列表元素和(eval输入应用&#xff09; 输入格式: 输出格式: 输入样例: 输出样例: 7-3 求矩阵鞍点的个数 7-5 求圆面积 输入格式: 输出格式: 输入样例: 输出样例: 7-6 字典合并与排序 输入格式: 输出格式: 输入样例1: 输出样例1:…

Redis:分布式系统

文章目录 分布式单机架构应用数据分离架构应用服务集群架构 负载均衡读写分离冷热分离架构垂直分库微服务架构 分布式 下面就要简单对于分布式进行一个认识了 单机架构 在进行了解分布式之前&#xff0c;先了解一下什么是单机架构 如上所示就是一个单机架构&#xff0c;对于…

【JavaEE 初阶(六)】网络编程

❣博主主页: 33的博客❣ ▶️文章专栏分类:JavaEE◀️ &#x1f69a;我的代码仓库: 33的代码仓库&#x1f69a; &#x1faf5;&#x1faf5;&#x1faf5;关注我带你了解更多网络知识 目录 1.前言2.浅谈网络2.1基本知识2.2.OSI与TCP/IP 3.网络编程3.1TCP与UDP区别3.2UDP网路编程…

18 【Aseprite 作图】描边 换颜色 蒙版

1 描边的方式&#xff1a;选择“编辑 - 特效 - 轮廓” 就可以一键描边了 2 替换颜色 通过“编辑 - 替换颜色”&#xff0c;就把颜色从黄色 替换成了 紫色 3 调整色相&#xff0c;通过“编辑 - 调整 - 色相/饱和度”&#xff0c;就可以类似PS调整色相饱和度 4 铅笔选择“锁…

vue+springboot用户注销功能

vue文件前端 <el-button type"warning" plain click"handleDeletion">注 销</el-button> // 注销 const handleDeletion (userName) > {ElMessageBox.confirm(注销该用户所有信息后无法恢复&#xff0c;您确认注销吗?, 注销确认, { type…

【eclipse】如何在IDE里创建一个Java Web项目?

如何在eclipse中创建一个动态Web项目并成功运行&#xff1f; 一、 最终效果 懒得写那么多了…我也不知道该怎么写了&#xff0c;有点乱&#xff0c;有问题可以在评论里留言&#xff0c;我看到会解决的&#xff0c;在这个过程中也踩到了一些坑&#xff0c;但好在有CSDN帮助解决…

【VTKExamples::Rendering】第十一期 TestStringToImageDemo

很高兴在雪易的CSDN遇见你 VTK技术爱好者 QQ:870202403 公众号:VTK忠粉 前言 本文分享TestStringToImageDemo,该样例用于将字符转化为Image,希望对各位小伙伴有所帮助! 感谢各位小伙伴的点赞+关注,小易会继续努力分享,一起进步! 你的点赞就是我的动力(^U^…

YOLOv8独家改进:backbone改进 | 微软新作StarNet:超强轻量级Backbone | CVPR 2024

💡💡💡创新点:star operation(元素乘法)在无需加宽网络下,将输入映射到高维非线性特征空间的能力,这就是StarNet的核心创新,在紧凑的网络结构和较低的能耗下展示了令人印象深刻的性能和低延迟 💡💡💡如何跟YOLOv8结合:替代YOLOv8的backbone 收录 YOLOv8…

Qt---文件系统

一、基本文件操作 1. QFile对文件进行读和写 QFile file( path 文件路径) 读&#xff1a; file.open(打开方式) QlODevice::readOnly 全部读取->file.readAll()&#xff0c;按行读->file.readLine()&#xff0c;atend()->判断是否读到文件尾 …

贪心算法----最大数

今日题目&#xff1a;leetcode179------点击跳转题目 分析&#xff1a; 要把这些数组组成最大的数&#xff0c;首先我们把数字转化为字符串&#xff0c;根据自定义的排序规则把这些字符串字数排列&#xff0c;再用一个字符串接受这些字符串数字拼接成最大的字符串数字 排序规则…

UniGen:用于生成自动驾驶场景的初始智体状态和轨迹的统一建模

24年5月谷歌WayMo论文“UniGen: Unified Modeling of Initial Agent States and Trajectories for Generating Autonomous Driving Scenarios”。 本文介绍 UniGen&#xff0c;一种生成交通场景的新方法&#xff0c;用于通过仿真评估和改进自动驾驶软件。 其方法在一个统一的模…