数据采集(全量采集和增量采集)

全量采集:采集全部数据

3、全量采集

vim students_all.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "splitPk": "id",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "update_time"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "students"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/bigdata31"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/data/students_all/dt=${dt}",
                        "fileName": "students",
                        "column": [
                            {
                                "name": "id",
                                "type": "STRING"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "STRING"
                            },
                            {
                                "name": "clazz",
                                "type": "STRING"
                            },
                             {
                                "name": "update_time",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

# 创建分区目录
hdfs dfs -mkdir -p  /data/students_all/dt=2024-10-21
# 执行datax脚本
datax.py -p"-Ddt=2024-10-21" students_all.json
# 增加分区
hive -e "alter table students_all add if not exists partition(dt='2024-10-21');"

增量采集:就只采集新插入或修改的数据

1、原表需要有一个更新时间字段

CREATE TABLE `students`  (
  `id` bigint(20) ,
  `name` varchar(255) ,
  `age` bigint(20),
  `gender` varchar(255) ,
  `clazz` varchar(255),
  `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ;

2、在hive中创建分区表

create external table if not exists students_all(
    id bigint comment '学生id'
    ,name string comment '学生姓名'
    ,age bigint comment '学生年龄'
    ,sex string comment '学生性别'
    ,clazz string comment '学生班级'
    ,update_time string comment '更新时间'
) comment '学生信息表'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile 
location 'hdfs://master:9000/data/students_all';

4、创建增量表

create external table if not exists students_acc(
    id bigint comment '学生id'
    ,name string comment '学生姓名'
    ,age bigint comment '学生年龄'
    ,sex string comment '学生性别'
    ,clazz string comment '学生班级'
    ,update_time string comment '更新时间'
) comment '学生信息表'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile 
location 'hdfs://master:9000/data/students_acc';

5、增量采集更新的数据

vim students_acc.json
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "splitPk": "id",
            		    "where": "substr(update_time,1,10)='${dt}'",
                        "column": [
                            "id",
                            "name",
                            "age",
                            "gender",
                            "clazz",
                            "update_time"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "students"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/bigdata31"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                   "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9000",
                        "fileType": "text",
                        "path": "/data/students_acc/dt=${dt}",
                        "fileName": "students",
                        "column": [
                            {
                                "name": "id",
                                "type": "STRING"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            },
                            {
                                "name": "age",
                                "type": "INT"
                            },
                            {
                                "name": "gender",
                                "type": "STRING"
                            },
                            {
                                "name": "clazz",
                                "type": "STRING"
                            },
                             {
                                "name": "update_time",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}
# 创建分区目录
hdfs dfs -mkdir -p  /data/students_acc/dt=2024-10-22
# 执行datax脚本
datax.py -p"-Ddt=2024-10-22" students_acc.json
# 增加分区
hive -e "alter table students_acc add if not exists partition(dt='2024-10-22');"

6、合并数据

vim student_merge.sql
insert overwrite table students_all partition(dt='${dt}')
select
    id,
    name,
    age,
    sex,
    clazz,
    update_time
from
    (
        select
            id,
            name,
            age,
            sex,
            clazz,
            update_time,
            row_number() over (
                partition by
                    id
                order by
                    update_time desc
            ) as r
        from
            (
                select
                    *
                from
                    students_all
                where
                    dt = '${diff_dt}'
                union all
                select
                    *
                from
                    students_acc
                where
                    dt = '${dt}'
            ) as a
    ) as b
where
    r = 1;
hive -f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21


spark-sql \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 2G \
--conf spark.sql.shuffle.partitions=1 \
-f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/904200.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2024年10月中国数据库排行榜:TiDB续探花,GaussDB升四强

10月中国数据库流行度排行榜如期发布,再次印证了市场分层的加速形成。国家数据库测评结果已然揭晓,本批次通过的产品数量有限,凸显了行业标准的严格与技术门槛的提升。再看排行榜,得分差距明显增大,第三名与后续竞争者…

【C++】RBTree——红黑树

文章目录 一、红黑树的概念1.1 红⿊树的规则:1.2 理解最长路径长度不超过最短路径长度的 2 倍1.3 红⿊树的效率 二、 红⿊树的实现2.1 红⿊树的结构2.2 红⿊树的插⼊2.2.1 红⿊树树插⼊⼀个值的⼤概过程 2.3 红⿊树的插⼊代码实现 一、红黑树的概念 红⿊树是⼀棵⼆…

git下载和配置

git是什么? Git是一种分布式版本控制系统,用于跟踪文件的变化,尤其是源代码。它允许多个开发者在同一项目上进行协作,同时保持代码的历史记录。Git的主要特点包括: 分布式:每个开发者都有项目的完整副本&a…

[MySQL#6] 表的CRUD (1) | Create | Retrieve(查) | where

目录 1. 插入 1.1 单行数据 - 全列插入 指定列插入 1.2 多行数据 - 全列插入 指定列插入 1.3 更新 1.4 替换 2. 查找 2.1 select 列 2.2 where 条件 具体案例 2.3 结果排序 总结关键字执行顺序 2.4 筛选分页结果 CRUD : Create(创建),Retrieve(读取)&…

C语言:代码运行的底层奥秘,编译和链接

目录 翻译环境和运行环境编译环境预编译(预处理)编译词法分析语法分析语义分析 汇编 链接运行环境 翻译环境和运行环境 在ANSI C的任何⼀种实现中,存在两个不同的环境。 第1种是翻译环境,在这个环境中源代码被转换为可执行的机器…

2024 FinTechathon 校园行:助力高校学生探索金融科技创新

在金融科技蓬勃发展的当下,人才培养成为推动行业前行的关键。为推进深圳市金融科技人才高地建设,向高校学子提供一个展示自身知识、能力和创意的平台,2024 FinTechathon 深圳国际金融科技大赛——西丽湖金融科技大学生挑战赛重磅开启&#xf…

第7章 内容共享

第 7 章 内容共享 bilibili学习地址 github代码地址 本章介绍Android不同应用之间共享内容的具体方式,主要包括:如何利用内容组件在应用之间共享数据,如何使用内容组件获取系统的通讯信息,如何借助文件提供器在应用之间共享文件…

控制台安全内部:创新如何塑造未来的硬件保护

在 Help Net Security 的采访中,安全研究人员 Specter 和 ChendoChap 讨论了游戏机独特的安全模型,并强调了它与其他消费设备的不同之处。 他们还分享了对游戏机安全性的进步将如何影响未来消费者和企业硬件设计的看法。 斯佩克特 (Specter) 是本周在阿…

开源项目-投票管理系统

哈喽,大家好,今天主要给大家带来一个开源项目-投票管理系统 投票管理系统主要有首页,发起投票,管理投票,参与投票,查看投票等功能 首页 为用户提供了一键导航到各个功能模块的便捷途径。 新增投票 用户…

Unity 两篇文章熟悉所有编辑器拓展关键类 (上)

本专栏基础资源来自唐老狮和siki学院,仅作学习交流使用,不作任何商业用途,吃水不忘打井人,谨遵教诲 编辑器扩展内容实在是太多太多了(本篇就有五千字) 所以分为两个篇章而且只用一些常用api举例&#xff0c…

rnn/lstm

tip:本人比较小白,看到july大佬的文章受益匪浅,现在其文章基础上加上自己的归纳、理解,以及gpt的答疑,如果有侵权会删。 july大佬文章来源:如何从RNN起步,一步一步通俗理解LSTM_rnn lstm-CSDN博…

【Docker大揭秘】

Docker 调试一天的血与泪的教训:设备条件:对应的build preparation相应的报错以及修改 作为记录 构建FASTLIO2启动docker获取镜像列出镜像运行containerdocker中实现宿主机与container中的文件互传 调试一天的血与泪的教训: 在DOCKER中跑通F…

APISQL企业版离线部署教程

针对政务、国企、医院、军工等内网物理隔离的客户,有时需要多次摆渡才能到达要安装软件的服务器。本教程将指导您使用Linux和Docker Compose编排服务,实现APISQL的离线部署。 准备 准备一台Linux(x86_64)服务器。 安装Docker Engine(推荐版本…

音视频入门基础:AAC专题(11)——AudioSpecificConfig简介

音视频入门基础:AAC专题系列文章: 音视频入门基础:AAC专题(1)——AAC官方文档下载 音视频入门基础:AAC专题(2)——使用FFmpeg命令生成AAC裸流文件 音视频入门基础:AAC…

docker 可用镜像服务地址(2024.10.25亲测可用)

1.错误 Error response from daemon: Get “https://registry-1.docker.io/v2/” 原因:镜像服务器地址不可用。 2.可用地址 编辑daemon.json: vi /etc/docker/daemon.json内容修改如下: {"registry-mirrors": ["https://…

【AI应用落地实战】智能文档处理本地部署——可视化文档解析前端TextIn ParseX实践

湘江之畔,秋风送爽。前不久,2024长沙中国1024程序员节在长沙盛大举行。今年的程序员节主题为“智能应用新生态”,以科技为纽带,搭建起了一个共筑智能应用新生态的交流平台,众多技术大咖齐聚一堂,探讨智能应…

echarts实现 水库高程模拟图表

需求背景解决思路解决效果index.vue 需求背景 需要做一个水库高程模拟的图表&#xff0c;x轴是水平距离&#xff0c;y轴是高程&#xff0c;需要模拟改水库的形状 echarts 图表集链接 解决思路 配合ui切图&#xff0c;模拟水库形状 解决效果 index.vue <!--/*** author:…

Kubeadm搭建k8s

一、架构 节点名称规格IP地址安装组件master012C/4G&#xff0c;cpu核心数要求大于2192.168.88.76docker、kubeadm、kubelet、kubectl、flannelnode012C/2G192.168.88.20docker、kubeadm、kubelet、kubectl、flannelnode022C/2G192.168.88.21docker、kubeadm、kubelet、kubect…

transformers和bert实现微博情感分类模型提升

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【LSTM模型实现光伏发电功率的预测】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模…

【Apache Zookeeper】

一、简介 1、场景 如何让⼀个应⽤中多个独⽴的程序协同⼯作是⼀件⾮常困难的事情。开发这样的应⽤&#xff0c;很容易让很多开发⼈员陷⼊如何使多个程序协同⼯作的逻辑中&#xff0c;最后导致没有时间更好地思考和实现他们⾃⼰的应⽤程序逻辑&#xff1b;又或者开发⼈员对协同…