基于华为MRS3.2.0实时Flink消费Kafka落盘至HDFS的Hive外部表的调度方案

文章目录

  • 1 Kafka
    • 1.1 Kerberos安全模式的认证与环境准备
    • 1.2 创建一个测试主题
    • 1.3 消费主题的接收测试
  • 2 Flink
    • 1.1 Kerberos安全模式的认证与环境准备
    • 1.2 Flink任务的开发
  • 3 HDFS与Hive
    • 3.1 Shell脚本的编写思路
    • 3.2 脚本测试方法
  • 4 DolphinScheduler


该需求为实时接收对手Topic,并进行消费落盘至Hive。

在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。
在这里插入图片描述

本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。

华为官方文档:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html

1 Kafka

1.1 Kerberos安全模式的认证与环境准备

着手开发前,需要将FushionInsight租户加入kafkaadmin组,保证有创建主题和消费主题的权限,在得到此权限时,切勿对集群中的主题进行危险操作。

保证租户权限后,开始准备开发环境。该步骤需要安装Idea客户端在windows本地,同时安装兼容的maven版本,华为MRS需要安装至少OpenJDK 1.8.0_332的版本。

运行环境的配置则需要在FushionInsight的web管理界面下载kafka的完整客户端,包括config配置文件也需要下载。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手动添加,同时应保证本地和集群能ping通。

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html

1.2 创建一个测试主题

在Linux环境中执行:

bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic

创建一个测试testTopic,创建成功后,FushionInsight的web界面会报topic只有一个分区副本的警告,请忽略它。

同时也可以开启两个新的终端窗口用于测试生产者和消费者:

  1. bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
  2. bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html

1.3 消费主题的接收测试

通过以下网站下载华为MRS所需的样例代码:

https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0

下载样例代码之后需要在华为镜像站下载代码所需依赖,华为MRS所需的组件依赖不同于apache的开源版本,需要单独配置maven的setting文件华为中央仓库进行下载,在开发时,组件相关的依赖都需要用下载华为的。

镜像地址:

https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/

华为开源镜像站:

https://mirrors.huaweicloud.com/home

完成依赖和样例代码项目创建即可开发,在开发程序时,需要将用于安全认证的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在样例工程的“kafka-examples\src\main\resources”目录下。

在开发时,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。华为提供了LoginUtil相关接口来完成这些配置,样例代码中只需要配置用户自己租户名称和对应的keytab文件名称即可。

创建生产测试时,首先需要修改KafkaProperties类中的生产主题名,接下来在com.huawei.bigdata.kafka.example.Producer类中修改租户账号,keytab位置即可,运行成功后,会向主题推送100条测试数据,此时,我们在1.2小节中开启的消费者窗口就能接受到生产的数据。

在具体的测试中,需要控制消息发送的间隔和消息次数,方便后续开发Flink。一般来说,每秒发送一条,一直发送即可。

至此,Kafka的主题消费测试完成,接下来需要用Flink将主题落盘到HDFS。

如果运行代码时报和clock相关的错误,是因为本地时间和FushionInsight集群时间不一致所致,请将本地时间和服务器时间差控制在5分钟内。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html

2 Flink

1.1 Kerberos安全模式的认证与环境准备

用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。
在这里插入图片描述
图为Flink在华为MRS安全模式的认证体系。

对于Kafka的权限在章节1.1已经获取,另外要保证有yarn资源的使用权限,还需要对HDFS的/flink/flink-checkpoint目录获取权限,保证执行。有了相关权限之后,再下载kerberos认证凭据文件,keytab和conf。准备运行环境同Kafka类似,需要对Flink客户端进行配置,注意config文件应该在权限修改之后获取。

Flink整个系统存在三种认证方式,使用kerberos认证、使用security cookie进行认证、使用YARN内部的认证机制。在进行安全认证时,可以用flink自带的wordcount样例程序进行提交测试,根据提交结果反馈再进行适配,直到提交成功。如果报auth相关的错误,可能还是权限问题,可以尝试先将租户权限给到最大,谨慎操作,先保证代码能通。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html

1.2 Flink任务的开发

最终在yarn队列运行的flink程序是从本地idea打包,通过flink run提交的。前面安全模式已经打通,在开发时仍然是使用华为官方的flink样例代码进行修改调试。

在具体的flink程序开发中,由于是开启了kerberos认证的安全模式,需要加入判断安全模式登录的代码段在main方法,以下代码来自华为官方样例:

 if (LoginUtil.isSecurityModel()) {
            try {
                LOG.info("Securitymode start.");
                //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.");
                LOG.error("The IOException occured : {}.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }

对于具体需求的开发参照开源Flink的apache官方文档即可,只需要保证依赖是华为官方镜像站的。

在该需求中,是将消费的数据落盘到HDFS中。开发中要用到FlinkKafkaConsumer方法创建kafka消费者,拿到流数据。该方法在Flink1.17版本被弃用,但是Flink1.15仍然可以用,具体开发方法可参考Flink1.13的官方文档Apache Kafka 连接器。

FlinkKafkaConsumer方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

接收的Kafka数据,我们不需要处理,测试时直接测试主题的数据写入HDFS即可,需要用StreamingFileSink方法。该方法可以设置按照日期分桶,我们设置.withBucketAssigner为每天一个桶,保证每天消费的数据在一个文件中,同时用该方法传入日期格式参数yyyy-MM-dd,这样便于使用shell调度每日增量数据时日期变量的传递。

FileSink方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/

另外,关于Sink到HDFS的数据文件(part file) 生命周期有几种状态,其中当文件名为in-progress表示当前文件正在写入中,此时的文件是不能被Hive读到的,我们需要将该文件的状态通过checkpoint机制转变为Finished。需要配置env.enableCheckpointing(60000)开启checkpoint,该参数是60秒开启一次。

完成代码开发后无法在本地测试,只能通过maven打包到华为服务器,通过flink run提交到yarn,此时可以指定并行度及其他配置。

通过以上方法即可实现将我们测试主题中的数据存储在按照每天一个yyyy-mm-dd命名的文件夹中。

3 HDFS与Hive

HDFS与Hive的交互也可以使用FlinkSQL,但是考虑到未来对数据的加工过滤,在此需求中选择将数据落盘HDFS再通过Shell命令调度至Hive。

3.1 Shell脚本的编写思路

  1. source华为的环境,认证状态成功;

  2. 创建日期变量:c_date=$(date '+%Y-%m-%d')

  3. 在beeline -u中执行HiveSQL代码:

    • 使用beeline的变量函数--hivevar将在外部注册的c_date变量注册为hive beeline的变量;

    • 创建临时外部表,映射字段一行数据,建表语句中指定位置为Flink写入的当日日期变量的HDFS数据文件夹;

    • 将临时表中的数据解析,一般是json数据,可通过get_json_object函数解析为字段,insert into table存入贴源层正式表;

    • 删除临时表;

  4. 有需要的话,也可以添加日志路径,将执行结果追加至日志。

3.2 脚本测试方法

该脚本的执行原理是首先在刷新华为租户环境,然后创建时间变量,并且是yyyy-mm-dd格式,与flink写入在HDFS中的每日增量文件夹名相同;

然后在beeline客户端中注册beeline的变量,将linux的时间变量传入beeline;

解下来是建临时表,将HDFS中的增量数据先写入,再解析字段到下一层标准表,同时删除临时表,通过此方法即完成每天新增数据的导入。

需要注意的是,hive -e命令似乎由于认证安全设置,无法在华为集群节点机使用。

4 DolphinScheduler

通过将脚本文件挂在DS调度中,每天在Flink完成消费落盘后,即可执行该shell。DS的部署不在华为MRS集群,在客户端节点中,使用开源版本即可,DS更方便查看每天的调度执行日志。

需要注意的是,目前我的需求中每天的新增数据大约2000-10000条,可以在短时间内完成调度执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/337453.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

yum下载源,vim使用

文章目录 yum本地配置lzrsz命令行互传scp(远程拷贝)vim yum本地配置 [rootiZf8z3j2ckkap6ypn717msZ ~]# pwd /root [rootiZf8z3j2ckkap6ypn717msZ ~]# ls /etc/yum.repos.d CentOS-Base.repo epel.repo //本地配置源yum会根据/etc/yum.repo.d路径下的配置文件来构成自己的下载…

一个简单的ETCD GUI工具

使用ETCD没有好用的GUI工具&#xff0c;随手用c#写了一个&#xff0c; 做得好玩的一个ETCD GUI工具&#xff0c;后面加上CLI 工具&#xff0c;类似于 redis Cli工具一样&#xff0c;简化在 Linux下面的操作&#xff0c;不知道有没有必要&#xff0c; git 地址如下&#xff0c;…

华为OD机试 - 查找一个有向网络的头节点和尾节点(Java JS Python C)

题目描述 给定一个有向图,图中可能包含有环,图使用二维矩阵表示,每一行的第一列表示起始节点,第二列表示终止节点,如 [0, 1] 表示从 0 到 1 的路径。 每个节点用正整数表示。 求这个数据的首节点与尾节点,题目给的用例会是一个首节点,但可能存在多个尾节点。同时图中…

天龙八部资源提取工具(提取+添加+修改+查看+教程)

可以提取&#xff0c;添加&#xff0c;修改&#xff0c;查看天龙八部里面的数据。非常好用。 天龙八部资源提取工具&#xff08;提取添加修改查看教程&#xff09; 下载地址&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1XOMJ1xvsbD-UUQOv3QfHPQ?pwd0kd0 提取码&…

leetcode下一个更大的元素---1暴力---2单调栈

1.题目&#xff1a; nums1 中数字 x 的 下一个更大元素 是指 x 在 nums2 中对应位置 右侧 的 第一个 比 x 大的元素。 给你两个 没有重复元素 的数组 nums1 和 nums2 &#xff0c;下标从 0 开始计数&#xff0c;其中nums1 是 nums2 的子集。 对于每个 0 < i < nums1.l…

苹果眼镜(Vision Pro)的开发者指南(1)

一、用到的底层核心框架&#xff1a; SwiftUI&#xff1a;无论开发者是要创建窗口、体积还是空间体验&#xff0c;SwiftUI 都是构建新的 visionOS 应用程序或将现有 iPadOS 或 iOS 应用程序引入平台的最佳方式。凭借全新的 3D 功能以及对深度、手势、效果和沉浸式场景类型的支…

C++类与对象【多态】

&#x1f308;个人主页&#xff1a;godspeed_lucip &#x1f525; 系列专栏&#xff1a;C从基础到进阶 &#x1f384;1 多态&#x1f355;1.1 多态的基本概念&#x1f355;1.2 多态案例一-计算器类&#x1f355;1.3 纯虚函数和抽象类&#x1f355;1.4 多态案例二-制作饮品&…

【华为GAUSS数据库】IDEA连接GAUSS数据库方法

背景&#xff1a;数据库为华为gauss for opengauss 集中式数据库 IDEA提供了丰富的各类型数据库驱动&#xff0c;但暂未提供Gauss数据库。可以通过以下方法进行连接。 连接后&#xff0c; 可以自动检查xml文件中的sql语句是否准确&#xff0c;表名和字段名是否正确还可以直接在…

数学建模常见算法的通俗理解(2)

目录 6 K-Means&#xff08;K-均值&#xff09;聚类算法&#xff08;无需分割数据即可分类&#xff09; 6.1 粗浅理解 6.2 算法过程 6.2.1 选定质心 6.2.2 分配点 6.2.3 评价 7 KNN算法&#xff08;K近邻算法&#xff09;&#xff08;K个最近的决定方案&#xff09; 7.…

第 122 场 LeetCode 双周赛题解

A 将数组分成最小总代价的子数组 I 枚举&#xff1a;枚举后两个子数组的起始下标 class Solution { public:int minimumCost(vector<int> &nums) {int n nums.size();int res INT32_MAX;for (int i 1; i < n; i)for (int j i 1; j < n; j)res min(res, n…

基于SpringBoot+Redis的前后端分离外卖项目-苍穹外卖微信小程序端(十三)

地址簿相关功能 1.1 需求分析和设计1.1.1 产品原型1.1.2 接口设计1.1.3 表设计 1.2 代码实现1.2.1 Mapper层1.2.2 Service层1.2.3 Controller层 1.1 需求分析和设计 1.1.1 产品原型 地址簿&#xff0c;指的是消费者用户的地址信息&#xff0c;用户登录成功后可以维护自己的地…

C#调用C动态链接库

前言 已经没写过博客好久了&#xff0c;上一篇还是1年半前写的LTE Gold序列学习笔记&#xff0c;因为工作是做通信协议的&#xff0c;然后因为大学时没好好学习专业课&#xff0c;现在理论还不扎实&#xff0c;不敢瞎写&#xff1b; 因为工作原因&#xff0c;经常需要分析一些字…

如何使用xlwings库为Excel表格的单元格创建超链接----关于Python里xlwings库对Excel表格的操作(三十九)

这篇小笔记主要记录【如何使用xlwings库为Excel表格的单元格创建超链接】。前面的小笔记已整理成目录&#xff0c;可点链接去目录寻找所需更方便。【目录部分内容如下】【点击此处可进入目录】 &#xff08;1&#xff09;如何安装导入xlwings库&#xff1b; &#xff08;2&…

雷盛红酒LEESON分享葡萄酒也有“社会责任感”?

葡萄酒文化从来都不仅仅是感官体验&#xff0c;一瓶佳酿的背后不但蕴含着风土人情、历史传承和文化交流&#xff0c;更反映了时代社会的变迁以及体现的社会责任意识。 目前葡萄酒生产商追求酒瓶越来越轻就是葡萄酒市场上的一个趋势&#xff0c;因为任何一个行业都在追求与世界共…

坦克大战游戏代码

坦克大战游戏 主函数战场面板开始界面坦克父类敌方坦克我方坦克子弹爆炸效果数据存盘及恢复图片 主函数 package cn.wenxiao.release9;import java.awt.event.ActionEvent; import java.awt.event.ActionListener;import javax.swing.JFrame; import javax.swing.JMenu; impor…

套接字通信(附带单线程TCP套接字通信代码)

套接字-Socket 1. 概念 1.1 局域网和广域网 局域网&#xff08;LAN&#xff09;和广域网&#xff08;WAN&#xff09;是两种不同范围的计算机网络&#xff0c;它们用于连接多台计算机以实现数据共享和通信。 局域网&#xff08;LAN&#xff09;&#xff1a; 定义&#xff1…

【JavaEE】文件操作与IO

作者主页&#xff1a;paper jie_博客 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 本文于《JavaEE》专栏&#xff0c;本专栏是针对于大学生&#xff0c;编程小白精心打造的。笔者用重金(时间和精力)打造&…

0121-1-计算机网络安全

计算机网络安全 1.Get 和 Post 的区别 结构&#xff1a;get 有请求体&#xff0c;post没有请求体 应用场景&#xff1a;get 用于获取数据&#xff0c;post用于提交数据&#xff1b; 缓存&#xff1a;get 的缓存保存在浏览器和web服务器日志中&#xff1b; 传输方式&#x…

typing python 类型标注学习笔记

在Python 3.5版本后引入的typing模块为Python的静态类型注解提供了支持。这个模块在增强代码可读性和维护性方面提供了帮助。 目录 简介为什么需要 Type hints typing常用类型typing初级语法typing基础语法默认参数及 Optional联合类型 (Union Type)类型别名 (Type Alias)子类型…

ESP32-HTTP_webServer库(Arduino)

ESP32-HTTP 介绍 ESP32是一款功能强大的微控制器&#xff0c;具有丰富的网络和通信功能。其中之一就是支持HTTP协议&#xff0c;这使得ESP32可以用于创建Web服务器。 HTTP是什么&#xff1f; HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;&#xff0c;即超文本传…