目录
1.基本原理
2.支持数据格式
3.StreamLoad语法
3.1.请求参数
3.2.返回参数
4.StreamLoad实践
4.1.使用 curl命令
4.2.使用Java代码
Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
1.基本原理
Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
2.支持数据格式
目前 Stream Load 支持数据格式:CSV(文本)、JSON,1.2+ 支持PARQUET 和 ORC。
3.StreamLoad语法
Stream Load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP client 进行操作。
3.1.请求参数
Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。
参数 | 参数说明 |
user/passwd | Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。 |
label | 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。 |
column_separator | 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。 |
line_delimiter | 用于指定导入文件中的换行符,默认为\n。 |
max_filter_ratio | 导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。 |
where | 导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。 |
Partitions | 待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL |
columns | 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。 |
format | 指定导入数据格式,支持csv、json,默认是csv 支持csv_with_names(支持csv文件行首过滤)、csv_with_names_and_types(支持csv文件前两行过滤) |
exec_mem_limit | 导入内存限制。默认为 2GB,单位为字节。 |
merge_type | 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE。 APPEND是默认值,表示这批数据全部需要追加到现有数据中, DELETE 表示删除与这批数据key相同的所有行, MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理 |
two_phase_commit | Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。 |
enable_profile | 当 enable_profile 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。 |
3.2.返回参数
Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。
参数 | 参数说明 |
TxnId | 导入的事务ID。 |
Label | 导入 Label。由用户指定或系统自动生成。 |
Status | 导入完成状态: "Success":表示导入成功。 "Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。 "Label Already Exists":Label 重复,需更换 Label。 "Fail":导入失败。 |
ExistingJobStatus | 已存在的 Label 对应的导入作业的状态。 |
Message | 导入错误信息。 |
NumberTotalRows | 导入总处理的行数。 |
NumberLoadedRows | 成功导入的行数。 |
NumberFilteredRows | 数据质量不合格的行数。 |
NumberUnselectedRows | 被 where 条件过滤的行数。 |
LoadBytes | 导入的字节数。 |
LoadTimeMs | 导入完成时间。单位毫秒。 |
BeginTxnTimeMs | 向Fe请求开始一个事务所花费的时间,单位毫秒。 |
StreamLoadPutTimeMs | 向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。 |
ReadDataTimeMs | 读取数据所花费的时间,单位毫秒。 |
WriteDataTimeMs | 执行写入数据操作所花费的时间,单位毫秒。 |
CommitAndPublishTimeMs | 向Fe请求提交并且发布事务所花费的时间,单位毫秒。 |
ErrorURL | 如果有数据质量问题,通过访问这个 URL 查看具体错误行。 |
4.StreamLoad实践
4.1.使用 curl命令
curl命令格式如下:
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
# Header 中支持属性见下面的 ‘导入任务参数’ 说明
# 格式为: -H "key1:value1"
csv文件数据如下:
id,username,age,sex,phone,register_time
3,user_3,24,0,13212345678,2023-11-03 10:23:34
4,user_4,31,0,13312345678,2023-11-03 12:34:56
5,user_5,53,1,13412345678,2023-11-03 09:12:34
执行导入:
curl --location-trusted -u root -T /home/weisx/opt/doris/user.csv -H "label:label_user" -H "column_separator:," -H "format:csv_with_names" http://localhost:8030/api/demo/user/_stream_load
4.2.使用Java代码
package com.yichenkeji.dataplus.core.drois.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
public class StreamLoadTest {
public static void main(String[] args) throws IOException {
List<Map<String,Object>> datas = loadData();
String label = "label_user_java";
String username = "root";
String password = "";
String db = "demo";
String table ="user";
String loadUrl = String.format("http://192.168.179.131:8030/api/%s/%s/_stream_load",db,table);
List<String> columns = Arrays.asList("id,username,age,sex,phone,register_time".split(","));
String columnSeparator = ",";
String format = "csv";
String loadData = datas.stream().map(data -> columns.stream().map(column -> data.get(column).toString()).collect(Collectors.joining(columnSeparator))).collect(Collectors.joining("\n"));
sendData(label,username,password,loadUrl,columns,loadData,columnSeparator,null,format,null);
}
/**
* 加载数据
* @return
*/
private static List<Map<String, Object>> loadData() {
List<Map<String,Object>> datas = new ArrayList<>();
Map<String,Object> map = new HashMap<>();
map.put("id",6);
map.put("username","user_6");
map.put("age",52);
map.put("sex",1);
map.put("phone","13612345678");
map.put("register_time","2023-11-02-12:34:36");
datas.add(map);
return datas;
}
/**
* Basic access authentication 签名
* @param username doris用户名
* @param password doris用户密码
* @return
*/
public static String basicAuthHeader(String username, String password) {
final String tobeEncode = username + ":" + password;
byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
return "Basic " + new String(encoded);
}
/**
* Stream load 导入数据
* @param label 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
* @param username
* @param password
* @param loadUrl
* @param columns 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
* @param loadData
* @param columnSeparator 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
* @param lineDelimiter 用于指定导入文件中的换行符,默认为\n。
* @param format 指定导入数据格式,支持csv、json,默认是csv
* @param mergeType 数据的合并类型:一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值
* @throws IOException
*/
public static void sendData(String label, String username, String password, String loadUrl
, List<String> columns, String loadData, String columnSeparator, String lineDelimiter
, String format, String mergeType) throws IOException {
HttpClientBuilder
httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
log.info("loadUrl:{},columns:{}",loadUrl,columns);
try (CloseableHttpClient client = httpClientBuilder.build()) {
HttpPut put = new HttpPut(loadUrl);
StringEntity entity = new StringEntity(loadData, "UTF-8");
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
// the label header is optional, not necessary
// use label header can ensure at most once semantics
put.setHeader("label", label);
if(StringUtils.isNotBlank(columnSeparator)){
put.setHeader("column_separator", columnSeparator);
}
if(StringUtils.isNotBlank(lineDelimiter)){
put.setHeader("line_delimiter", lineDelimiter);
}
put.setHeader("format", format);
put.setHeader("merge_type", mergeType);
//字段
if (null != columns && !columns.isEmpty()) {
put.setHeader("columns", String.join(",",
columns.stream().map(f -> String.format("`%s`", f)).
collect(Collectors.toList())));
}
//数据
put.setEntity(entity);
try (CloseableHttpResponse response = client.execute(put)) {
String loadResultStr = null;
if (response.getEntity() != null) {
loadResultStr = EntityUtils.toString(response.getEntity());
}
final int statusCode = response.getStatusLine().getStatusCode();
log.info("statusCode:{},loadResultStr:{}",statusCode,loadResultStr);
}
}
}
}