C++和Python实现SQL Server数据库导出数据到S3并导入Redshift数据仓库

用C++实现高性能数据处理,Python实现操作Redshift导入数据文件。
在Visual Studio 2022中用C++和ODBC API导出SQL Server数据库中张表中的所有表的数据为CSV文件格式的数据流,用逗号作为分隔符,用双引号包裹每个数据,字符串类型的数据去掉前后的空格,数据中如果包含双引号,则将一个双引号替换为两个双引号,创建gzip压缩文件,输出数据流写入到gzip压缩文件包中的唯一的csv文件中,一张表对应一个gzip压缩文件。用异步编程的方法优化数据库表的读取和写入文件的过程,每个gzip压缩文件包中的csv文件的第一行都是用逗号分割的表头,对应数据库表的字段,程序需要异常处理和日志文件输出,当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导出数据行数、成功或者失败的状态,日志文件不存在则创建。

以下是一个使用C++和ODBC API在Visual Studio 2022中实现将SQL Server数据库表数据导出为CSV文件并进行gzip压缩的示例代码。代码使用异步编程优化数据库读取和文件写入过程,并包含异常处理和日志记录功能。

#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <future>
#include <sql.h>
#include <sqlext.h>
#include <zlib.h>
#include <ctime>
#include <iomanip>

// 日志记录函数
void logMessage(const std::string& message) {
    std::ofstream logFile("export_log.txt", std::ios::app);
    if (logFile.is_open()) {
        auto now = std::chrono::system_clock::now();
        std::time_t now_c = std::chrono::system_clock::to_time_t(now);
        std::tm* now_tm = std::localtime(&now_c);
        std::ostringstream oss;
        oss << std::put_time(now_tm, "%Y-%m-%d %H:%M:%S") << " " << message << std::endl;
        logFile << oss.str();
        logFile.close();
    }
}

// 处理字符串中的双引号
std::string escapeDoubleQuotes(const std::string& str) {
    std::string result = str;
    size_t pos = 0;
    while ((pos = result.find('"', pos))!= std::string::npos) {
        result.replace(pos, 1, 2, '"');
        pos += 2;
    }
    return result;
}

// 从数据库读取表数据
std::vector<std::vector<std::string>> readTableData(SQLHSTMT hstmt) {
    std::vector<std::vector<std::string>> data;
    SQLSMALLINT columnCount = 0;
    SQLNumResultCols(hstmt, &columnCount);

    std::vector<SQLCHAR*> columns(columnCount);
    std::vector<SQLINTEGER> lengths(columnCount);
    for (SQLSMALLINT i = 0; i < columnCount; ++i) {
        columns[i] = new SQLCHAR[SQL_MAX_MESSAGE_LENGTH];
        SQLBindCol(hstmt, i + 1, SQL_C_CHAR, columns[i], SQL_MAX_MESSAGE_LENGTH, &lengths[i]);
    }

    while (SQLFetch(hstmt) == SQL_SUCCESS) {
        std::vector<std::string> row;
        for (SQLSMALLINT i = 0; i < columnCount; ++i) {
            std::string value(reinterpret_cast<const char*>(columns[i]));
            value = escapeDoubleQuotes(value);
            row.push_back(value);
        }
        data.push_back(row);
    }

    for (SQLSMALLINT i = 0; i < columnCount; ++i) {
        delete[] columns[i];
    }

    return data;
}

// 将数据写入CSV文件
void writeToCSV(const std::vector<std::vector<std::string>>& data, const std::vector<std::string>& headers, const std::string& filename) {
    std::ofstream csvFile(filename);
    if (csvFile.is_open()) {
        // 写入表头
        for (size_t i = 0; i < headers.size(); ++i) {
            csvFile << '"' << headers[i] << '"';
            if (i < headers.size() - 1) csvFile << ',';
        }
        csvFile << std::endl;

        // 写入数据
        for (const auto& row : data) {
            for (size_t i = 0; i < row.size(); ++i) {
                csvFile << '"' << row[i] << '"';
                if (i < row.size() - 1) csvFile << ',';
            }
            csvFile << std::endl;
        }

        csvFile.close();
    } else {
        throw std::runtime_error("Failed to open CSV file for writing");
    }
}

// 压缩CSV文件为gzip
void compressCSV(const std::string& csvFilename, const std::string& gzipFilename) {
    std::ifstream csvFile(csvFilename, std::ios::binary);
    std::ofstream gzipFile(gzipFilename, std::ios::binary);
    if (csvFile.is_open() && gzipFile.is_open()) {
        gzFile gzOut = gzopen(gzipFilename.c_str(), "wb");
        if (gzOut) {
            char buffer[1024];
            while (csvFile.read(buffer, sizeof(buffer))) {
                gzwrite(gzOut, buffer, sizeof(buffer));
            }
            gzwrite(gzOut, buffer, csvFile.gcount());
            gzclose(gzOut);
        } else {
            throw std::runtime_error("Failed to open gzip file for writing");
        }
        csvFile.close();
        gzipFile.close();
        std::remove(csvFilename.c_str());
    } else {
        throw std::runtime_error("Failed to open files for compression");
    }
}

// 导出单个表
void exportTable(const std::string& server, const std::string& database, const std::string& schema, const std::string& table) {
    SQLHENV henv = nullptr;
    SQLHDBC hdbc = nullptr;
    SQLHSTMT hstmt = nullptr;

    try {
        SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &henv);
        SQLSetEnvAttr(henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3, 0);
        SQLAllocHandle(SQL_HANDLE_DBC, henv, &hdbc);

        std::string connectionString = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=" + server + ";DATABASE=" + database + ";UID=your_username;PWD=your_password";
        SQLRETURN ret = SQLDriverConnect(hdbc, nullptr, (SQLCHAR*)connectionString.c_str(), SQL_NTS, nullptr, 0, nullptr, SQL_DRIVER_NOPROMPT);
        if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {
            throw std::runtime_error("Failed to connect to database");
        }

        std::string query = "SELECT * FROM " + schema + "." + table;
        SQLAllocHandle(SQL_HANDLE_STMT, hdbc, &hstmt);
        ret = SQLExecDirect(hstmt, (SQLCHAR*)query.c_str(), SQL_NTS);
        if (ret!= SQL_SUCCESS && ret!= SQL_SUCCESS_WITH_INFO) {
            throw std::runtime_error("Failed to execute query");
        }

        std::vector<std::vector<std::string>> data = readTableData(hstmt);
        std::vector<std::string> headers;
        SQLSMALLINT columnCount = 0;
        SQLNumResultCols(hstmt, &columnCount);
        for (SQLSMALLINT i = 0; i < columnCount; ++i) {
            SQLCHAR columnName[SQL_MAX_COLUMN_NAME_LEN];
            SQLSMALLINT nameLen;
            SQLDescribeCol(hstmt, i + 1, columnName, SQL_MAX_COLUMN_NAME_LEN, &nameLen, nullptr, nullptr, nullptr, nullptr);
            headers.push_back(reinterpret_cast<const char*>(columnName));
        }

        std::string csvFilename = table + ".csv";
        writeToCSV(data, headers, csvFilename);
        std::string gzipFilename = table + ".gz";
        compressCSV(csvFilename, gzipFilename);

        std::ostringstream oss;
        oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Rows Exported: " << data.size() << ", Status: Success";
        logMessage(oss.str());
    } catch (const std::exception& e) {
        std::ostringstream oss;
        oss << "Database: " << database << ", Schema: " << schema << ", Table: " << table << ", Status: Failed, Error: " << e.what();
        logMessage(oss.str());
    } finally {
        if (hstmt) SQLFreeHandle(SQL_HANDLE_STMT, hstmt);
        if (hdbc) SQLDisconnect(hdbc); SQLFreeHandle(SQL_HANDLE_DBC, hdbc);
        if (henv) SQLFreeHandle(SQL_HANDLE_ENV, henv);
    }
}

int main() {
    std::string server = "<实例名>";
    std::string database = "<数据库名>";
    std::string schema = "<Schema名>";
    std::vector<std::string> tables = {"<表名1>", "<表名2>", "<表名3>"};

    std::vector<std::future<void>> futures;
    for (const auto& table : tables) {
        futures.push_back(std::async(std::launch::async, exportTable, server, database, schema, table));
    }

    for (auto& fut : futures) {
        fut.get();
    }

    return 0;
}

代码说明:

日志记录: logMessage 函数用于记录操作日志,包括时间戳、数据库名、schema名、表名、导出数据行数和操作状态。
字符串处理: escapeDoubleQuotes 函数用于处理字符串中的双引号,将其替换为两个双引号。
数据库读取: readTableData 函数使用ODBC API从数据库中读取表数据,并将其存储在二维向量中。
CSV写入: writeToCSV 函数将数据写入CSV文件,包括表头和数据行,并用双引号包裹每个数据,使用逗号作为分隔符。
文件压缩: compressCSV 函数将生成的CSV文件压缩为gzip格式,并删除原始CSV文件。
表导出: exportTable 函数负责连接数据库、执行查询、读取数据、写入CSV文件并压缩。
主函数: main 函数定义了数据库服务器、数据库名、schema名和表名,并使用异步任务并行导出每个表的数据。

用Python删除当前目录下所有功能扩展名为gz文件,接着运行export_sqlserver.exe程序,输出该程序的输出内容并等待它运行完成,然后连接SQL Server数据库和Amazon Redshift数据仓库,从数据库中获取所有表和它们的字段名,然后在Redshift中创建字段名全部相同的同名表,字段长度全部为最长的varchar类型,如果表已经存在则不创建表,自动上传当前目录下所有功能扩展名为gz文件到S3,默认覆盖同名的文件,然后使用COPY INTO将S3上包含csv文件的gz压缩包导入对应创建的Redshift表中,文件数据的第一行是表头,导入所有上传的文件到Redshift表,程序需要异常处理和日志文件输出,当前路径下唯一的日志文件中包含运行时间戳、数据库名、schema名、表名、导入数据行数、成功或者失败的状态,日志文件不存在则创建。

import os
import subprocess
import pyodbc
import redshift_connector
import boto3
import logging
from datetime import datetime


# 配置日志记录
logging.basicConfig(filename='operation_log.log', level=logging.INFO,
                    format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')


def delete_gz_files():
    try:
        for file in os.listdir('.'):
            if file.endswith('.gz'):
                os.remove(file)
        logging.info('所有.gz文件已删除')
    except Exception as e:
        logging.error(f'删除.gz文件时出错: {e}')


def run_export_sqlserver():
    try:
        result = subprocess.run(['export_sqlserver.exe'], capture_output=True, text=True)
        print(result.stdout)
        logging.info('export_sqlserver.exe运行成功')
    except Exception as e:
        logging.error(f'运行export_sqlserver.exe时出错: {e}')


def create_redshift_tables():
    # SQL Server 连接配置
    sqlserver_conn_str = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_sqlserver_server;DATABASE=your_database;UID=your_username;PWD=your_password'
    try:
        sqlserver_conn = pyodbc.connect(sqlserver_conn_str)
        sqlserver_cursor = sqlserver_conn.cursor()

        # Redshift 连接配置
        redshift_conn = redshift_connector.connect(
            host='your_redshift_host',
            database='your_redshift_database',
            user='your_redshift_user',
            password='your_redshift_password',
            port=5439
        )
        redshift_cursor = redshift_conn.cursor()

        sqlserver_cursor.execute("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")
        tables = sqlserver_cursor.fetchall()

        for table in tables:
            table_name = table[0]
            sqlserver_cursor.execute(f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}'")
            columns = sqlserver_cursor.fetchall()
            column_definitions = ', '.join([f"{column[0]} VARCHAR(MAX)" for column in columns])

            try:
                redshift_cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_definitions})")
                redshift_conn.commit()
                logging.info(f'在Redshift中成功创建表 {table_name}')
            except Exception as e:
                logging.error(f'在Redshift中创建表 {table_name} 时出错: {e}')

        sqlserver_conn.close()
        redshift_conn.close()
    except Exception as e:
        logging.error(f'连接数据库或创建表时出错: {e}')


def upload_gz_files_to_s3():
    s3 = boto3.client('s3')
    bucket_name = 'your_bucket_name'
    try:
        for file in os.listdir('.'):
            if file.endswith('.gz'):
                s3.upload_file(file, bucket_name, file)
                logging.info(f'成功上传文件 {file} 到S3')
    except Exception as e:
        logging.error(f'上传文件到S3时出错: {e}')


def copy_data_to_redshift():
    redshift_conn = redshift_connector.connect(
        host='your_redshift_host',
        database='your_redshift_database',
        user='your_redshift_user',
        password='your_redshift_password',
        port=5439
    )
    redshift_cursor = redshift_conn.cursor()
    bucket_name = 'your_bucket_name'
    try:
        for file in os.listdir('.'):
            if file.endswith('.gz') and file.endswith('.csv.gz'):
                table_name = file.split('.')[0]
                s3_path = f's3://{bucket_name}/{file}'
                sql = f"COPY {table_name} FROM '{s3_path}' IAM_ROLE 'your_iam_role' CSV HEADER"
                try:
                    redshift_cursor.execute(sql)
                    redshift_conn.commit()
                    row_count = redshift_cursor.rowcount
                    logging.info(f'成功将数据导入表 {table_name},导入行数: {row_count}')
                except Exception as e:
                    logging.error(f'将数据导入表 {table_name} 时出错: {e}')
    except Exception as e:
        logging.error(f'连接Redshift或导入数据时出错: {e}')
    finally:
        redshift_conn.close()


if __name__ == "__main__":
    delete_gz_files()
    run_export_sqlserver()
    create_redshift_tables()
    upload_gz_files_to_s3()
    copy_data_to_redshift()

代码说明:

日志记录:使用 logging 模块配置日志记录,记录操作的时间戳和操作信息到 operation_log.log 文件。
删除.gz文件: delete_gz_files 函数删除当前目录下所有扩展名为 .gz 的文件。
运行export_sqlserver.exe: run_export_sqlserver 函数运行 export_sqlserver.exe 程序并输出其内容。
创建Redshift表: create_redshift_tables 函数连接SQL Server和Redshift数据库,获取SQL Server中所有表和字段名,在Redshift中创建同名表,字段类型为 VARCHAR(MAX) 。
上传.gz文件到S3: upload_gz_files_to_s3 函数上传当前目录下所有扩展名为 .gz 的文件到S3。
将数据从S3导入Redshift: copy_data_to_redshift 函数使用 COPY INTO 语句将S3上的CSV压缩包数据导入对应的Redshift表中。

请根据实际的数据库配置、S3桶名和IAM角色等信息修改代码中的相关参数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/962108.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python 梯度下降法(二):RMSProp Optimize

文章目录 Python 梯度下降法&#xff08;二&#xff09;&#xff1a;RMSProp Optimize一、数学原理1.1 介绍1.2 公式 二、代码实现2.1 函数代码2.2 总代码 三、代码优化3.1 存在问题3.2 收敛判断3.3 函数代码3.4 总代码 四、优缺点4.1 优点4.2 缺点 Python 梯度下降法&#xff…

excel如何查找一个表的数据在另外一个表是否存在

比如“Sheet1”有“张三”、“李四”“王五”三个人的数据&#xff0c;“Sheet2”只有“张三”、“李四”的数据。我们通过修改“Sheet1”的“民族”或者其他空的列&#xff0c;修改为“Sheet2”的某一列。这样修改后筛选这个修改的列为空的或者为出错的&#xff0c;就能找到两…

2024年数据记录

笔者注册时间超过98.06%的用户 CSDN 原力是衡量一个用户在 CSDN 的贡献和影响力的系统&#xff0c;笔者原力值超过99.99%的用户 其他年度数据

7层还是4层?网络模型又为什么要分层?

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 一、为什么要分层 \quad 网络通信的复杂性促使我们需要一种分层的方法来理解和管理网络。就像建筑一样&#xff0c;我们不会把所有功能都混在一起…

JxBrowser 8.2.2 版本发布啦!

JxBrowser 8.2.2 版本发布啦&#xff01; • 已更新 #Chromium 至更新版本 • 实施了多项质量改进 &#x1f517; 点击此处了解更多详情。 &#x1f193; 获取 30 天免费试用。

论文阅读(十五):DNA甲基化水平分析的潜变量模型

1.论文链接&#xff1a;Latent Variable Models for Analyzing DNA Methylation 摘要&#xff1a; 脱氧核糖核酸&#xff08;DNA&#xff09;甲基化与细胞分化密切相关。例如&#xff0c;已经观察到肿瘤细胞中的DNA甲基化编码关于肿瘤的表型信息。因此&#xff0c;通过研究DNA…

【综合决策模型】考虑生命周期评估LCA 与多目标优化MOO的综合决策模型MOOLCA

目录 1. 概念和目的1.1 生命周期评估 (LCA, Life Cycle Assessment)1.2 多目标优化 (MOO, Multi-Objective Optimization)1.3 MOOLCA 的目标2. MOOLCA 的组成2.1 生命周期评估模块2.2 优化模块2.3 决策支持模块参考Life Cycle Assessment with Multi-Objective Optimization (M…

系统思考—蝴蝶效应

“个体行为的微小差异&#xff0c;可能在系统中引发巨大且不可预测的结果。” — 诺贝尔经济学得主托马斯谢林 我们常说&#xff0c;小变动带来大影响&#xff0c;这种现象&#xff0c;在复杂系统理论中被称为“蝴蝶效应”&#xff1a;即使极小的变化&#xff0c;也能在动态系…

设计模式Python版 适配器模式

文章目录 前言一、适配器模式二、适配器模式实现三、适配器模式在Django中的应用 前言 GOF设计模式分三大类&#xff1a; 创建型模式&#xff1a;关注对象的创建过程&#xff0c;包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模式。结构型模式&…

科研绘图系列:R语言绘制散点图(scatter plot)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载导入数据画图保存图片系统信息参考介绍 科研绘图系列:R语言绘制散点图(scatter plot) 加载R包 library(tidyverse) library(ggthemes) library(ggpubr) libr…

基于单片机的超声波液位检测系统(论文+源码)

1总体设计 本课题为基于单片机的超声波液位检测系统的设计&#xff0c;系统的结构框图如图2.1所示。其中包括了按键模块&#xff0c;温度检测模块&#xff0c;超声波液位检测模块&#xff0c;显示模块&#xff0c;蜂鸣器等器件设备。其中&#xff0c;采用STC89C52单片机作为主控…

P1044 [NOIP2003 普及组] 栈 C语言

P1044 [NOIP2003 普及组] 栈 - 洛谷 | 计算机科学教育新生态 题目背景 栈是计算机中经典的数据结构&#xff0c;简单的说&#xff0c;栈就是限制在一端进行插入删除操作的线性表。 栈有两种最重要的操作&#xff0c;即 pop&#xff08;从栈顶弹出一个元素&#xff09;和 pus…

基础项目实战——学生管理系统(c++)

目录 前言一、功能菜单界面二、类与结构体的实现三、录入学生信息四、删除学生信息五、更改学生信息六、查找学生信息七、统计学生人数八、保存学生信息九、读取学生信息十、打印所有学生信息十一、退出系统十二、文件拆分结语 前言 这一期我们来一起学习我们在大学做过的课程…

OpenEuler学习笔记(十七):OpenEuler搭建Redis高可用生产环境

在OpenEuler上搭建Redis高可用生产环境&#xff0c;通常可以采用Redis Sentinel或Redis Cluster两种方式&#xff0c;以下分别介绍两种方式的搭建步骤&#xff1a; 基于Redis Sentinel的高可用环境搭建 安装Redis 配置软件源&#xff1a;可以使用OpenEuler的默认软件源&#…

Python的那些事第六篇:从定义到应用,Python函数的奥秘

新月人物传记&#xff1a;人物传记之新月篇-CSDN博客 目录 一、函数的定义与调用 二、函数的参数 三、返回值&#xff08;return语句&#xff09; 四、作用域 五、匿名函数&#xff08;lambda表达式&#xff09; 六、总结 Python函数的奥秘&#xff1a;从定义到应用 编程…

vue3的路由配置

先找到Layout布局文件&#xff0c;从中找到左侧边栏&#xff0c;找到下述代码 <SidebarItem v-for"route in noHiddenRoutes" :key"route.path" :item"route" :base-path"route.path" />/** *菜单项 <SidebarItem>: *使用…

VLLM性能调优

1. 抢占 显存不够的时候&#xff0c;某些request会被抢占。其KV cache被清除&#xff0c;腾退给其他request&#xff0c;下次调度到它&#xff0c;重新计算KV cache。 报这条消息&#xff0c;说明已被抢占&#xff1a; WARNING 05-09 00:49:33 scheduler.py:1057 Sequence gr…

Blazor-@bind

数据绑定 带有 value属性的标记都可以使用bind 绑定&#xff0c;<div>、<span>等非输入标记&#xff0c;无法使用bind 指令的&#xff0c;默认绑定了 onchange 事件&#xff0c;onchange 事件是指在输入框中输入内容之后&#xff0c;当失去焦点时执行。 page &qu…

H264原始码流格式分析

1.H264码流结构组成 H.264裸码流&#xff08;Raw Bitstream&#xff09;数据主要由一系列的NALU&#xff08;网络抽象层单元&#xff09;组成。每个NALU包含一个NAL头和一个RBSP&#xff08;原始字节序列载荷&#xff09;。 1.1 H.264码流层次 H.264码流的结构可以分为两个层…

Qt中QVariant的使用

1.使用QVariant实现不同类型数据的相加 方法&#xff1a;通过type函数返回数值的类型&#xff0c;然后通过setValue来构造一个QVariant类型的返回值。 函数&#xff1a; QVariant mainPage::dataPlus(QVariant a, QVariant b) {QVariant ret;if ((a.type() QVariant::Int) &a…