0101模板生成任务与shell命令执行任务-datax-python工具

文章目录

    • 一、前言
    • 二、分析
      • 2.1 mysql工具
      • 2.2 模板
      • 2.2 执行shell命令
    • 三、代码实现
    • 四、演示
    • 五、待优化
    • 结语

一、前言

最近在学习数仓相关内容,需要把mysql业务数据库gmall中的数据全量同步到hdfs中。使用的工具是datax,虽然datax可以在一个job内放置多个表,但是考虑每个表中数据可能处置方式,存放位置等不同,我们一个表安排一个任务job。最后执行这些job。

gmall库中有几十张表,如果手动创建job的json文件,很费事费力,容易出错;而且通过观察,除了表元数据和存放位置不同外,其他是相同的,那么我们考虑通过模板来生成job。语言选择python。通过subprocess来执行shell命令:datax任务。

下面为相关的环境信息:

软件版本描述
hadoop3.3.4大数组生态基础组件
datax同步异构数据源数据
python3.9python语言
mysql8.x关系型数据库

二、分析

2.1 mysql工具

从mysql读取数据,我们不在重复造轮子了,参考下面链接2和3,工具类代码如下:

# -*- encoding: utf-8 -*-
"""
mysql工具类,主要功能如下:
1. 构造连接mysql
2. 获取设置基础信息:选择数据库、查询数据库版本
3. 查询:查询一条数据、查询多条数据
4. 新增、修改、删除
"""

import pymysql


class MySQLUtil:

    def __init__(self, host="127.0.0.1", user=None, passwd=None, db=None, charset="utf8", *args, **kwargs):
        """
        构造函数
        :param host: 主机地址 
        :param user: 用户名
        :param passwd: 密码
        :param db: 数据库名
        :param charset: 字符集
        :param args: 参数
        :param kwargs: 
        """
        self.__host = host
        self.__user = user
        self.__passwd = passwd
        self.__db = db
        self.__conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db, charset=charset, *args, **kwargs)
        self.__cursor = self.__conn.cursor()

    def __del__(self):
        """析构函数"""
        self.__cursor.close()
        self.__conn.close()

    def get_conn(self):
        """获取连接"""
        return self.__conn

    def get_cursor(self, cursor=None):
        """获取游标"""
        return self.__conn.cursor(cursor)

    def select_db(self, db):
        """
        选择数据库
        :param db: 数据库名 
        :return: 
        """
        self.__conn.select_db(db)

    def list_databases(self, args=None):
        """查询所有数据库"""
        self.__cursor.execute("SHOW DATABASES", args)
        dbs = []
        for db in self.__cursor.fetchall():
            dbs.append(db[0])
        return dbs

    def list_tables(self, args=None):
        """查询所有表"""
        self.__cursor.execute("SHOW TABLES", args)
        tables = []
        for table in self.__cursor.fetchall():
            tables.append(table[0])
        return tables

    def execute(self, sql, args=None):
        """获取SQL执行结果"""
        self.__cursor.execute(sql, args)
        return self.__cursor.fetchall()

    def get_version(self, args=None):
        """获取MySQL版本"""
        self.__cursor.execute("SELECT VERSION()", args)
        version = self.__cursor.fetchone()
        print("MySQL Version : %s" % version)
        return version

    def list_table_metadata(self, args=None):
        """查询所有表的元数据信息"""
        sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"
        self.__cursor.execute(sql, args)
        return self.__cursor.fetchall()

    def get_table_fields(self, db, table, args=None):
        """获取表字段信息"""
        sql = 'SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE table_schema="' + db + '" AND table_name="' + table + '"'
        self.__cursor.execute(sql, args)
        fields = []
        for field in self.__cursor.fetchall():
            fields.append(field[0])
        return fields

    def table_metadata(self, db, table, args=None):
        """查询表字段的元数据信息"""
        db = "'" + db + "'"
        table = "'" + table + "'"
        sql = """
        SELECT 
            column_name,data_type,ordinal_position,column_comment,column_default 
        FROM 
            information_schema.COLUMNS 
        WHERE 
            table_schema = %s AND table_name = %s;
        """ % (db, table)
        self.__cursor.execute(sql, args)
        return self.__cursor.fetchall()

    def query_one(self, sql, args=None):
        """查询单条数据"""
        result = None
        try:
            self.cursor.execute(sql, args)
            result = self.cursor.fetchone()

        except Exception as e:
            print(e)
        return result

    def query_all(self, sql, args=None):
        """查询多条数据"""
        list_result = ()
        try:
            self.cursor.execute(sql, args)
            list_result = self.cursor.fetchall()

        except Exception as e:
            print(e)
        return list_result

    def insert(self, sql):
        """新增数据"""
        return self.__edit(sql)

    def update(self, sql):
        """更新数据"""
        return self.__edit(sql)

    def delete(self, sql):
        """删除数据"""
        return self.__edit(sql)

    def __edit(self, sql):
        count = 0
        try:
            count = self.cursor.execute(sql)
        except Exception as e:
            print(e)
        return count


if __name__ == "__main__":
    mysqlUtil = MySQLUtil(host='node1', user="root", passwd="123456", db="gmall")
    mysqlUtil.get_version()
    dbs = mysqlUtil.list_databases()
    print(dbs)
    conn = mysqlUtil.get_conn()
    mysqlUtil.select_db("gmall")
    # print(type(conn.db), conn.db)
    # databases = mysqlUtil.list_databases()
    # print(type(databases), databases)
    # tables = mysqlUtil.list_tables()
    # print(type(tables), tables)
    # sql = "SELECT * FROM activity_info"
    # result = mysqlUtil.execute(sql)
    # for i in result:
    #     print(i)
    result = mysqlUtil.table_metadata("gmall", "activity_info")
    for i in result:
        print(i[0],'==',i[1],'===', type(i))
    # result = mysqlUtil.get_table_fields("gmall", "activity_info")
    # for i in result:
    #     print(i)

2.2 模板

datax任务为从mysql读取数据同步到hdfs,可以在datax官网查看相应的示例,这里直接给出我们的模板文件mysql2hdfs.tpl

{
	"job": {
		"content": [
		{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"column": ["*"],
					"connection": [
					{
						"jdbcUrl": [
							"jdbc:mysql://node1:3306/gmall?useUnicode=true&allowKeyRetrieval=tru&characterEncoding=utf-8"
						],
						"table": ["$table_name"]
					}
					],
					"password": "123456",
					"splitPk": "",
					"username": "root"
				}
			},
			"writer": {
				"name": "hdfswriter",
				"parameter": {
					"column": [$COLUMN],
					"compress": "gzip",
					"defaultFS": "hdfs://node1:8020",
					"fieldDelimiter": "\t",
					"fileName": "$table_name",
					"fileType": "text",
					"path": "/origin_data/gmall/db/$DIRNAME",
					"writeMode": "append"
				}
			}
		}
		],
		"setting": {
			"speed": {
				"channel": 1
			}
		}
	}
}

变量说明:

  • $table_name:填充对应的表名
  • $COLUMN:填充表对应的列名和数据类型(hdfs类型)
  • $DIRNAME:填充对应表存储在hdfs中的父路径名称

tips: 模板中相关的配置参数值改成自己的,包括数据库连接相关参数,hdfs连接参数,存储路径等等。

实现使用string中的Template。

2.2 执行shell命令

python执行shell命令,参考最后链接5,这里我们使用subprocess

三、代码实现

完整结构和主要代码,如下所示:

在这里插入图片描述

主要逻辑代码mysql2hdfs.py:

"""
读取mysql元数据,通过模板生成datax任务job:同步全量mysql数据到hdfs,调用shell执行领料
1. 基础配置信息
    1.1 mysql与hdfs数据类型对应map
2. 读取mysql元数据信息,通过模板文件生成job文件
3. 执行shell命令,运行datax任务
"""

import os
import pathlib
import subprocess
from string import Template
from time import sleep

import MySQLUtil as util

# mysql与hdfs数据类型对应map
type_mysql_hdfs = {
    'tinyint': 'tinyint',
    'smallint': 'smallint',
    'int': 'int',
    'bigint': 'bigint',
    'float': 'float',
    'double': 'double',
    'varchar': 'string',
    'bool': 'boolean',
    'timestamp': 'timestamp',
    'datetime': 'string',
    'date': 'date',
    'decimal': 'double',
    'text': 'string'
}


def tmp2job(db_util, tmp_file):
    """
    读取mysql数据,用任务模板生成datax 任务
        job:全量同步mysql数据到hdfs
    :param db_util: 数据库工具
    :param tmp_file: 模板文件
    :return:
    """
    # 获取模版文件
    tmp_file = pathlib.Path(__file__).parent.joinpath(tmp_file)
    # 生成文件路径
    target_dir = pathlib.Path(__file__).parent.joinpath('gmall')
    # 获取全部表名
    db_util.select_db("gmall")
    tables = db_util.list_tables()
    # tables= [tables[0]]
    for table in tables:
        if table == 'z_log':
            continue
        target_file = target_dir.joinpath(table + '.json')
        with open(tmp_file, mode="r", encoding="utf-8") as r_f, open(
                target_file, mode="w", encoding="utf8"
        ) as w_f:
            template_content = r_f.read()
            # print(f"template_content:{template_content}")
            template = Template(template_content)
            columns = db_util.table_metadata(db='gmall', table=table)
            column_str = ''
            # 拼接hdfswriter column
            for column in columns:
                # print(column)
                type1 = type_mysql_hdfs.get(column[1])
                # print(type1)
                column_str += '{\"name\":\"' + column[0] + '\",\"type\":\"' + type1 + '\"},'
            column_str = column_str[:-1]
            # print(os.path.split(table)[0])
            # 替换模板中的文件名,hdfswriter中的column,及hdfs文件存储路径
            data = template.substitute(table_name=table, COLUMN=column_str, DIRNAME=os.path.splitext(table)[0])
            w_f.write(data)


# 执行job脚本
def execute_shell(db_util):
    """
    执行shell命令:运行datax任务
    :param db_util: 数据库工具
    :return:
    """
    # cmd_ls = 'ls /export/server/datax/job/gmall'
    # name = subprocess.check_output(cmd_ls, shell=True)
    # names = str(name, encoding='utf-8').split('\n')[:-1]
    db_util.select_db("gmall")
    names = db_util.list_tables()
    # tables= [tables[0]]
    for name in names:
        if name == 'z_log':
            continue
        # 确保hdfs父路径存在
        hdfs_mkdir = 'hdfs dfs -mkdir -p /origin_data/gmall/db/' + os.path.splitext(name)[0]
        # print('-'*5, hdfs_mkdir,'-'*5,name)
        ret = subprocess.check_call(hdfs_mkdir, shell=True)
        # print('---ret--',ret)
        # 执行datax job任务
        commond = "python /export/server/datax/bin/datax.py  /export/server/datax/job/gmall/" + name + '.json'
        # print(commond)
        subprocess.call(commond, shell=True)
        sleep(1)


if __name__ == '__main__':
    db_util = util.MySQLUtil(host="node1", user="root", passwd="123456", db="gmall")
    tmp2job(db_util, tmp_file='mysql2hdfs.tpl')
    execute_shell(db_util)

完整在下面源代码仓库

四、演示

在这里插入图片描述

把相关代码放置在datax的job目录下,创建gmall存放模板生成的任务。

昨天执行过,这里不再执行,我们去web端查看hdfs同步文件,如下图所示:

在这里插入图片描述

五、待优化

  • 我们的数据库名、存储位置写死了,可以改为传参。
  • mysqlreader中column把*改为具体的表中列名。
  • 空值校验,在获取表名、列名等地方进行空值校验,避免生成无意义的文件或者路径。
  • 执行效率:如果硬件配置可以,可以考虑并行执行,每个任务可独立运行,提高效率。

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

⭐️源代码仓库地址:https://gitee.com/gaogzhen/smart-utilities

参考链接:

[1]数仓视频-模拟数据生成[CP/OL].2023-12-12.p98.

[2]python3连接MySQL的工具类

[3]python-mysql数据库连接工具类封装

[4]datax 官方文档

[5]python执行shell脚本的几种方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/501931.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【实现报告】学生信息管理系统(链表实现)

目录 实验一 线性表的基本操作 一、实验目的 二、实验内容 三、实验提示 四、实验要求 五、实验代码如下: (一)链表的构建及初始化 学生信息结构体定义 定义元素类型 链表节点结构体定义 初始化链表 (二)…

C++编译过程

C编译过程分为四个步骤:分别是预处理(Prepressing) 、编译(Compilation) 、汇编(Assembly) 和链接(Linking),如下图所示: 假如一个文件名为hello.cpp 预编译后的文件 1、预编译 将源代码文件hello.cpp和源文件中使用到的头文件&#xff0c…

WEB自动化测试,一定得掌握的8个核心知识点

写在前面 使用 cypress 进行端对端测试,和其他的一些框架有一个显著不同的地方,它使用 JavaScript 作为编程语言。 传统主流的 selenium 框架是支持多语言的,大多数 QA 会的 python 和 Java 语言都可以编写 selenium 代码,遇到需…

Adaboost集成学习 | Matlab实现基于RF-Adaboost随机森林结合Adaboost集成学习时间序列预测

目录 效果一览基本介绍模型设计程序设计参考资料效果一览 基本介绍 Matlab实现基于RF-Adaboost随机森林结合Adaboost集成学习时间序列预测。基于RF-Adaboost(随机森林结合Adaboost集成学习)的时间序列预测方法结合了随机森林在处理高维数据和复杂关系方面的优势,以及Adaboos…

vue源码解析—— watch/computed的实现逻辑和区别

watch 和 computed 是 Vue 中的两个重要的响应式属性,它们在实现机制和使用上存在一些区别。 watch:用于监听数据的变化,并在数据变化时执行回调函数。可以使用 deep 配置项来开启深度监听,监听数据的子属性变化。可以使用 immedi…

QT 最近使用的项目配置文件

目录 1 QT 最近使用的项目配置文件所在路径 2 QtCreator.ini 1 QT 最近使用的项目配置文件所在路径 C:\Users\your username\AppData\Roaming\QtProject QtCreator.ini最好先备份一份 2 QtCreator.ini ProjectExplorer 下面的 RecentProjects\FileNames RecentProjects\…

希尔排序

文章目录 前言一.直接插入排序(时间复杂度N^2)二.希尔排序时间复杂度 前言 今天我们来讲一下排序算法中的插入排序中的希尔排序,插入排序分为两种,一种是直接插入排序,另一种就是希尔排序 一.直接插入排序(时间复杂度N^2) 我们这个排序,我们…

Day26 手撕各种集合底层源码(一)

Day26 手撕各种集合底层源码(一) 一、手撕ArrayList底层源码 1、概念: ArrayList的底层实现是基于数组的动态扩容结构。 2、思路: 1.研究继承关系 2.研究属性 3.理解创建集合的过程 – 构造方法的底层原理 4.研究添加元素的过程…

微机原理-基于8086倒计时多路抢答器系统

**单片机设计介绍,微机原理-基于8086倒计时多路抢答器系统 文章目录 一 概要二、功能设计三、 软件设计原理图 五、 程序六、 文章目录 一 概要 微机原理-基于8086倒计时多路抢答器系统概要主要关注于利用8086微处理器设计和实现一个具有倒计时功能的多路抢答器系统…

总结UDP协议各类知识点

前言 本篇博客博主将详细地介绍UDP有关知识点,坐好板凳发车啦~ 一.UDP特点 1.无连接 UDP传输的过程类似于发短信,知道对端的IP和端口号就直接进行传输,不需要建立连接; 2.不可靠传输 没有任何的安全机制,发送端发…

MySQL Innodb 引擎中预防 Update 操作上升为表锁

一、MySQL 如何预防 Update 上升为表锁 在 MySQL 中,进行任何数据的 修改 操作都会进行一定的锁操作,而锁的不同直接导致性能的差异。例如 MyISAM 引擎,更新时采用表锁,并发性较差。而 Innodb 引擎支持事务,更新时采用…

c++调用阿里云短信服务

💂 个人主页:pp不会算法^ v ^ 🤟 版权: 本文由【pp不会算法v】原创、在CSDN首发、需要转载请联系博主 💬 如果文章对你有帮助、欢迎关注、点赞、收藏(一键三连)和订阅专栏哦 购买套餐包 申请资质 申请模板 申请签名 上面这些审核通过之后 添…

低代码平台与自动化软件开发的关系

引言 随着信息技术的不断发展,软件开发领域也在不断演进。在追求更高效、更快速的软件开发过程中,低代码平台和自动化软件开发技术日益受到关注。低代码平台以其可视化开发界面和快速构建应用的能力,为非专业开发人员提供了参与软件开发的机会…

预处理详解(一) -- 预定义符号与#define定义

目录 一. 预定义符号二. #define1.#define定义常量2.#define定义宏3.带有副作用的宏参数4.宏替换的规则5.宏和函数的对比 一. 预定义符号 %s _ _FILE_ _ //文件 %s _ _ DATE_ _ //日期 %s _ _ TIME_ _ //时间 %d _ _ LINE_ _ //行号 %d _ _ STDC_ _ //如果编译器支持 ANSI C,那…

【Vue】动态样式

内联样式的动态样式 body(){ boxASelect:false, } v-bind:style"{borderColor:boxASelect ? red : #ccc}" <body><header><h1>Vue Dynamic Styling</h1></header><section id"styling"><div class"demo&quo…

kubernetes(K8S)学习(七):K8S之系统核心组件

K8S之系统核心组件 K8s系统核心组件1.1 Master和Node1.2 kubeadm1.3 先把核心组件总体过一遍1.4 Kubernetes源码查看方式1.5 kubectl1.6 API Server1.7 集群安全机制之API Server1.8 Scheduler1.9 kubelet1.10 kube-proxy K8s系统核心组件 1.1 Master和Node 官网 &#xff1a;…

蓝桥杯刷题-重新排序

重新排序 差分&#xff1a; s,d [0]*100010,[0]*100010 tmp 0 n int(input()) a list(map(int,input().split())) a.insert(0,0) for i in range(1,n1):s[i] s[i-1] a[i] m int(input()) for _ in range(m):l,r map(int,input().split())# [l,r]的和tmp s[r] - s[l-1…

【AI】命令行调用大模型

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 【AI】命令行调用大模型引入正文初始化项目撰写脚本全局安装 成果展示 【AI】命令…

基于Spring Boot的在线学习系统的设计与实现

基于Spring Boot的在线学习系统的设计与实现 摘 要 在线学习系统是以大学传统线下教学方式不适应信息技术的迅速发展为背景&#xff0c;提高学习效率&#xff0c;解决传统教学问题&#xff0c;并且高效的实现教学信息化的一款软件系统。为了更好的实现对于教学和学生的管理&a…

【C++进阶】多态,带你领悟虚函数和虚函数表

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;c大冒险 总有光环在陨落&#xff0c;总有新星在闪烁 【本节目标】 1. 多态的概…