文章目录
- 一、前言
- 二、分析
- 2.1 mysql工具
- 2.2 模板
- 2.2 执行shell命令
- 三、代码实现
- 四、演示
- 五、待优化
- 结语
一、前言
最近在学习数仓相关内容,需要把mysql业务数据库gmall中的数据全量同步到hdfs中。使用的工具是datax,虽然datax可以在一个job内放置多个表,但是考虑每个表中数据可能处置方式,存放位置等不同,我们一个表安排一个任务job。最后执行这些job。
gmall库中有几十张表,如果手动创建job的json文件,很费事费力,容易出错;而且通过观察,除了表元数据和存放位置不同外,其他是相同的,那么我们考虑通过模板来生成job。语言选择python。通过subprocess来执行shell命令:datax任务。
下面为相关的环境信息:
软件 | 版本 | 描述 |
---|---|---|
hadoop | 3.3.4 | 大数组生态基础组件 |
datax | 同步异构数据源数据 | |
python | 3.9 | python语言 |
mysql | 8.x | 关系型数据库 |
二、分析
2.1 mysql工具
从mysql读取数据,我们不在重复造轮子了,参考下面链接2和3,工具类代码如下:
# -*- encoding: utf-8 -*-
"""
mysql工具类,主要功能如下:
1. 构造连接mysql
2. 获取设置基础信息:选择数据库、查询数据库版本
3. 查询:查询一条数据、查询多条数据
4. 新增、修改、删除
"""
import pymysql
class MySQLUtil:
def __init__(self, host="127.0.0.1", user=None, passwd=None, db=None, charset="utf8", *args, **kwargs):
"""
构造函数
:param host: 主机地址
:param user: 用户名
:param passwd: 密码
:param db: 数据库名
:param charset: 字符集
:param args: 参数
:param kwargs:
"""
self.__host = host
self.__user = user
self.__passwd = passwd
self.__db = db
self.__conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db, charset=charset, *args, **kwargs)
self.__cursor = self.__conn.cursor()
def __del__(self):
"""析构函数"""
self.__cursor.close()
self.__conn.close()
def get_conn(self):
"""获取连接"""
return self.__conn
def get_cursor(self, cursor=None):
"""获取游标"""
return self.__conn.cursor(cursor)
def select_db(self, db):
"""
选择数据库
:param db: 数据库名
:return:
"""
self.__conn.select_db(db)
def list_databases(self, args=None):
"""查询所有数据库"""
self.__cursor.execute("SHOW DATABASES", args)
dbs = []
for db in self.__cursor.fetchall():
dbs.append(db[0])
return dbs
def list_tables(self, args=None):
"""查询所有表"""
self.__cursor.execute("SHOW TABLES", args)
tables = []
for table in self.__cursor.fetchall():
tables.append(table[0])
return tables
def execute(self, sql, args=None):
"""获取SQL执行结果"""
self.__cursor.execute(sql, args)
return self.__cursor.fetchall()
def get_version(self, args=None):
"""获取MySQL版本"""
self.__cursor.execute("SELECT VERSION()", args)
version = self.__cursor.fetchone()
print("MySQL Version : %s" % version)
return version
def list_table_metadata(self, args=None):
"""查询所有表的元数据信息"""
sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"
self.__cursor.execute(sql, args)
return self.__cursor.fetchall()
def get_table_fields(self, db, table, args=None):
"""获取表字段信息"""
sql = 'SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE table_schema="' + db + '" AND table_name="' + table + '"'
self.__cursor.execute(sql, args)
fields = []
for field in self.__cursor.fetchall():
fields.append(field[0])
return fields
def table_metadata(self, db, table, args=None):
"""查询表字段的元数据信息"""
db = "'" + db + "'"
table = "'" + table + "'"
sql = """
SELECT
column_name,data_type,ordinal_position,column_comment,column_default
FROM
information_schema.COLUMNS
WHERE
table_schema = %s AND table_name = %s;
""" % (db, table)
self.__cursor.execute(sql, args)
return self.__cursor.fetchall()
def query_one(self, sql, args=None):
"""查询单条数据"""
result = None
try:
self.cursor.execute(sql, args)
result = self.cursor.fetchone()
except Exception as e:
print(e)
return result
def query_all(self, sql, args=None):
"""查询多条数据"""
list_result = ()
try:
self.cursor.execute(sql, args)
list_result = self.cursor.fetchall()
except Exception as e:
print(e)
return list_result
def insert(self, sql):
"""新增数据"""
return self.__edit(sql)
def update(self, sql):
"""更新数据"""
return self.__edit(sql)
def delete(self, sql):
"""删除数据"""
return self.__edit(sql)
def __edit(self, sql):
count = 0
try:
count = self.cursor.execute(sql)
except Exception as e:
print(e)
return count
if __name__ == "__main__":
mysqlUtil = MySQLUtil(host='node1', user="root", passwd="123456", db="gmall")
mysqlUtil.get_version()
dbs = mysqlUtil.list_databases()
print(dbs)
conn = mysqlUtil.get_conn()
mysqlUtil.select_db("gmall")
# print(type(conn.db), conn.db)
# databases = mysqlUtil.list_databases()
# print(type(databases), databases)
# tables = mysqlUtil.list_tables()
# print(type(tables), tables)
# sql = "SELECT * FROM activity_info"
# result = mysqlUtil.execute(sql)
# for i in result:
# print(i)
result = mysqlUtil.table_metadata("gmall", "activity_info")
for i in result:
print(i[0],'==',i[1],'===', type(i))
# result = mysqlUtil.get_table_fields("gmall", "activity_info")
# for i in result:
# print(i)
2.2 模板
datax任务为从mysql读取数据同步到hdfs,可以在datax官网查看相应的示例,这里直接给出我们的模板文件mysql2hdfs.tpl
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://node1:3306/gmall?useUnicode=true&allowKeyRetrieval=tru&characterEncoding=utf-8"
],
"table": ["$table_name"]
}
],
"password": "123456",
"splitPk": "",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [$COLUMN],
"compress": "gzip",
"defaultFS": "hdfs://node1:8020",
"fieldDelimiter": "\t",
"fileName": "$table_name",
"fileType": "text",
"path": "/origin_data/gmall/db/$DIRNAME",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
变量说明:
- $table_name:填充对应的表名
- $COLUMN:填充表对应的列名和数据类型(hdfs类型)
- $DIRNAME:填充对应表存储在hdfs中的父路径名称
tips: 模板中相关的配置参数值改成自己的,包括数据库连接相关参数,hdfs连接参数,存储路径等等。
实现使用string中的Template。
2.2 执行shell命令
python执行shell命令,参考最后链接5,这里我们使用subprocess
三、代码实现
完整结构和主要代码,如下所示:
主要逻辑代码mysql2hdfs.py:
"""
读取mysql元数据,通过模板生成datax任务job:同步全量mysql数据到hdfs,调用shell执行领料
1. 基础配置信息
1.1 mysql与hdfs数据类型对应map
2. 读取mysql元数据信息,通过模板文件生成job文件
3. 执行shell命令,运行datax任务
"""
import os
import pathlib
import subprocess
from string import Template
from time import sleep
import MySQLUtil as util
# mysql与hdfs数据类型对应map
type_mysql_hdfs = {
'tinyint': 'tinyint',
'smallint': 'smallint',
'int': 'int',
'bigint': 'bigint',
'float': 'float',
'double': 'double',
'varchar': 'string',
'bool': 'boolean',
'timestamp': 'timestamp',
'datetime': 'string',
'date': 'date',
'decimal': 'double',
'text': 'string'
}
def tmp2job(db_util, tmp_file):
"""
读取mysql数据,用任务模板生成datax 任务
job:全量同步mysql数据到hdfs
:param db_util: 数据库工具
:param tmp_file: 模板文件
:return:
"""
# 获取模版文件
tmp_file = pathlib.Path(__file__).parent.joinpath(tmp_file)
# 生成文件路径
target_dir = pathlib.Path(__file__).parent.joinpath('gmall')
# 获取全部表名
db_util.select_db("gmall")
tables = db_util.list_tables()
# tables= [tables[0]]
for table in tables:
if table == 'z_log':
continue
target_file = target_dir.joinpath(table + '.json')
with open(tmp_file, mode="r", encoding="utf-8") as r_f, open(
target_file, mode="w", encoding="utf8"
) as w_f:
template_content = r_f.read()
# print(f"template_content:{template_content}")
template = Template(template_content)
columns = db_util.table_metadata(db='gmall', table=table)
column_str = ''
# 拼接hdfswriter column
for column in columns:
# print(column)
type1 = type_mysql_hdfs.get(column[1])
# print(type1)
column_str += '{\"name\":\"' + column[0] + '\",\"type\":\"' + type1 + '\"},'
column_str = column_str[:-1]
# print(os.path.split(table)[0])
# 替换模板中的文件名,hdfswriter中的column,及hdfs文件存储路径
data = template.substitute(table_name=table, COLUMN=column_str, DIRNAME=os.path.splitext(table)[0])
w_f.write(data)
# 执行job脚本
def execute_shell(db_util):
"""
执行shell命令:运行datax任务
:param db_util: 数据库工具
:return:
"""
# cmd_ls = 'ls /export/server/datax/job/gmall'
# name = subprocess.check_output(cmd_ls, shell=True)
# names = str(name, encoding='utf-8').split('\n')[:-1]
db_util.select_db("gmall")
names = db_util.list_tables()
# tables= [tables[0]]
for name in names:
if name == 'z_log':
continue
# 确保hdfs父路径存在
hdfs_mkdir = 'hdfs dfs -mkdir -p /origin_data/gmall/db/' + os.path.splitext(name)[0]
# print('-'*5, hdfs_mkdir,'-'*5,name)
ret = subprocess.check_call(hdfs_mkdir, shell=True)
# print('---ret--',ret)
# 执行datax job任务
commond = "python /export/server/datax/bin/datax.py /export/server/datax/job/gmall/" + name + '.json'
# print(commond)
subprocess.call(commond, shell=True)
sleep(1)
if __name__ == '__main__':
db_util = util.MySQLUtil(host="node1", user="root", passwd="123456", db="gmall")
tmp2job(db_util, tmp_file='mysql2hdfs.tpl')
execute_shell(db_util)
完整在下面源代码仓库
四、演示
把相关代码放置在datax的job目录下,创建gmall存放模板生成的任务。
昨天执行过,这里不再执行,我们去web端查看hdfs同步文件,如下图所示:
五、待优化
- 我们的数据库名、存储位置写死了,可以改为传参。
- mysqlreader中column把*改为具体的表中列名。
- 空值校验,在获取表名、列名等地方进行空值校验,避免生成无意义的文件或者路径。
- 执行效率:如果硬件配置可以,可以考虑并行执行,每个任务可独立运行,提高效率。
结语
如果小伙伴什么问题或者指教,欢迎交流。
❓QQ:806797785
⭐️源代码仓库地址:https://gitee.com/gaogzhen/smart-utilities
参考链接:
[1]数仓视频-模拟数据生成[CP/OL].2023-12-12.p98.
[2]python3连接MySQL的工具类
[3]python-mysql数据库连接工具类封装
[4]datax 官方文档
[5]python执行shell脚本的几种方法