Doris:StreamLoad导入数据

    

目录

1.基本原理

2.支持数据格式

3.StreamLoad语法

3.1.请求参数

3.2.返回参数

4.StreamLoad实践

4.1.使用 curl命令

4.2.使用Java代码


    Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

1.基本原理

        Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
        用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
        导入的最终结果由 Coordinator BE 返回给用户。

2.支持数据格式

        目前 Stream Load 支持数据格式:CSV(文本)、JSON,1.2+ 支持PARQUET 和 ORC。 

3.StreamLoad语法

        Stream Load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP client 进行操作。

3.1.请求参数

        Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。

参数参数说明
user/passwdStream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
label导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。
column_separator用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
line_delimiter用于指定导入文件中的换行符,默认为\n。
max_filter_ratio导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
where导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。
Partitions待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL
columns待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
format

指定导入数据格式,支持csv、json,默认是csv

支持csv_with_names(支持csv文件行首过滤)、csv_with_names_and_types(支持csv文件前两行过滤)

exec_mem_limit导入内存限制。默认为 2GB,单位为字节。
merge_type 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE。
APPEND是默认值,表示这批数据全部需要追加到现有数据中,
DELETE 表示删除与这批数据key相同的所有行,
MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
two_phase_commitStream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
enable_profile当 enable_profile 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。

3.2.返回参数

        Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。

参数参数说明
TxnId导入的事务ID。
Label导入 Label。由用户指定或系统自动生成。
Status导入完成状态:
"Success":表示导入成功。
"Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
"Label Already Exists":Label 重复,需更换 Label。
"Fail":导入失败。
ExistingJobStatus已存在的 Label 对应的导入作业的状态。
Message导入错误信息。
NumberTotalRows导入总处理的行数。
NumberLoadedRows成功导入的行数。
NumberFilteredRows数据质量不合格的行数。
NumberUnselectedRows被 where 条件过滤的行数。
LoadBytes导入的字节数。
LoadTimeMs导入完成时间。单位毫秒。
BeginTxnTimeMs向Fe请求开始一个事务所花费的时间,单位毫秒。
StreamLoadPutTimeMs向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。
ReadDataTimeMs读取数据所花费的时间,单位毫秒。
WriteDataTimeMs执行写入数据操作所花费的时间,单位毫秒。
CommitAndPublishTimeMs向Fe请求提交并且发布事务所花费的时间,单位毫秒。
ErrorURL如果有数据质量问题,通过访问这个 URL 查看具体错误行。

4.StreamLoad实践

4.1.使用 curl命令

curl命令格式如下:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

# Header 中支持属性见下面的 ‘导入任务参数’ 说明
# 格式为: -H "key1:value1"

csv文件数据如下:

id,username,age,sex,phone,register_time
3,user_3,24,0,13212345678,2023-11-03 10:23:34
4,user_4,31,0,13312345678,2023-11-03 12:34:56
5,user_5,53,1,13412345678,2023-11-03 09:12:34

执行导入:

 curl --location-trusted -u root -T /home/weisx/opt/doris/user.csv -H "label:label_user" -H "column_separator:," -H "format:csv_with_names" http://localhost:8030/api/demo/user/_stream_load

134c8f4632454e3c9a0991b84cac3ee2.png

8cb5af8095b34e4983df3c15c74144bc.png

4.2.使用Java代码

package com.yichenkeji.dataplus.core.drois.util;


import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
public class StreamLoadTest {
    public static void main(String[] args) throws IOException {
        List<Map<String,Object>> datas = loadData();
        String label = "label_user_java";
        String username = "root";
        String password = "";
        String db = "demo";
        String table ="user";
        String loadUrl = String.format("http://192.168.179.131:8030/api/%s/%s/_stream_load",db,table);
        List<String> columns = Arrays.asList("id,username,age,sex,phone,register_time".split(","));
        String columnSeparator = ",";
        String format = "csv";
        String  loadData = datas.stream().map(data -> columns.stream().map(column -> data.get(column).toString()).collect(Collectors.joining(columnSeparator))).collect(Collectors.joining("\n"));
        sendData(label,username,password,loadUrl,columns,loadData,columnSeparator,null,format,null);
    }

    /**
     * 加载数据
     * @return
     */
    private static List<Map<String, Object>> loadData() {
        List<Map<String,Object>> datas = new ArrayList<>();
        Map<String,Object> map = new HashMap<>();
        map.put("id",6);
        map.put("username","user_6");
        map.put("age",52);
        map.put("sex",1);
        map.put("phone","13612345678");
        map.put("register_time","2023-11-02-12:34:36");
        datas.add(map);
        return  datas;
    }

    /**
     * Basic access authentication 签名
     * @param username doris用户名
     * @param password doris用户密码
     * @return
     */
    public static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    /**
     * Stream load 导入数据
     * @param label 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
     * @param username
     * @param password
     * @param loadUrl
     * @param columns 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
     * @param loadData
     * @param columnSeparator 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
     * @param lineDelimiter 用于指定导入文件中的换行符,默认为\n。
     * @param format 指定导入数据格式,支持csv、json,默认是csv
     * @param mergeType 数据的合并类型:一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值
     * @throws IOException
     */
    public static void sendData(String label, String username, String password, String loadUrl
            , List<String> columns, String loadData, String columnSeparator, String lineDelimiter
            , String format, String mergeType) throws IOException {
        HttpClientBuilder
                httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
        log.info("loadUrl:{},columns:{}",loadUrl,columns);
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(loadData, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", label);
            if(StringUtils.isNotBlank(columnSeparator)){
                put.setHeader("column_separator", columnSeparator);
            }

            if(StringUtils.isNotBlank(lineDelimiter)){
                put.setHeader("line_delimiter", lineDelimiter);
            }


            put.setHeader("format", format);

            put.setHeader("merge_type", mergeType);
            //字段
            if (null != columns && !columns.isEmpty()) {
                put.setHeader("columns", String.join(",",
                        columns.stream().map(f -> String.format("`%s`", f)).
                                collect(Collectors.toList())));
            }
            //数据
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResultStr  =  null;
                if (response.getEntity() != null) {
                    loadResultStr  =  EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();

                log.info("statusCode:{},loadResultStr:{}",statusCode,loadResultStr);

            }
        }
    }
}

c88d986aeb55462ca5f1e256b52faa02.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/114808.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Git(七).git 文件夹瘦身,GitLab 永久删除文件

目录 一、问题背景二、问题复现2.1 新建项目2.2 上传大文件2.3 上传结果 三、解决方案3.1 GitLab备份与还原1&#xff09;备份2&#xff09;还原 3.2 删除方式一&#xff1a;git filter-repo 命令【推荐】1&#xff09;安装2&#xff09;删除本地仓库文件3&#xff09;重新关联…

深度学习实战:基于TensorFlow与OpenCV的手语识别系统

文章目录 写在前面基于TensorFlow与OpenCV的手语识别系统安装环境一、导入工具库二、导入数据集三、数据预处理四、训练模型基于CNN基于LeNet5基于ResNet50 五、模型预测基于OpenCV 写在后面 写在前面 本期内容&#xff1a;基于TensorFlow与OpenCV的手语识别系统 实验环境&…

333333333333

一、Map 接口 接下来讲的都是基于 jdk8 来开展的。 1.1 特点 1、Map 与 Collection 并列存在。Map 是用于保存具有映射关系的数据&#xff0c;即 key-value。 2、Map 中的 key 和 value 可以是任何引用类型的数据类型。 3、Map 中的 key 不允许重复&#xff0c;原因和 HashSet…

动态路由协议OSPF项目部署(二)

1. 静态和动态路由的区别&#xff1b; 2. OSPF协议通信过程与部署&#xff1b; 3. OSPF协议在项目上的应用场景 - OSPF - 开放式最短路径优先 - 一个动态路由协议 - 路由器转发数据 - 路由器需要一张地图 - 路由表 - 路由表如何构建的&#xff1f; - 依靠手动 或…

API接口加密,解决自动化中登录问题

一、加密方式 AES&#xff1a;对称加密&#xff0c;快RAS&#xff1a;非对称加密&#xff0c;慢AESRAS&#xff1a;安全高效 加密过程&#xff1a;字符串》字节流》加密的字节流&#xff08;算法&#xff09;&#xff0c;解密有可能出现乱码&#xff0c;所以不能直接转成字符…

【LeetCode】剑指 Offer Ⅱ 第8章:树(12道题) -- Java Version

题库链接&#xff1a;https://leetcode.cn/problem-list/e8X3pBZi/ 类型题目解决方案二叉树的深搜剑指 Offer II 047. 二叉树剪枝递归&#xff08;深搜&#xff09;&#xff1a;二叉树的后序遍历 &#xff08;⭐&#xff09;剑指 Offer II 048. 序列化和反序列化二叉树递归&…

吴恩达《机器学习》4-1->4-5:多变量线性回归

一、引入多维特征 在多维特征中&#xff0c;我们考虑的不再是单一的特征&#xff0c;而是一组特征&#xff0c;例如房价模型中可能包括房间数、楼层等多个特征。这些特征将组成一个向量&#xff0c;表示为(&#x1d465;₁, &#x1d465;₂, . . . , &#x1d465;ₙ)&#x…

【腾讯云HAI域探秘】速通腾讯云HAI

速览HAI 产品简介 腾讯云高性能应用服务(Hyper Application lnventor&#xff0c;HA)&#xff0c;是一款面向 Al、科学计算的 GPU 应用服务产品&#xff0c;为开发者量身打造的澎湃算力平台。无需复杂配置&#xff0c;便可享受即开即用的GPU云服务体验。在 HA] 中&#xff0c;…

配置git并把本地项目连接github

一.配置git 1.下载git&#xff08;Git&#xff09;&#xff0c;但推荐使用国内镜像下载&#xff08;CNPM Binaries Mirror&#xff09; 选好64和版本号下载&#xff0c;全部点下一步 下载完成后打开终端&#xff0c;输入 git --version 出现版本号则说明安装成功 然后继续…

Redis统计大法:挖掘数据的四重宝藏【redis第五部分】

Redis统计大法&#xff1a;挖掘数据的四重宝藏 前言第一&#xff1a;redis集合统计简介第二&#xff1a;聚合统计->数据的综合分析总和&#xff08;Sum&#xff09;&#xff1a;平均值&#xff08;Average&#xff09;中位数&#xff08;Median&#xff09; 第三&#xff1a…

【C++】多态 ⑪ ( 纯虚函数和抽象类 | 纯虚函数语法 | 抽象类和实现 | 代码示例 )

文章目录 一、纯虚函数和抽象类1、纯虚函数2、纯虚函数语法3、抽象类和实现 二、完整代码示例 一、纯虚函数和抽象类 1、纯虚函数 纯虚函数 : 在 C 语言中 , " 纯虚函数 " 是 特殊类型的 虚函数 , " 纯虚函数 " 在 父类 中 声明 , 但是没有实现 ; 抽象类 …

从瀑布模式到水母模式:ChatGPT引领软件研发的革新之路

ChatGPT引领软件研发的革新之路 概述操作建议本书优势 内容简介作者简介专家推荐读者对象目录直播预告写在末尾&#xff1a; 主页传送门&#xff1a;&#x1f4c0; 传送 概述 计算机技术的发展和互联网的普及&#xff0c;使信息处理和传输变得更加高效&#xff0c;极大地改变了…

Azure 机器学习 - 使用 AutoML 和 Python 训练物体检测模型

目录 一、Azure环境准备二、计算目标设置三、试验设置四、直观呈现输入数据五、上传数据并创建 MLTable六、配置物体检测试验适用于图像任务的自动超参数扫描 (AutoMode)适用于图像任务的手动超参数扫描作业限制 七、注册和部署模型获取最佳试用版注册模型配置联机终结点创建终…

JUL 日志

JUL日志级别 日志分为7个级别&#xff0c;详细信息我们可以在Level类中查看&#xff1a; SEVERE&#xff08;最高值&#xff09;- 一般用于代表严重错误WARNING - 一般用于表示某些警告&#xff0c;但是不足以判断为错误INFO &#xff08;默认级别&#xff09; - 常规消息CON…

Hadoop HDFS(分布式文件系统)

一、Hadoop HDFS(分布式文件系统) 为什么要分布式存储数据 假设一个文件有100tb&#xff0c;我们就把文件划分为多个部分&#xff0c;放入到多个服务器 靠数量取胜&#xff0c;多台服务器组合&#xff0c;才能Hold住 数据量太大&#xff0c;单机存储能力有上限&#xff0c;需要…

QT在线安装5.15之前的版本(下载速度飞快)

使用最新的QT在线安装器&#xff0c;安装QT版本时只能安装5.15以及之后的版本&#xff0c;安装QT5.15之前的版本只能通过离线安装的方式&#xff0c;离线安装后还要自己去配置QT&#xff0c;离线安装还有个问题的&#xff0c;后续维护比较麻烦&#xff0c;QT的维护工具还要自己…

springboot2.x使用@RestControllerAdvice实现通用异常捕获

文章目录 demo地址实现效果引入基础类准备1.通用枚举与错误状态枚举2.定义通用返回结果3.自定义业务异常 统一异常捕获测试 demo地址 demo工程地址 实现效果 当我们输入1时&#xff0c;正常的返回通用的响应结果当我们输入2时&#xff0c;抛出异常&#xff0c;被捕获然后返回…

macOS 安装brew

参考链接&#xff1a; https://mirrors4.tuna.tsinghua.edu.cn/help/homebrew/ https://www.yii666.com/blog/429332.html 安装中科大源的&#xff1a; https://zhuanlan.zhihu.com/p/470873649

《TCP/IP详解 卷一:协议》第5章的IPv4数据报的Checksum(校验和)字段的计算(这里才能解开你的困惑)

首先&#xff0c;我当你看过书&#xff0c;但是比较懵。 1&#xff0c;实例说明Checksum(校验和)的计算步骤 直奔主题&#xff0c;分析一下这个Checksum&#xff08;校验和&#xff09;怎么算出来的。 先用Wireshark随便抓一个UDP或TCP包分析一下。 如上面&#xff0c;我们得…

Python数据分析:在职场中的竞争优势

前言 在职场中&#xff0c;技能的重要性是不言而喻的。越来越多的职位要求员工具备数据分析能力&#xff0c;而Python作为一种强大的数据分析工具&#xff0c;正在成为职场中的“利器”。然而&#xff0c;尽管Python数据分析提供了巨大的优势&#xff0c;许多人依然未能掌握这…