数据抽取平台pydatax介绍--实现和项目使用

  数据抽取平台pydatax实现过程中,有2个关键点:

    1、是否能在python3中调用执行datax任务,自己测试了一下可以,代码如下:
    这个str1就是配置的shell文件     

try:
   result = os.popen(str1).read()
except Exception as e:
  print(e)

  2、是否能获取datax执行后的信息:用来捕获执行的情况和错误信息

         上面执行后的result就包含了datax的执行信息,对信息进行筛选,就可以获得

   pydatax的表设计 

        在上面的2个关键点解决后,其他问题就比较简单,设计相关的表:

datax_config   datax抽取表的模板配置(源表名,目标表名,模板id,抽取的字段,抽取条件(增量,全量,特殊),抽取时间,执行顺序等)

datax_config_repair   datax的出错修复表,结构和datax_config一样,用于datax出错后,修复数据用

datax_etl_error    datax的etl的报错信息(非异常字符的报错)

datax_json   datax的模板id配置(全量和增量2个模板文件名)

datax_log   datax运行抽取表的执行信息(是否执行完成,抽取行数,速度,读出行数,流量等)

datax_row_error  datax执行中,字段有异常字符的报错信息

 pydatax在项目中使用

       项目1: 直接配置datax的模板json,从oracle 11g抽取到postgresql中,

                     因postgresql中会对"0x"这些异常字符报错,如oracle中字段有这样字段,必须在抽取字段使用:

                    使用 replace(name,chr(0),'\'\'') as name 来代替 以前的字段 name

       项目2: 客户有9个分公司,用的ERP有9套,有9个库,不同版本,抽取的同一个表字段长度有不一样,字段可能有多有少,客户ERP核心分公司ERP几个月后有大版本升级。

                     因项目2中:数据仓库使用的GreePlum,datax的驱动用的是gpdbwriter-v1.0.4-hashdata.jar,该驱动自动删除"0x"非法字符,就不存在该错误

                     不可能写9个抽取json模板,再抽取,只能原有json模板上修改

                     字段长度不同: 取9个库的最大值,作为目标表字段的字段长度

                     字段个数不同:   取其一个核心分公司库表为基础建表,其他8个库表,如果有就保留,没有就字段数据为NULL,每次执行查询取出8个库的字段:                         

# 获取分公司库该表的字段,如对比核心库表字段的缺失,使用null as 字段替换,如果多余则废弃,
# 字段对比以核心库为标准
def get_org_src_columns(src_columns,org_name,tab_name):
    src_columns = src_columns
    # 分公司字段
    org_cols = get_org_cols(org_name,tab_name)
    lst = src_columns.split(",")
    cols1 = (org_cols + ',')
    src_columns1 = (src_columns + ',')
    for i in lst:
      str1 =i.strip() + ','  # 去掉空格,对比使用,字段名+',',这样避免有重复前缀的字段名,导致误判
      if (cols1.find(str(','+str1)) == -1):
        src_columns1 = src_columns1.replace(str(','+str1), ',NULL as ' + str1)
    return src_columns1.rstrip(',')

# 获取分公司库的表的字段用','合并成一个字符串
def get_org_cols(org_name,tab_name):
    conn = ora_conn()
    cur = conn.cursor()
    cols=""
    sql="select WM_CONCAT(COLUMN_NAME) cols from (SELECT  COLUMN_NAME FROM  all_tab_columns WHERE OWNER=upper('"+org_name+"') " \
        "and  table_name =upper('"+tab_name+"') order by COLUMN_ID asc) t ";
    cur.execute(sql)
    datas = cur.fetchall()
    for row in datas:
      cols= str(row[0])
    return cols;

       修改json模板支持同时抽取9个数据库,修改的9个库同时抽取oracle数据到greeplum全量json模板,见下载文件的:oracle_gp_table_df_job.json:  

    src_table_columns=row.get("src_table_column")
    # 其他8家分公司库
    src_table_columns_fz=get_org_src_columns(src_table_columns,"FZ",src_table_name)
    src_table_columns_jcg=get_org_src_columns(src_table_columns,"JCG",src_table_name)
    src_table_columns_ks=get_org_src_columns(src_table_columns,"KS",src_table_name)
    src_table_columns_qzdf=get_org_src_columns(src_table_columns,"QZDF",src_table_name)
    src_table_columns_sdsht=get_org_src_columns(src_table_columns,"SDSHT",src_table_name)
    src_table_columns_wfjx=get_org_src_columns(src_table_columns,"WFJX",src_table_name)
    src_table_columns_wst=get_org_src_columns(src_table_columns,"WST",src_table_name)
    src_table_columns_std=get_org_src_columns(src_table_columns,"STD",src_table_name)


    str1 = "/usr/bin/python /opt/module/datax/bin/datax.py /opt/module/datax/job/json/"+etl_mode+" -p  \" " \
           " -Dsrc_table_name='"+src_table_name+"' " \
           " -Ddes_table_name='"+des_table_name+"' " \
           " -Dsplit_pk_field='"+split_pk_field+"'   " \
           " -Drelation='"+relation+"' " \
           " -Dcondition='"+dcondition+"' " \
           " -Dsrc_table_columns='"+src_table_columns+"' " \
           " -Dsrc_table_columns_fz='" + src_table_columns_fz + "' " \
           " -Dsrc_table_columns_jcg='" + src_table_columns_jcg + "' " \
           " -Dsrc_table_columns_ks='" + src_table_columns_ks + "' " \
           " -Dsrc_table_columns_qzdf='" + src_table_columns_qzdf + "' " \
           " -Dsrc_table_columns_sdsht='" + src_table_columns_sdsht + "' " \
           " -Dsrc_table_columns_wfjx='" + src_table_columns_wfjx + "' " \
           " -Dsrc_table_columns_wst='" + src_table_columns_wst + "' " \
           " -Dsrc_table_columns_std='" + src_table_columns_std + "' " \
           " -Ddes_table_columns='"+des_table_columns+"' \" "

      这样修改后,就可以同时抽取9个库的数据,同时配置时,只需要配置核心库的相关字段等数据即可!  

   执行的datax_log表数据:

    说明: 1,该平台没有可视化页面的后台管理系统,如果加上后台管理系统,就更完美,但目前是足够使用的!

      DATAX的GreePlum驱动plugin下载:  

                  https://files.cnblogs.com/files/zping/gpdbwriter.rar?t=1708999240&download=true

     pydatax源码下载地址:

                 https://files.cnblogs.com/files/zping/pydatax.rar?t=1708755764&download=true

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/414289.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

git忽略某些文件(夹)更改方法

概述 在项目中,常有需要忽略的文件、文件夹提交到代码仓库中,在此做个笔录。 一、在项目根目录内新建文本文件,并重命名为.gitignore,该文件语法如下 # 以#开始的行,被视为注释. # 忽略掉所有文件名是 a.txt的文件. a.txt # 忽略所有生成的 java文件, *.java # a.j…

数据结构:栈和队列与栈实现队列(C语言版)

目录 前言 1.栈 1.1 栈的概念及结构 1.2 栈的底层数据结构选择 1.2 数据结构设计代码(栈的实现) 1.3 接口函数实现代码 (1)初始化栈 (2)销毁栈 (3)压栈 (4&…

【MQ05】异常消息处理

异常消息处理 上节课我们已经学习到了消息的持久化和确认相关的内容。但是,光有这些还不行,如果我们的消费者出现问题了,无法确认,或者直接报错产生异常了,这些消息要怎么处理呢?直接丢弃?这就是…

ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+, currently the ‘ssl‘报错解决

安装labelme出错了 根据爆栈的提示信息,我在cmd运行以下命令之后一切正常了,解决了问题! pip install urllib31.26.6参考网址:ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1, currently the ‘ssl’ module is compile…

常见的socket函数封装和多进程和多线程实现服务器并发

常见的socket函数封装和多进程和多线程实现服务器并发 1.常见的socket函数封装2.多进程和多线程实现服务器的并发2.1多进程服务器2.2多线程服务器2.3运行效果 1.常见的socket函数封装 accept函数或者read函数是阻塞函数,会被信号打断,我们不能让它停止&a…

docker容器技术(3)

Docker技术编排 概述: docker建议我们每一个容器中只运行一个服务,因为docker容器本身占用资源极少,所以最好是将每个服务单独的分割开来但是这样我们又面临了一个问题? 如果我需要同时部署好多个服务,难道要每个服务单独写Dockerfile然后在构建镜像,…

【XR806开发板试用】socket客户端与虚拟机服务器通信交互测试以及终端交互

XR806 客户端准备工作。 1、连接wifi 2、创建socket连接服务器。 3、创建终端接收数据线程。 wifi_connect.c #include <stdio.h> #include <string.h> #include "wifi_device.h" #include "wifi_hotspot.h" #include "kernel/os/os.h…

桥接模式(Bridge Pattern) C++

上一节&#xff1a;适配器模式&#xff08;Adapter Pattern&#xff09; C 文章目录 0.理论1.组件2.使用场景 1.实践 0.理论 桥接模式&#xff08;Bridge Pattern&#xff09;是一种结构型设计模式&#xff0c;它的核心思想是将抽象部分与其实现部分分离&#xff0c;使它们可…

区块链智能合约开发

一.区块链的回顾 1.区块链 区块链实质上是一个去中心化、分布式的可进行交易的数据库或账本 特征: 去中心化&#xff1a;简单来说&#xff0c;在网络上一个或多个服务器瘫痪的情况下&#xff0c;应用或服务仍然能够持续地运行&#xff0c;这就是去中心化。服务和应用部署在…

死区过滤器Deadband和DeadZone区别(应用介绍)

死区过滤器的算法和详细介绍专栏也有介绍,这里我们主要对这两个模块的区别和应用场景进行详细介绍 1、死区过滤器 https://rxxw-control.blog.csdn.net/article/details/128521007https://rxxw-control.blog.csdn.net/article/details/128521007 1、Deadband和DeadZone区别…

kafka学习笔记三

目录 第二篇 外部系统集成 第三篇 生产调优手册 第1章 kafka硬件配置选择 第2章 生产者调优 2.1 生产者核心参数配置 2.2 生产者如何提高吞吐量 2.3 数据可靠性 2.4 数据去重 2.5 数据有序 2.6 数据乱序 第3章 Kafka Broker调优 3.1 Broker核心参数配置 3.2 其他 …

k8s service的概念以及创建方法

Service 的功能&#xff1a; Service主要用于提供网络服务&#xff0c;通过Service的定义&#xff0c;能够为客户端应用提供稳定的访问地址&#xff08;域名或IP地址&#xff09;和负载均衡功能&#xff0c;以及屏蔽后端Endpoint的变化&#xff0c;是K8s实现微服务的核心资源。…

【README 小技巧】 展示gitee中开源项目start

【README 小技巧】 展示gitee中开源项目start <a target"_blank" hrefhttps://gitee.com/wujiawei1207537021/wu-framework-parent><img srchttps://gitee.com/wujiawei1207537021/wu-framework-parent/badge/star.svg altGitee star/></a>

使用ffmpeg压缩视频

一、到ffmpeg官网下载文件包&#xff1a; Download FFmpeg 下载后找到 bin 下的3个exe文件&#xff0c;复制到自己本机的某个目录下, 如&#xff1a; 二、使用命令行压缩&#xff1a; ffmpeg -i input.mp4 -c:v libx265 -crf 28 -y output.mp4 这条命令使用 FFmpeg 工具对输…

QA 的未来:使用生成式 AI 进行 API 测试

QA 团队面临着比以往任何时候都更大的满足软件质量和发布速度期望的压力。继续阅读&#xff0c;了解 GenAI 如何改善开发人员和测试人员的工作体验&#xff0c;同时最大限度地提高团队生产力并提高软件质量。 软件质量差的后果正在日益严重&#xff0c;许多组织因功能缺陷和安…

LACP——链路聚合控制协议

LACP——链路聚合控制协议 什么是LACP&#xff1f; LACP&#xff08;Link Aggregation Control Protocol&#xff0c;链路聚合控制协议&#xff09;是一种基于IEEE802.3ad标准的实现链路动态聚合与解聚合的协议&#xff0c;它是链路聚合中常用的一种协议。 链路聚合组中启用了…

2024 值得推荐的免费开源 WAF

WAF 是 Web Application Firewall 的缩写&#xff0c;也被称为 Web 应用防火墙。区别于传统防火墙&#xff0c;WAF 工作在应用层&#xff0c;对基于 HTTP/HTTPS 协议的 Web 系统有着更好的防护效果&#xff0c;使其免于受到黑客的攻击。 开源 WAF 和商用 WAF&#xff08;奇安信…

Linux学习笔记11——用户组添加删除

Linux 是多用户多任务操作系统&#xff0c;换句话说&#xff0c;Linux 系统支持多个用户在同一时间内登陆&#xff0c;不同用户可以执行不同的任务&#xff0c;并且互不影响。 例如&#xff0c;某台 Linux 服务器上有 4 个用户&#xff0c;分别是 root、www、ftp 和 mysql&…

基于JAVA+Springboot+Thymeleaf前后端分离项目:社区疫情防控系统设计与实现

博主介绍&#xff1a;黄菊华老师《Vue.js入门与商城开发实战》《微信小程序商城开发》图书作者&#xff0c;CSDN博客专家&#xff0c;在线教育专家&#xff0c;CSDN钻石讲师&#xff1b;专注大学生毕业设计教育和辅导。 所有项目都配有从入门到精通的基础知识视频课程&#xff…

广和通发布基于MediaTek T300平台的RedCap模组FM330系列及解决方案

世界移动通信大会MWC 2024期间&#xff0c;广和通发布基于MediaTek T300平台的RedCap模组FM330系列&#xff0c;加速5G-A繁荣发展。FM330系列及其解决方案采用全球先进RedCap方案&#xff0c;满足移动宽带和工业互联对高能效的需求。 广和通FM330系列采用全球首款6nm制程且集成…