POSTGRESQL中ETL、fdw的平行替换

POSTGRESQL中ETL、fdw的平行替换

01、简介

“ 在我前两次的文章中,说到postgresql对于python的支持,其实很多功能也就可以封装进入的postgresql数据库中去。比如fdw、etl等,本文将以此为叙述点,进行演示展示”

d9de783ffe5e4224852626e3f2388609_0.png

在postgresql数据库中fdw的支持,在创建和使用上都不上太方便,特别是fdw在用表级别关联的时候,性能会大大折扣,因为fdw的数据并不会落地到本地​。所以我们可以利用postgresql对于python的支持,自行封装一个库对库的调度工具,将远端数据进行落地​后再次使用。对于使用的便利性,读者可自行​对比。

02、postgresql16.1的安装

安装依赖

yum install -y bison flex readline-devel zlib-devel zlib zlib-devel gcc  gcc-c++ openssl-devel python3  python3-devel libicu-devel ncurses-devel sqlite-devel tk-devel gcc make

添加用户

useradd postgres 
vim /etc/sudo

在101行以下添加以下内容


postgres ALL=(ALL)     NOPASSWD: ALL

进入官网找到链接,这里使用源码安装。

wget https://ftp.postgresql.org/pub/source/v16.1/postgresql-16.1.tar.gz

解压并进入解压目录


 mv postgresql-16.1.tar.gz /home/postgres
 su - postgres 
 tar -zxf postgresql-16.1.tar.gz
 cd postgresql-16.1

这里编译python支持还是很重要。–with-python 自行构建plpython3u插件


./configure --prefix=/home/postgres/pg --with-openssl  --with-python

make && make install

编辑环境变量


cd 
vim .bash_profile

加入以下环境变量

export PATH=/home/postgres/pg/bin:$PATH 
export PGDATA=/home/postgres/pg/data 

加载环境变量


source ~/.bash_profile

初始化数据库


initdb -D $PGDATA -U postgres -W 
(输入超级用户密码两次)
pg_ctl start 
pg_ctl status

进入数据库创建拓展


CREATE EXTENSION plpython3u CASCADE;

02、创建支持跨库访问的函数

首先下载python链接数据库所需module

postgres=# \! pip3 install -i https://mirrors.aliyun.com/pypi/simple/ cx_Oracle pyodbc pymysql --user 
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Requirement already satisfied: cx_Oracle in ./.local/lib/python3.6/site-packages (8.3.0)
Collecting pyodbc
  Downloading https://mirrors.aliyun.com/pypi/packages/27/5c/5e472d714dea2a634bd79df6b8ace55737a9f50c8fbb3b15521fceda4694/pyodbc-4.0.39-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (330 kB)
     |████████████████████████████████| 330 kB 2.8 MB/s            
Collecting pymysql
  Downloading https://mirrors.aliyun.com/pypi/packages/4f/52/a115fe175028b058df353c5a3d5290b71514a83f67078a6482cff24d6137/PyMySQL-1.0.2-py3-none-any.whl (43 kB)
     |████████████████████████████████| 43 kB 2.4 MB/s             
Installing collected packages: pyodbc, pymysql
Successfully installed pymysql-1.0.2 pyodbc-4.0.39

在链接远程Oracle数据库,需要下载指定的客户端,本文使用的是oracle 19C

wget https://download.oracle.com/otn_software/linux/instantclient/1921000/oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
sudo rpm -ivh oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm

编辑环境变量

vim /etc/profile

配置以下环境变量值

export LD_LIBRARY_PATH=/usr/lib/oracle/19.21/client64/lib:$LD_LIBRARY_PATH

加载环境变量

source /etc/profile

在postgresql数据库中创建具有跨库链接mysql\oracle\sqlserver功能的function。

CREATE OR REPLACE FUNCTION fdw_db(db_type varchar(100),host VARCHAR(100),port integer, username VARCHAR(100), password VARCHAR(100), db_name VARCHAR(100),tablename varchar(100))
RETURNS text AS $$

import cx_Oracle
import pyodbc
import pymysql

def read_data_from_database(db_type, host, port, username, password, db_name, table_name):
    result_values = []  # Initialize as an empty list

    # 读取Oracle数据库中指定表数据的函数
    if db_type.lower() == 'oracle':
        connection_string = f"{username}/{password}@{host}:{port}/{db_name}"
        connection = cx_Oracle.connect(connection_string)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取SQL Server数据库中指定表数据的函数
    elif db_type.lower() == 'sqlserver':
        connection = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={host};port={port};DATABASE={db_name};UID={username};PWD={password}")
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    # 读取MySQL数据库中指定表数据的函数
    elif db_type.lower() == 'mysql':
        connection = pymysql.connect(host=host, user=username, password=password, database=db_name, port=port)
        cursor = connection.cursor()
        cursor.execute(f'SELECT * FROM {table_name}')
        result = cursor.fetchall()
        cursor.close()
        connection.close()

        # 将结果转换为支持INSERT INTO的VALUES语句
        for row in result:
            values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
            result_values.append(f'({values_str})')

    else:
        raise ValueError("Unsupported database type. Supported types: 'oracle', 'sqlserver', 'mysql'")

    # 返回拼接的VALUES子句
    return ', '.join(result_values)

insert_values = read_data_from_database(db_type, host, port, username, password, db_name, tablename)
return insert_values


$$ LANGUAGE plpython3u;

以Oracle作为测试 在Oracle 和PG中均创建测试表conn_fdw
postgresql

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id integer,
    name VARCHAR(50),
    age integer,
    city VARCHAR(50),
    salary integer
);

oracle中

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (
    id NUMBER,
    name VARCHAR2(50),
    age NUMBER,
    city VARCHAR2(50),
    salary NUMBER
);

Oracle中插入数据

-- 插入20行数据
INSERT INTO conn_fdw VALUES (1, 'John', 30, 'New York', 50000);
INSERT INTO conn_fdw VALUES (2, 'Alice', 25, 'Los Angeles', 60000);
INSERT INTO conn_fdw VALUES (3, 'Bob', 35, 'Chicago', 70000);
INSERT INTO conn_fdw VALUES (4, 'Eva', 28, 'San Francisco', 55000);
INSERT INTO conn_fdw VALUES (5, 'Mike', 32, 'Seattle', 65000);
INSERT INTO conn_fdw VALUES (6, 'Sophia', 29, 'Boston', 75000);
INSERT INTO conn_fdw VALUES (7, 'David', 27, 'Denver', 52000);
INSERT INTO conn_fdw VALUES (8, 'Emily', 31, 'Austin', 68000);
INSERT INTO conn_fdw VALUES (9, 'Daniel', 26, 'Phoenix', 58000);
INSERT INTO conn_fdw VALUES (10, 'Olivia', 33, 'Houston', 72000);
INSERT INTO conn_fdw VALUES (11, 'Liam', 24, 'Portland', 49000);
INSERT INTO conn_fdw VALUES (12, 'Ava', 34, 'Atlanta', 71000);
INSERT INTO conn_fdw VALUES (13, 'Logan', 30, 'Miami', 62000);
INSERT INTO conn_fdw VALUES (14, 'Mia', 28, 'Dallas', 54000);
INSERT INTO conn_fdw VALUES (15, 'Jackson', 29, 'Minneapolis', 67000);
INSERT INTO conn_fdw VALUES (16, 'Sophie', 31, 'Detroit', 59000);
INSERT INTO conn_fdw VALUES (17, 'William', 27, 'Philadelphia', 70000);
INSERT INTO conn_fdw VALUES (18, 'Emma', 32, 'San Diego', 66000);
INSERT INTO conn_fdw VALUES (19, 'James', 26, 'Raleigh', 63000);
INSERT INTO conn_fdw VALUES (20, 'Avery', 35, 'Tampa', 71000);


此时再结合SQL语言进行处理远程连接传过来数据,再创建一个函数用于调用以上创建fdw_db

CREATE OR REPLACE FUNCTION inset_fdw_db(db_type varchar(100),host VARCHAR(100)
								  ,port integer, username VARCHAR(100), 
								  password VARCHAR(100), db_name VARCHAR(100),
								  tablename varchar(100),target_bale varchar(100))
RETURNS void AS $$
declare 
data_values text;
begin 
SELECT   fdw_db(db_type, host, port, username, password, db_name,tablename) into data_values;
 
EXECUTE 'insert into '||target_bale ||' values'||data_values;
end;

$$ LANGUAGE plpgsql;

进行调用

 SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 

进入数据库中查看
此时数据已经落地

postgres=# select *  from CONN_FDW;
 id | name | age | city | salary 
----+------+-----+------+--------
(0 rows)

postgres=#  SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');
 inset_fdw_db 
--------------
 
(1 row)


postgres=# select *  from CONN_FDW;
 id |  name   | age |     city      | salary 
----+---------+-----+---------------+--------
  1 | John    |  30 | New York      |  50000
  2 | Alice   |  25 | Los Angeles   |  60000
  3 | Bob     |  35 | Chicago       |  70000
  4 | Eva     |  28 | San Francisco |  55000
  5 | Mike    |  32 | Seattle       |  65000
  6 | Sophia  |  29 | Boston        |  75000
  7 | David   |  27 | Denver        |  52000
  8 | Emily   |  31 | Austin        |  68000
  9 | Daniel  |  26 | Phoenix       |  58000
 10 | Olivia  |  33 | Houston       |  72000
 11 | Liam    |  24 | Portland      |  49000
 12 | Ava     |  34 | Atlanta       |  71000
 13 | Logan   |  30 | Miami         |  62000
 14 | Mia     |  28 | Dallas        |  54000
 15 | Jackson |  29 | Minneapolis   |  67000
 16 | Sophie  |  31 | Detroit       |  59000
 17 | William |  27 | Philadelphia  |  70000
 18 | Emma    |  32 | San Diego     |  66000
 19 | James   |  26 | Raleigh       |  63000
 20 | Avery   |  35 | Tampa         |  71000
(20 rows)



总结

该方法不仅可以应用到数据库对数据库之间,也可以应到,数据库对文件路径下。在postgresql嵌入python代码 其实可以替换掉一些中间件的使用。可控性,定制性也会更强。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/318092.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

详解矩阵变换:伸缩,旋转,反射和投影

目录 一. 矩阵子空间 二. 矩阵变换 2.1 伸缩矩阵 2.2 旋转矩阵 2.3 反射矩阵 2.4 投影矩阵 2.5 小结 三. 矩阵变换与函数 3.1 原点 3.2 常数倍性质 3.3 加法性质 3.4 小结 四. 空间变换 五. 小结 一. 矩阵子空间 矩阵与向量相乘Ax可以看成子空间的变换。 零空间…

一文搞定,JMeter的三种参数化方式

1、Test Plan 中添加变量 可以在 Test Plan 中设置好添加变量,变量名可以在任意的位置使用,比如说在线程组中直接用${ 变量名 }方式引用,步骤如下: 1)设置变量名和变量值 2)添加线程组 3)添加…

[情商-11]:人际交流的心理架构与需求层次模型

目录 前言: 一、心理架构 1.1 个体生理层 1.2 个体心理层 1.3 点对点人际交流层 1.4 社会网络层 1.5 社会价值层 二、人的需求层次模型 2.1 需求(欲望)层次模型 2.2 基因与人需求之间的关系 2.3 个体生理需求 2.4 个体的心理需求…

MyBatis源码分析(六):数据源模块

1. 概述 本文,我们来分享 MyBatis 的数据源模块,对应 datasource 包。如下图所示: ​ 在 MyBatis源码分析(二):项目结构 中,简单介绍了这个模块如下: 数据源是实际开发中常用的组件…

5 微信小程序

功能开发 5 功能开发概要今日详细1.发布1.1 发布流程的问题1.2 组件:进度条1.3 修改data中的局部数据1.4 发布示例效果前端后端 1.5 闭包 2.获取前10条新闻(动态/心情,无需分页)3.复杂版4.文章详细页面 各位小伙伴想要博客相关资料…

Java--业务场景:SpringBoot 通过Redis进行IP封禁实现接口防刷

文章目录 前言具体实现步骤1. 定义自定义注解2. 编写拦截器类IpUrlLimitInterceptor3. 在WebConfig类中添加IpUrlLimitInterceptor4. 添加注解到接口上 测试效果参考文章 前言 在实际项目中,有些攻击者会使用自动化工具来频繁刷新接口,造成系统的瞬时吞…

一些前端学习过程的自测练习题

目录 页面设计部分 1 设计一个简单的学院网站首页; 2.按照图示要求完成简单的登录页面 3.完成如下网站设计 4.完成如下网站设计(练习页面布局) 5 利用下面素材,设计一个满足H5规范的网页(移动端页面练习&#xff…

leetcode刷题记录18(2023-08-29)【最短无序连续子数组(单调栈) | 合并二叉树(dfs) | 任务调度器(桶) | 回文子串(二维dp)】

581. 最短无序连续子数组 给你一个整数数组 nums ,你需要找出一个 连续子数组 ,如果对这个子数组进行升序排序,那么整个数组都会变为升序排序。 请你找出符合题意的 最短 子数组,并输出它的长度。 示例 1: 输入&am…

TensorRT模型优化模型部署(七)--Quantization量化(PTQ and QAT)(二)

系列文章目录 第一章 TensorRT优化部署(一)–TensorRT和ONNX基础 第二章 TensorRT优化部署(二)–剖析ONNX架构 第三章 TensorRT优化部署(三)–ONNX注册算子 第四章 TensorRT模型优化部署(四&am…

Java中finally和return的执行顺序

Java中finally和return的执行顺序 try...catch...finally1. finally语句在return语句执行之后return返回之前执行的2. finally块中的return语句会覆盖try块中的return返回3. 如果finally语句中没有return语句覆盖返回值,那么原来的返回值可能因为finally里的修改而改…

进程的状态

进程状态反映进程执行过程的变化。这些状态随着进程的执行和外界条件的变化而转换。在三态模型 中,进程状态分为三个基本状态,即就绪态,运行态,阻塞态。在五态模型中,进程分为新建态、就绪态,运行态&#x…

【书生·浦语】大模型实战营——第四课笔记

教程链接:https://github.com/InternLM/tutorial/blob/main/xtuner/README.md 视频链接:https://www.bilibili.com/video/BV1yK4y1B75J/?vd_source5d94ee72ede352cb2dfc19e4694f7622 本次视频的内容分为以下四部分: 目录 微调简介 微调会使…

【ArcGIS遇上Python】ArcGIS Python批量筛选多个shp中指定字段值的图斑(以土地利用数据为例)

文章目录 一、案例分析二、提取效果二、代码运行效果三、Python代码四、数据及代码下载一、案例分析 以土地利用数据为例,提取多个shp数据中的旱地。 二、提取效果 原始土地利用数据: 属性表: 提取的旱地:(以图层名称+地类名称命名)

数据结构——排序算法之快速排序

个人主页:日刷百题 系列专栏:〖C/C小游戏〗〖Linux〗〖数据结构〗 〖C语言〗 🌎欢迎各位→点赞👍收藏⭐️留言📝 ​ ​ 前言: 快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法。 基本思想&…

弟12章 1 网络编程

文章目录 网络协议概述 p164TCP协议与UDP协议的区别 p165 网络协议概述 p164 ipv4:十进制点分制 ipv6:十六进制冒号分隔 TCP协议与UDP协议的区别 p165 tcp协议的三次握手:

MySQL单表查询

显示所有职工的基本信息。 mysql8.0 [chap03]>select * from worker; 查询所有职工所属部门的部门号,不显示重复的部门号。 mysql8.0 [chap03]>select distinct(部门号) from worker; 求出所有职工的人数。 mysql8.0 [chap03]>select count(*) from …

山西电力市场日前价格预测【2024-01-14】

日前价格预测 预测说明: 如上图所示,预测明日(2024-01-14)山西电力市场全天平均日前电价为415.13元/MWh。其中,最高日前电价为851.84元/MWh,预计出现在18:15。最低日前电价为198.87元/MWh,预计…

04.neuvector进程策略生成与管控实现

原文链接,欢迎大家关注我的github 一、进程学习管控的实现方式 策略学习实现: 进程的学习与告警主要依据通过netlink socket实时获取进程启动和退出的事件: 1.创建netLink socket; 2.通过创建netlink的fd对进程的事件进行捕获与更新&#x…

“超人练习法”系列08:ZPD 理论

01 先认识一个靓仔 看过 Lev Vygotsky 这个人的书吗?他是一位熟练心理学家,对人们习得技能的方式非常感兴趣,但他 37 岁的时候就因肺炎英年早逝了。 他认为社会环境对学习有关键性的作用,认为社会因素与个人因素的整合促成了学习…

计算机网络 —— 数据链路层

数据链路层 3.1 数据链路层概述 数据链路层把网络层交下来的数据构成帧发送到链路上,以及把收到的帧数据取出并上交给网络层。链路层属于计算机网络的底层。数据链路层使用的信道主要由以下两种类型: 点对点通信。广播通信。 数据链路和帧 链路&…