数仓项目6.0(一)

尚硅谷大数据项目【电商数仓6.0】企业数据仓库项目_bilibili

数据流转过程

用户➡️业务服务器➡️数据库存储➡️数仓统计分析➡️数据可视化

· 数据仓库处理流程:数据源➡️加工数据➡️统计筛选数据➡️分析数据

数据库不是为了数据仓库服务的,需要给数仓单独构建一个数据源(行式列式存储不对应、数据库海量数据不满足、对mysql性能造成影响)

数据源周期性(一天、一周)从mysql数据库同步过来,这就叫采集

HDFS承前启后

数据存储file➡️  Flume采集    ➡️HDFS➡️Hive数仓数据源

数据  mysql➡️DataX/Maxwell➡️HDFS➡️Hive数仓数据源

数仓开发需要用sql,需要用结构化数据

一些概念

数据仓库的输入数据通常包括:业务数据用户行为数据爬虫数据

业务数据:就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据业务数据通常存储在MySQL、Oracle等数据库中。

用户行为数据:用户在使用产品过程中,通过埋点收集与客户端产品交互过程中产生的数据,并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。

项目需求与架构设计

需求

       (1)用户行为数据采集平台搭建

       (2)业务数据采集平台搭建

离线与实时采集需求

技术选型

  • Master节点:管理节点,保证集群的调度正常进行;主要部署NameNode、ResourceManager、HMaster 等进程;非 HA 模式下数量为1,HA 模式下数量为2。
  • Core节点:为计算及存储节点,您在 HDFS 中的数据全部存储于 core 节点中,因此为了保证数据安全,扩容 core 节点后不允许缩容;主要部署 DataNode、NodeManager、RegionServer 等进程。非 HA 模式下数量≥2,HA 模式下数量≥3。
  • Common 节点:为 HA 集群 Master 节点提供数据共享同步以及高可用容错服务;主要部署分布式协调器组件,如 ZooKeeper、JournalNode 等节点。非HA模式数量为0,HA 模式下数量≥3。

服务名称

子服务

服务器

hadoop102

服务器

hadoop103

服务器

hadoop104

HDFS

NameNode

DataNode

SecondaryNameNode

Yarn

NodeManager

Resourcemanager

Zookeeper

Zookeeper Server

Flume(采集日志)

Flume

Kafka

Kafka

Flume

(消费Kafka日志)

Flume

Flume

(消费Kafka业务)

Flume

Hive

MySQL

MySQL

DataX

Spark

DolphinScheduler

ApiApplicationServer

AlertServer

MasterServer

WorkerServer

LoggerServer

Superset

Superset

Flink

ClickHouse

Redis

Hbase

服务数总计

20

11

12

架构

--- 回头看整个采集大流程 ---

fl脚本将log采集到kafka,max将db增量采集到kafka,f2将log同步到dhfs,datax将db全量采集到hdfs,f3将db从kafka采集到hdfs

日志数据采集2Kafka

Logs(模拟生成)➡️Flume➡️Kafka⬇️➡️HDFS

 全套配置: 

数仓项目6.0配置大全(hadoop/Flume/zk/kafka/mysql配置)-CSDN博客

业务数据sql采集2Kafka

安装maxwell增量采集工具

Maxwell 是由美国Zendesk公司开源,用Java编写的MySQL变更数据抓取软件。它会实时监控MySQL数据库的数据变更操作(包括insert、update、delete),并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台

Maxwell的工作原理是实时读取MySQL数据库的二进制日志(Binlog),从中获取变更数据,再将变更数据以JSON格式发送至Kafka等流处理平台。

二进制日志(Binlog)是MySQL服务端非常重要的一种日志,它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复。

Maxwell的工作原理和主从复制密切相关。

MySQL的主从复制,就是用来建立一个和主数据库完全一样的数据库环境,这个数据库称为从数据库。做数据库的热备、读写分离,在读多写少场景下,可以提高数据库工作效率。

maxwell就是将自己伪装成slave,并遵循MySQL主从复制的协议,从master同步数据。

https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

将安装包解压至/opt/module

MySQL服务器的Binlog默认是未开启的,如需进行同步,需要先进行开启

vim /etc/my.cnf

#数据库id

server-id = 1

#启动binlog,该参数的值会作为binlog的文件名

log-bin=mysql-bin

#binlog类型,maxwell要求为row类型

binlog_format=row

#启用binlog的数据库,需根据实际情况作出修改

binlog-do-db=gmall

重启MySQL服务systemctl restart mysqld

Maxwell需要在MySQL中存储其运行过程中的所需的一些数据,包括binlog同步的断点位置(Maxwell支持断点续传)等等,故需要在MySQL为Maxwell创建数据库及用户。

CREATE DATABASE maxwell;

CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

修改Maxwell配置文件名称

cd /opt/module/maxwell

cp config.properties.example config.properties

vim config.properties

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis

producer=kafka

# 目标Kafka集群地址

kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}

kafka_topic=topic_db

# MySQL相关配置

host=hadoop102

user=maxwell

password=maxwell

jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true

# 过滤gmall中的z_log表数据,该表是日志数据的备份,无须采集

filter=exclude:gmall.z_log

# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜

producer_partition_by=primary_key

若Maxwell发送数据的目的地为Kafka集群,则需要先确保zk、Kafka集群为启动状态

启动脚本

#!/bin/bash
MAXWELL_HOME=/opt/module/maxwell
status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}

start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}

stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}

case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    restart )
       stop_maxwell
       start_maxwell
    ;;
esac

启动后,进行数据库的修改,手动改一个数、运行lg使用jar包向数据库中添加内容,都会引起maxwell写入kafka

历史数据全量同步

可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。

Maxwell提供了bootstrap功能来进行历史数据的全量同步,命令如下:

/opt/module/maxwell/bin/maxwell-bootstrap 
--database gmall 
--table activity_info
--config /opt/module/maxwell/config.properties

采用bootstrap方式同步的输出数据格式如下,注意 "type": "bootstrap-start","type": "bootstrap-complete",

{
  "database": "gmall",
  "table": "activity_info",
  "type": "bootstrap-start",
  "ts": 1705484093,
  "data": {}
}
{
  "database": "gmall",
  "table": "activity_info",
  "type": "bootstrap-insert",
  "ts": 1705484093,
  "data": {
    "id": 4,
    "activity_name": "TCL全场9折",
    "activity_type": "3103",
    "activity_desc": "TCL全场9折",
    "start_time": "2022-01-13 01:01:54",
    "end_time": "2023-06-19 00:00:00",
    "create_time": "2022-05-27 00:00:00",
    "operate_time": null
  }
}
······
{
  "database": "gmall",
  "table": "activity_info",
  "type": "bootstrap-complete",
  "ts": 1705484093,
  "data": {}
}

日志数据同步2HDFS

实时数仓由Flink源源不断从Kafka当中读数据计算,所以不需要手动同步数据到实时数仓。

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。

按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。

此处选择KafkaSource、FileChannel、HDFSSink。

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

零点漂移问题

这里就是Flume配置job文件中,在源处加自定义拦截器 的 原因

拦截器jar包

生成jar包,放到flume的lib下,jar包的java文件存放路径要和job中那个拦截器路径一致,然后沟通Kafka-flume-hdfs

package com.atguigu.gmall.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1、获取header和body的数据
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        try {
            //2、将body的数据类型转成jsonObject类型(方便获取数据)
            JSONObject jsonObject = JSONObject.parseObject(log);
            //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)

            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
            

            return event;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }

    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();
            }
        }
        return list;

    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }
        public void configure(Context context) {
        }
    }


}
<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

同步!

先把日志/opt/module/applog/log清空,kafka清空

启动zk、kafka、hadoop、f1(日志到kafka)、f2(kafka到hdfs),然后生成模拟日志数据就行了

全量还是增量

通常情况,业务表数据量比较大,变动频繁,优先考虑增量,数据量比较小,不怎么变动,优先考虑全量

数据同步工具种类繁多,大致可分为两类,一类是以DataX、Sqoop为代表的基于Select查询的离线、批量同步工具,另一类是以Maxwell、Canal为代表的基于数据库数据变更日志(例如MySQL的binlog,其会实时记录所有的insert、update以及delete操作)的实时流式同步工具。

全量同步采用DataX,增量同步采用Maxwell。

安装DataX

https://github.com/alibaba/DataX?tab=readme-ov-file

DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQLOracle)HDFSHiveODPSHBaseFTP等各种异构数据源之间稳定高效的数据同步功能。

DataX的使用,用户只需根据数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。

可以使用如下命名查看DataX配置文件模板

python bin/datax.py -r mysqlreader -w hdfswriter

TableMode

同步gmall数据库中base_province表数据到HDFS的/base_province目录

要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。

vim /opt/module/datax/job/base_province.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "region_id",
                            "area_code",
                            "iso_code",
                            "iso_3166_2",
                            "create_time",
                            "operate_time"
                        ],
                        "where": "id>=3",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                                ],
                                "table": [
                                    "base_province"
                                ]
                            }
                        ],
                        "password": "000000",
                        "splitPk": "",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            },
                            {
                                "name": "operate_time",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(''),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。

创建hdfs中的目录

hadoop fs -mkdir /base_province

运行

python bin/datax.py job/base_province.json

查看gz

hadoop fs -cat /base_province/* | zca

QuerySQLMode

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                                ],
                                "querySql": [
                                    "select id,name,region_id,area_code,iso_code,iso_3166_2,create_time,operate_time from base_province where id>=3"
                                ]
                            }
                        ],
                        "password": "000000",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "bigint"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "region_id",
                                "type": "string"
                            },
                            {
                                "name": "area_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_code",
                                "type": "string"
                            },
                            {
                                "name": "iso_3166_2",
                                "type": "string"
                            },
                            {
                                "name": "create_time",
                                "type": "string"
                            },
                            {
                                "name": "operate_time",
                                "type": "string"
                            }
                        ],
                        "compress": "gzip",
                        "defaultFS": "hdfs://hadoop102:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "base_province",
                        "fileType": "text",
                        "path": "/base_province",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

传参

DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。

DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。

"path": "/base_province/${dt}",

创建文件夹

hadoop fs -mkdir /base_province/2022-06-08

运行

python bin/datax.py -p"-Ddt=2022-06-08" job/base_province.json

sql2hdfs全量同步

需要为每张全量表编写一个DataX的json配置文件

写了一个脚本,流程不难但繁琐,建议回去看尚硅谷的资料

大致流程梳理:

目的是把数据库全量同步到hdfs,那么准备好datax配置文件json。

从资料里拉了个配置文件json生成器,一下就生成了所有要导的表的json。

然后写了一个脚本,执行mysql_to_hdfs_full.sh all 2022-06-08

慢慢等。。。。。。。。。。17张表导入

业务数据sql2hdfs增量同步 

通过maxwell和flume

Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。

需要注意的是, HDFSSink需要将不同MySQL业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:

vim job/kafka_to_hdfs_db.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

编写Flume拦截器

在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类

package com.atguigu.gmall.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);

        Long ts = jsonObject.getLong("ts");
        //Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
        String timeMills = String.valueOf(ts * 1000);

        String tableName = jsonObject.getString("table");
        headers.put("timestamp", timeMills);
        headers.put("tableName", tableName);
        return event;

    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {


        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor ();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

重新打包,放到flume/lib中

为方便使用,此处编写一个Flume的启停脚本。

vim f3

#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;

"stop")
        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

DataX同步不常变数据,maxwell增量全量同步常变业务数据!!!!

增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。

vim mysql_to_kafka_inc_init.sh

#!/bin/bash

# 该脚本的作用是初始化所有的增量表,只需执行一次
MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties

}
case $1 in
"cart_info")
  import_data cart_info
  ;;

"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

现将HDFS上之前同步的增量表数据删除。

hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

mysql_to_kafka_inc_init.sh all

观察HDFS上是否重新出现增量表数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/428746.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【开源】SpringBoot框架开发数据可视化的智慧河南大屏

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 数据模块 A4.2 数据模块 B4.3 数据模块 C4.4 数据模块 D4.5 数据模块 E 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的数据可视化的智慧河南大屏&#xff0c;包含了GDP、…

MATLAB环境下基于离散小波变换的心电信号伪影去除及PQRST波检测

可穿戴个人健康监护系统被广泛认为是下一代健康监护技术的核心解决方案。监护设备不断地感知、获取、分析和存储大量人体在日常活动中的生理数据&#xff0c;为人体的健康状况提供必要的、准确的、集成的和长期的评估和反馈。在心电监测领域&#xff0c;可穿戴传感器具有以下应…

C及C++每日练习(1)

一.选择&#xff1a; 1.以下for循环的执行次数是&#xff08;&#xff09; for(int x 0, y 0; (y 123) && (x < 4); x); A.是无限循环 B.循环次数不定 C.4次 D.3次 对于循环&#xff0c;其组成部分可以四个部分&#xff1a; for(初始化;循环进行条件;调整) …

JavaScript实现的计时器效果

之前做过电商网站倒计时的效果&#xff0c;今天在倒计时的基础上&#xff0c;把代码修改了一下&#xff0c;改为计时器效果&#xff0c;实现了以下功能&#xff1a; 1.点击“开始”后&#xff0c;按秒计时且“开始”文字变为“停止”&#xff1b; 2.点击“停止”&#xff0c;计…

【Python实战】——Python+Opencv是实现车牌自动识别

&#x1f349;CSDN小墨&晓末:https://blog.csdn.net/jd1813346972 个人介绍: 研一&#xff5c;统计学&#xff5c;干货分享          擅长Python、Matlab、R等主流编程软件          累计十余项国家级比赛奖项&#xff0c;参与研究经费10w、40w级横向 文…

自动化测试过程中的手机验证码处理!

手机验证码登录很普遍了&#xff0c;那么在自动化测试的时候需要登录&#xff0c;登录不了就意味着很多自动化就没法执行下去了。 到底该怎么处理呢&#xff1f;其实并不难&#xff0c;我们先看下验证码的业务逻辑&#xff0c;在我们“点击获取验证码”按钮的时候&#xff0c;…

LeetCode刷题-206.反转链表【递归实现】

206.反转链表 题目 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 示例1 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例2 输入&#xff1a;head [1,2] 输出&#xff1a;[2,1]示例3 输入&#xff1a;hea…

【C语言】动态内存管理------常见错误,以及经典笔试题分析,柔性数组【图文详解】

欢迎来CILMY23的博客喔&#xff0c;本篇为【C语言】动态内存管理------常见错误&#xff0c;以及经典笔试题分析&#xff0c;柔性数组【图文详解】&#xff0c;感谢观看&#xff0c;支持的可以给个一键三连&#xff0c;点赞关注收藏。 前言 在了解完内存操作中最关键的一节---动…

微信客户维护的三个关键点,助你提高转化率!

对于微信客户维护&#xff0c;有三个关键点尤为重要&#xff0c;它们能够有效提高客户转化率&#xff0c;让客户服务更加高效和个性化。接下来&#xff0c;让我们一起来了解这三个关键点。 1、 给客户打标签 在日常的客户维护中&#xff0c;给客户打标签是非常重要的。通过给…

BioTech - 药物晶型预测与剂型设计 概述

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/136441046 药物晶型预测与剂型设计是指利用计算机模拟和优化药物分子在固态形式下的结构、性质和稳定性&#xff0c;以及与制剂工艺和质…

【Python】外网远程登录访问jupyter notebook+pycharm使用ipython

第一步&#xff1a;创建python虚拟环境 conda create -n py3610 python3.6.10第二步&#xff1a;安装ipython pip install ipython pip install ipython notebook第三步&#xff1a;创建 IPython Notebook 服务器配置文件 # 进入python交互shell&#xff0c;设置密码 >&…

基于springboot+vue的校园失物招领系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

HLS的硬件加速器设计

完整可点击跳转 目录 硬件加速器的设计方法高层次综合HLSHLS与电路地对应关系HLS的设计规范HLS优化延迟优化降低单个循环的延迟循环展开(Unroll)循环展平(Flatten)多个循环的并行化循环合并循环函数化数据流执行(Dataflow)吞吐量优化循环/函数流水线数据流优化调试硬件加…

UCSF DOCK 分子对接详细案例(04)-基于RDKit描述符的分子从头设计DOCK_D3N

欢迎浏览我的CSND博客&#xff01; Blockbuater_drug …点击进入 文章目录 前言一、 软件及操作环境二、研究目的三、结构文件准备四、 DOCK/RDKit中 de novo design4.1 de novo design - refine_D3N4.2 对输出重新评分 总结参考资料 前言 本文是UCSF DOCK的使用案例分享&…

【JavaScript 漫游】【029】GlobalEventHandlers 接口总结

文章简介 本篇文章为【JavaScript 漫游】专栏第 029 篇文章&#xff0c;对 JavaScript 中的 GlobalEventHandlers 接口的知识点进行了总结。 GlobalEventHandlers 接口 除了 addEventListener()&#xff0c;还有一种方法可以直接指定事件的回调函数。 div.onclick clickHa…

LSA头部结构简述

LSA&#xff08;Link State Advertisement&#xff09;是一种用于路由协议头部结构&#xff0c;用于在网络中传递路由信息。 LSA头部结构包含以下几个字段&#xff1a; 1、LSA类型&#xff08;LSA Type&#xff09;&#xff1a;指示LSA的类型&#xff0c;不同类型的LSA用于传递…

软件项目概要设计说明书

1引言 1.1编写目的 1.2项目背景 1.3参考资料 2系统总体设计 2.1整体架构 2.2整体功能架构 2.3整体技术架构 2.4运行环境设计 2.5设计目标 3系统功能模块设计 3.1个人办公 3.2系统管理 4性能设计 4.1响应时间 4.2并发用户数 5接口设计 5.1接口设计原则 5.2接口实现方式 6运行设计…

【深度学习目标检测】二十一、基于深度学习的葡萄检测系统-含数据集、GUI和源码(python,yolov8)

葡萄检测在农业中具有多方面的意义&#xff0c;具体来说如下&#xff1a; 首先&#xff0c;葡萄检测有助于保障农产品质量安全。通过对葡萄进行质量安全专项监测&#xff0c;可以确保葡萄中的农药残留、重金属等有害物质含量符合标准&#xff0c;从而保障消费者的健康。同时&am…

Java 面试题

Java 基础 以下代码执行结果&#xff1f; 示例1&#xff1a; public static void main(String[] args) {int a 0;Integer b 0;String c "0";String d new String("0");change(a, b, c, d);System.out.println(a "|" b "|" …

springboot244基于SpringBoot和VUE技术的智慧生活商城系统设计与实现

智慧生活商城系统的设计与实现 摘 要 计算机网络发展到现在已经好几十年了&#xff0c;在理论上面已经有了很丰富的基础&#xff0c;并且在现实生活中也到处都在使用&#xff0c;可以说&#xff0c;经过几十年的发展&#xff0c;互联网技术已经把地域信息的隔阂给消除了&…