线上问诊:业务数据采集

系列文章目录

线上问诊:业务数据采集
线上问诊:数仓数据同步


文章目录

  • 系列文章目录
  • 前言
  • 一、环境安装
    • 1.DataX
  • 二、全量同步
    • 1.DataX配置文件生成
    • 2.启动hadoop测试一下。
    • 3.全量同步
  • 三、增量同步
    • 1.配置Flume
    • 2.编写Flume拦截器
    • 3.通道测试
    • 4.修改Maxwell参数
    • 5.编写Flume启停脚本
    • 6.创建增量同步脚本
  • 总结


前言

上次博客记录的是数据的采集,这次我们记录一下数据从MYSQL到HDFS的数据同步。


一、环境安装

1.DataX

上传并解压
尚硅谷提供的资料里边解压会报一个错误,所以自己寻找了一下。
DataX下载
在这里插入图片描述
用datax提供的自检命令测试一下。

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

在这里插入图片描述

二、全量同步

在这里插入图片描述

1.DataX配置文件生成

mkdir /opt/module/gen_datax_config
cd /opt/module/gen_datax_config

在这里插入图片描述
修改配置文件
在这里插入图片描述

mysql.username=root
mysql.password=000000
mysql.host=hadoop102
mysql.port=3306
mysql.database.import=medical
# mysql.database.export=
mysql.tables.import=dict,doctor,hospital,medicine,patient,user
# mysql.tables.export=
is.seperated.tables=0
hdfs.uri=hdfs://hadoop102:8020
import_out_dir=/opt/module/datax/job/medical/import
# export_out_dir=

执行文件

java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

2.启动hadoop测试一下。

hadoop fs -mkdir -p /origin_data/medical/user_full/2023-05-09

在这里插入图片描述
数据同步测试

python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/medical/user_full/2023-05-09" /opt/module/datax/job/medical/import/medical.user.json

在这里插入图片描述
测试成功后编写全量同步脚本。
vim ~/bin/medical_mysql_to_hdfs_full.sh

#!/bin/bash

DATAX_HOME=/opt/module/datax
DATAX_DATA=/opt/module/datax/job/medical

#清理脏数据
handle_targetdir() {
  hadoop fs -rm -r $1 >/dev/null 2>&1
  hadoop fs -mkdir -p $1
}

#数据同步
import_data() {
  local datax_config=$1
  local target_dir=$2

  handle_targetdir "$target_dir"
  echo "正在处理$1"
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config >/tmp/datax_run.log 2>&1
  if [ $? -ne 0 ]
  then
    echo "处理失败, 日志如下:"
    cat /tmp/datax_run.log 
  fi
}

#接收表名变量
tab=$1
# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=$(date -d "-1 day" +%F)
fi


case ${tab} in
dict|doctor|hospital|medicine|patient|user)
  import_data $DATAX_DATA/import/medical.${tab}.json /origin_data/medical/${tab}_full/$do_date
  ;;
"all")
  for tmp in dict doctor hospital medicine patient user
  do
    import_data $DATAX_DATA/import/medical.${tmp}.json /origin_data/medical/${tmp}_full/$do_date
  done
  ;;
esac

添加权限。

chmod +x ~/bin/medical_mysql_to_hdfs_full.sh

3.全量同步

medical_mysql_to_hdfs_full.sh all 2023-05-09

在这里插入图片描述

三、增量同步

在这里插入图片描述

1.配置Flume

在hadoop104进行配置
在这里插入图片描述
vim medical_kafka_to_hdfs.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = medical-flume
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.medical.flume.interceptors.TimestampAndTableNameInterceptor$Builder


a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/medical
a1.channels.c1.dataDirs = /opt/module/flume/data/medical
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/medical/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

2.编写Flume拦截器

创建项目
medical-flume-interceptor
在这里插入图片描述
编写pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

创建包
com.atguigu.medical.flume.interceptors
创建类
在这里插入图片描述

package com.atguigu.medical.flume.interceptors;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");

        String tableName = jsonObject.getString("table");

        headers.put("timestamp", ts * 1000 + "");
        headers.put("tableName", tableName);
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

idea打包
在这里插入图片描述
将文件上传
在这里插入图片描述
放入到hadoop104的/opt/module/flume/lib
在这里插入图片描述

3.通道测试

启动Zookeeper、Kafka和Maxwell
在Hadoop104启动Flume

bin/flume-ng agent -n a1 -c conf/ -f job/medical_kafka_to_hdfs.conf -Dflume.root.logger=INFO,console

然后再次进行数据模拟。

medical_mock.sh 1

在这里插入图片描述
增量同步完成

4.修改Maxwell参数

vim /opt/module/maxwell/config.properties
在这里插入图片描述

5.编写Flume启停脚本

vim medical-f1.sh

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/medical_kafka_to_hdfs.conf > /opt/module/flume/medical-f1.log 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep medical_kafka_to_hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

chmod +x medical-f1.sh

6.创建增量同步脚本

vim medical_mysql_to_kafka_inc_init.sh

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次

MAXWELL_HOME=/opt/module/maxwell

import_data() {
  for tab in $@
  do 
    $MAXWELL_HOME/bin/maxwell-bootstrap --database medical --table $tab --config $MAXWELL_HOME/config.properties
  done
}

case $1 in
consultation | payment | prescription | prescription_detail | user | patient | doctor)
  import_data $1
  ;;
"all")
  import_data consultation payment prescription prescription_detail user patient doctor
  ;;
esac

chmod +x medical_mysql_to_kafka_inc_init.sh
删除之前的增量数据,再次测试。

hadoop fs -ls /origin_data/medical  | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

medical_mysql_to_kafka_inc_init.sh all

再次测试后出现增量数据,代表通过建立完成。


总结

数据的同步到这里就结束了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/97453.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Pytorch学习:神经网络模块torch.nn.Module和torch.nn.Sequential

文章目录 1. torch.nn.Module1.1 add_module&#xff08;name&#xff0c;module&#xff09;1.2 apply(fn)1.3 cpu()1.4 cuda(deviceNone)1.5 train()1.6 eval()1.7 state_dict() 2. torch.nn.Sequential2.1 append 3. torch.nn.functional.conv2d 1. torch.nn.Module 官方文档…

环保数字化,让污染无处遁形

环保一直以来都是我国大力推崇的举措&#xff0c;“保护环境、人人有责”的标语深入人心&#xff0c;但是环保绝不是某一天某一年就能做好的事情&#xff0c;而在于一朝一夕坚持不懈&#xff0c;下文将针对环保的场景介绍一下数字孪生技术在环保领域的应用。 一、环保背景 新中…

几个nlp的小项目(文本分类)

几个nlp的小项目(文本分类) 导入加载数据类、评测类查看数据集精确展示数据测评方法设置参数tokenizer,token化的解释对数据集进行预处理加载预训练模型进行训练设置训练模型的参数一个根据任务名获取,测评方法的函数创建预训练模型开始训练本项目的工作完成了什么任务?导…

CNN 02(CNN原理)

一、卷积神经网络(CNN)原理 1.1 卷积神经网络的组成 定义 卷积神经网络由一个或多个卷积层、池化层以及全连接层等组成。与其他深度学习结构相比&#xff0c;卷积神经网络在图像等方面能够给出更好的结果。这一模型也可以使用反向传播算法进行训练。相比较其他浅层或深度神经…

景联文科技数据标注:人体关键点标注用途及各点的位置定义

人体关键点标注是一种计算机视觉任务&#xff0c;指通过人工的方式&#xff0c;在指定位置标注上关键点&#xff0c;例如人脸特征点、人体骨骼连接点等&#xff0c;常用来训练面部识别模型以及统计模型。这些关键点可以表示图像的各个方面&#xff0c;例如角、边或特定特征。在…

unity 之参数类型之引用类型

文章目录 引用类型引用类型与值类型的差异 引用类型 在Unity中&#xff0c;引用类型是指那些在内存中存储对象引用的数据类型。以下是在Unity中常见的引用类型的介绍&#xff1a; 节点&#xff08;GameObject&#xff09;&#xff1a; 在Unity中&#xff0c;游戏对象&#xff…

day28 异常

to{}catch{} try{}catch{}的流传输 try {fis new FileInputStream("file-APP\\fos.txt");fos new FileOutputStream("fos.txt");int a ;while ((a fis.read())! -1){fos.write(a);}System.out.println(a); } catch (IOException e) {e.printStackTrace()…

在编辑器中使用正则

正则是一种文本处理工具&#xff0c;常见的功能有文本验证、文本提取、文本替换、文本切割等。有一些地方说的正则匹配&#xff0c;其实是包括了校验和提取两个功能。 校验常用于验证整个文本的组成是不是符合规则&#xff0c;比如密码规则校验。提取则是从大段的文本中抽取出…

0基础学习VR全景平台篇 第92篇:智慧景区-智慧景区常见问题

Q&#xff1a;怎么编辑景区里面各个景点的介绍和推荐该景点A&#xff1a;在下方素材栏中该景点&#xff08;素材&#xff09;的右上角选择【编辑场景】里面就可以在场景介绍中编辑该场景的介绍并且在该选项中可以将此场景设置为推荐景点。 Q&#xff1a;景区项目可不可以离线浏…

数字孪生智慧工厂:电缆厂 3D 可视化管控系统

近年来&#xff0c;我国各类器材制造业已经开始向数字化生产转型&#xff0c;使得生产流程变得更加精准高效。通过应用智能设备、物联网和大数据分析等技术&#xff0c;企业可以更好地监控生产线上的运行和质量情况&#xff0c;及时发现和解决问题&#xff0c;从而提高生产效率…

【大虾送书第七期】深入浅出SSD:固态存储核心技术、原理与实战

目录 ✨写在前面 ✨内容简介 ✨作者简介 ✨名人推荐 ✨文末福利 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &#x1f990;专栏地址&#xff1a;免费送书活动专栏地址 写在前面 近年来国家大力支持半导体行业&#xff0c;鼓励自主创新&#xff0c;中国SSD技术和产业…

ChromeOS 的 Linux 操作系统和 Chrome 浏览器分离

导读科技媒体 Ars Technica 报道称&#xff0c;谷歌正在将 ChromeOS 的浏览器从操作系统中分离出来 —— 让它变得更像 Linux。虽然目前还没有任何官方消息&#xff0c;但这项变化可能会在本月的版本更新中推出。 据介绍&#xff0c;谷歌将该项目命名为 "Lacros"——…

数据驱动的生活:探索未来七天生活指数API的应用

前言 随着科技的不断发展&#xff0c;数据已经成为我们生活中不可或缺的一部分。从社交媒体上的点赞和分享&#xff0c;到电子邮件和搜索引擎的历史记录&#xff0c;数据正在以前所未有的速度积累。而这些数据的利用不仅仅停留在社交媒体或商业领域&#xff0c;它们还可以为我…

机器学习:争取被遗忘的权利

随着越来越多的人意识到他们通过他们经常访问的无数应用程序和网站共享了多少个人信息&#xff0c;数据保护和隐私一直在不断讨论。看到您与朋友谈论的产品或您在 Google 上搜索的音乐会迅速作为广告出现在您的社交媒体提要中&#xff0c;这不再那么令人惊讶。这让很多人感到担…

【炼气境】Java集合框架篇

【炼气境】Java集合框架篇 文章目录 【炼气境】Java集合框架篇概述接口Collection接口List接口ArrayList类LinkedList类 Set接口HashSet类LinkedHashSet类TreeSet类 Queue接口LinkedList类PriorityQueue类ArrayDeque Map接口HashMap类LinkedHashMap类TreeMap类 常用方法特性适用…

软件工程(八) UML之类图与对象图

1、类图与对象图 1.1、类图与对象图的概念 类图(class diagram)描述一组类、接口、协作和它们之间的关系 对象图(object diagram)描述一组对象及它们之间的关系、对象图描述了在类图中所建立的事物实例的静态快照。 1.2、类图与对象图的区别 类图和对象图基本上是一样…

大数据平台与数据仓库的五大区别

随着大数据的快速发展&#xff0c;很多人难以区分大数据平台与数据仓库的区别&#xff0c;两者傻傻分不清楚。今天我们小编就给大家汇总了大数据平台与数据仓库的五大区别&#xff0c;希望有用哦&#xff01;仅供参考&#xff01; 大数据平台与数据仓库的五大区别 一、概念不同…

如何把本地项目上传github

一、在gitHub上创建新项目 【1】点击添加&#xff08;&#xff09;-->New repository 【2】填写新项目的配置项 Repository name&#xff1a;项目名称 Description &#xff1a;项目的描述 Choose a license&#xff1a;license 【3】点击确定&#xff0c;项目已在githu…

火狐浏览器使用scss嵌套编写css无法识别问题

火狐浏览器使用scss嵌套编写css无法识别问题 版本&#xff1a; “node-sass”: “^4.14.1”, “sass-loader”: “^7.3.1”,vue版本&#xff1a; v2问题描述&#xff1a; 我的文件目录是这样的&#xff1a; 而在scss文件中我是这样书写的 .vue文件中 在火狐浏览器中 在谷…

几个nlp的小任务(生成任务(摘要生成))

几个nlp的小任务生成任务——摘要生成 安装库选择模型加载数据集展示数据集数据预处理 tokenizer注意特殊的 token处理组成预处理函数调用map,对数据集进行预处理微调模型,设置参数设置数据收集器,将处理好的数据喂给模型封装测评方法将参数传给 trainer,开始训练安装库 选…