I. Doris Data Import
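Import method | Use case | Supported file formats | Import mode |
---|---|---|---|
Stream Load | Importing local files or data written by applications | csv, json, parquet, orc | Synchronous |
Broker Load | Importing from object storage, HDFS, etc. | csv, json, parquet, orc | Asynchronous |
Routine Load | Real-time import from Kafka | csv, json | Asynchronous |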
1. Stream Load
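How It Works
To use Stream Load, a client submits an import job to an FE node over HTTP. The FE picks a BE node in round-robin fashion and redirects the request to it, which balances the load. The client can also send the HTTP request directly to a specific BE node. For each Stream Load, Doris designates one node as the Coordinator, which receives the data and distributes it to the other nodes.
The main steps of a Stream Load are:
- The client submits a Stream Load import request to the FE.
- The FE selects a BE in round-robin fashion as the Coordinator node, which is responsible for scheduling the import job, and returns an HTTP redirect to the client.
- The client connects to the Coordinator BE and submits the import request.
- The Coordinator BE distributes the data to the appropriate BE nodes and returns the import result to the client when the import completes.
- Alternatively, the client can name a BE node as the Coordinator and submit the import job to it directly.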
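The request that the client sends in the first step can be sketched as follows. This is a minimal illustration, not Doris client code: the function names `build_stream_load_request` and `next_hop` are hypothetical, the header set mirrors the `curl` call used later in this section, and the redirect handling only models the generic HTTP behavior that `curl --location-trusted` performs.

```python
from urllib.parse import quote


def build_stream_load_request(host, http_port, db, table, columns, separator=","):
    """Return the (url, headers) pair for a CSV Stream Load PUT request."""
    url = f"http://{host}:{http_port}/api/{quote(db)}/{quote(table)}/_stream_load"
    headers = {
        "Expect": "100-continue",
        "column_separator": separator,
        "columns": ",".join(columns),
    }
    return url, headers


def next_hop(status_code, response_headers, current_url):
    """Follow an FE redirect to the Coordinator BE, or stay on the current URL."""
    if 300 <= status_code < 400 and "Location" in response_headers:
        return response_headers["Location"]
    return current_url


url, headers = build_stream_load_request(
    "10.0.49.2", 8050, "testdb", "dns_data", ["client_ip", "domain"]
)
print(url)
```

An HTTP client would send the file body with `PUT` to `url`; if the FE answers with a redirect, the same request is repeated against the address returned by `next_hop`.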
Importing Data
- The file imported here is in CSV format with nine fields. A sample of its contents:

      [root@hadoop3 dns_data]# head -5 input.csv
      85.0.144.47,V2-vOd.kwAiCDn.cOM.,20220729005737,106.120.158.110,0,1,v2-vod.kwaicdn.com.w.cdngslb.cOM.,"",123.59.182.42
      111.0.40.49,apPle.COm.,20220729005737,17.253.144.10,0,1,"","",123.59.182.42
      211.0.172.212,SzMINORSHORT.WEIxin.Qq.com.,20220729005737,157.148.59.242,0,1,"","",123.59.182.42
      111.0.68.81,WWW.BILIbIlI.CoM.,20220729005737,61.156.196.6,0,1,a.w.bilicdn1.CoM.,"",123.59.182.42
      211.0.21.16,www.wAsU.cn.,20220729005737,103.15.99.89,0,1,www.wasu.cn.w.kunlunpi.com.,"",123.59.182.42
- Create the target table in Doris:

      mysql> use testdb;
      Database changed
      mysql> create table dns_data(
          client_ip varchar(1000),
          domain varchar(1000),
          time varchar(1000),
          target_ip varchar(1000),
          rcode varchar(1000),
          query_type varchar(1000),
          authority_record varchar(10000),
          add_msg varchar(1000),
          dns_ip varchar(1000)
      )
      DUPLICATE KEY(client_ip, domain, time, target_ip)
      DISTRIBUTED BY HASH(client_ip) BUCKETS 20;
- Start the import job:

      # This import contains 5 million rows
      [root@hadoop3 dns_data]# wc -l sample.csv
      5000000 sample.csv
      [root@hadoop3 dns_data]# curl --location-trusted -u admin:admin123 \
          -H "Expect:100-continue" \
          -H "column_separator:," \
          -H "columns:client_ip,domain,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip" \
          -T sample.csv \
          -XPUT http://10.0.49.2:8050/api/testdb/dns_data/_stream_load
- Check the import result:

      mysql> select count(*) from dns_data;
      +----------+
      | count(*) |
      +----------+
      |  5000000 |
      +----------+
      1 row in set (0.32 sec)
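Before submitting a large file, it can help to confirm that every row really has the nine fields the `columns` header declares. The sketch below is one possible pre-flight check using Python's standard `csv` module; the function name `find_bad_rows` is hypothetical, and the check only counts fields. It says nothing about how Doris itself interprets the quoted empty values (`""`), which depends on the load settings.

```python
import csv
import io

EXPECTED_FIELDS = 9  # dns_data has nine columns


def find_bad_rows(text, expected=EXPECTED_FIELDS):
    """Return (line_number, field_count) for rows whose field count differs."""
    bad = []
    reader = csv.reader(io.StringIO(text))
    for line_no, row in enumerate(reader, start=1):
        if len(row) != expected:
            bad.append((line_no, len(row)))
    return bad


# Two rows copied from the sample shown above.
sample = (
    '85.0.144.47,V2-vOd.kwAiCDn.cOM.,20220729005737,106.120.158.110,0,1,'
    'v2-vod.kwaicdn.com.w.cdngslb.cOM.,"",123.59.182.42\n'
    '111.0.40.49,apPle.COm.,20220729005737,17.253.144.10,0,1,"","",123.59.182.42\n'
)
print(find_bad_rows(sample))
```

For a real file, the same function could be fed the file's contents in chunks; rows it reports are worth inspecting before the load is started.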
2. Broker Load
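How It Works
After the user submits an import job, the FE generates a Plan and, based on the current number of BEs and the size of the files, splits the Plan across multiple BEs, each of which imports part of the data.
During execution, each BE pulls data from the Broker, transforms it, and loads it into the system. Once all BEs have finished, the FE makes the final decision on whether the import succeeded.
BEs rely on Broker processes to read data from remote storage systems. Brokers exist to accommodate different remote storage systems: users can develop a Broker for their own storage system by following the Broker process standard. Brokers can be written in Java, which integrates well with the storage systems of the big-data ecosystem. Keeping the Broker and the BE in separate processes also isolates their failures from each other, which improves BE stability.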
Importing Data
- Clear the dns_data table and upload the file to be imported to HDFS:

      mysql> truncate table dns_data;
      Query OK, 0 rows affected (4.32 sec)
      # The file to import contains about 120 million rows
      [root@hadoop3 dns_data]# wc -l input.csv
      121936657 input.csv
      [root@hadoop3 dns_data]# hdfs dfs -put input.csv /test
- Start the import job:

      mysql> LOAD LABEL hdfs_load_2025_01_13
      (
          DATA INFILE("hdfs://10.0.49.4:9000/test/input.csv")
          INTO TABLE dns_data
          COLUMNS TERMINATED BY ","
          FORMAT AS "CSV"
          (client_ip,domain,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip)
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://10.0.49.4:9000",
          "hadoop.username" = "root"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
- Check the import job:

      mysql> show load;
      | JobId | Label | State | Progress | Type | EtlInfo | TaskInfo | ErrorMsg | CreateTime | EtlStartTime | EtlFinishTime | LoadStartTime | LoadFinishTime | URL | JobDetails | TransactionId | ErrorTablets | User | Comment |
      | 16265 | hdfs_load_2025_01_13 | LOADING | 0.00% (0/3) | BROKER | NULL | cluster:hdfs_cluster; timeout(s):3600; max_filter_ratio:0.0; priority:NORMAL | NULL | 2025-01-13 07:46:53 | 2025-01-13 07:47:20 | 2025-01-13 07:47:20 | 2025-01-13 07:47:20 | NULL | NULL | {"Unfinished backends":{"58e66f688844d74-afb60b4ad54e4987":[10040,10059,10078]},"ScannedRows":3255264,"TaskNumber":1,"LoadBytes":546907629,"All backends":{"58e66f688844d74-afb60b4ad54e4987":[10040,10059,10078]},"FileNumber":1,"FileSize":16123485004} | 2290 | {} | admin | |
      1 row in set (0.12 sec)
- Check the import result:

      mysql> select count(*) from dns_data;
      +-----------+
      | count(*)  |
      +-----------+
      | 121936657 |
      +-----------+
      1 row in set (0.04 sec)
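While the job is in the LOADING state, the Progress column above still reads 0.00%, but the JobDetails column already carries finer-grained counters. The sketch below parses that JSON and derives a rough progress figure. Treating `LoadBytes / FileSize` as a progress indicator is an assumption made here for illustration, and `load_progress` is a hypothetical helper name; the JSON string is copied from the output above.

```python
import json


def load_progress(job_details):
    """Return scanned rows and the fraction of file bytes loaded so far."""
    details = json.loads(job_details)
    file_size = details["FileSize"]
    fraction = details["LoadBytes"] / file_size if file_size else 0.0
    return details["ScannedRows"], fraction


# JobDetails value copied from the SHOW LOAD output above.
job_details = (
    '{"Unfinished backends":{"58e66f688844d74-afb60b4ad54e4987":[10040,10059,10078]},'
    '"ScannedRows":3255264,"TaskNumber":1,"LoadBytes":546907629,'
    '"All backends":{"58e66f688844d74-afb60b4ad54e4987":[10040,10059,10078]},'
    '"FileNumber":1,"FileSize":16123485004}'
)
rows, fraction = load_progress(job_details)
print(f"scanned {rows} rows, {fraction:.1%} of file bytes loaded")
```

Because Broker Load is asynchronous, a client would typically re-run `show load` periodically and feed each new JobDetails value through a helper like this until State leaves LOADING.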
3. Routine Load
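How It Works
Routine Load continuously consumes data from a Kafka topic and writes it into Doris.
Creating a Routine Load job in Doris produces a resident import job made up of a number of import tasks:
- Load job: a Routine Load Job is a resident import job that continuously consumes data from the data source.
- Load task: a load job is split into a number of load tasks that do the actual consuming; each task is an independent transaction.
The Routine Load flow is as follows:
- The client submits a request to the FE to create a Routine Load job, and the FE generates a resident import job (Routine Load Job) through the Routine Load Manager.
- The FE's Job Scheduler splits the Routine Load Job into several Routine Load Tasks, which the Task Scheduler schedules and dispatches to BE nodes.
- On a BE, when a Routine Load Task finishes importing, it commits the transaction to the FE and updates the job's metadata.
- After a Routine Load Task commits, new tasks are generated, or timed-out tasks are retried.
- The newly generated Routine Load Tasks are scheduled by the Task Scheduler in turn, and the cycle repeats.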
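The split step in the flow above can be pictured with a toy model. This is not Doris's scheduler: `plan_tasks` is a hypothetical function that caps the task count at the smaller of the partition count and the desired concurrency, then assigns Kafka partitions round-robin. Any other limits the real scheduler may apply are ignored. The cap is at least consistent with the job output later in this section, where 3 partitions and a `desired_concurrent_number` of 5 yield a `current_concurrent_number` of 3.

```python
def plan_tasks(partitions, desired_concurrency):
    """Assign Kafka partitions to tasks round-robin (toy model)."""
    if desired_concurrency < 1:
        raise ValueError("desired_concurrency must be at least 1")
    task_count = min(len(partitions), desired_concurrency)
    tasks = [[] for _ in range(task_count)]
    for i, partition in enumerate(partitions):
        tasks[i % task_count].append(partition)
    return tasks


# Three partitions, desired concurrency 5: one task per partition.
print(plan_tasks([0, 1, 2], 5))
# Five partitions, desired concurrency 2: partitions are shared between tasks.
print(plan_tasks([0, 1, 2, 3, 4], 2))
```

Each inner list stands for one Routine Load Task; in the real system every such task commits as its own transaction before new tasks are generated.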
Importing Data
- Clear the dns_data table:

      mysql> truncate table dns_data;
      Query OK, 0 rows affected (4.32 sec)
- Create a topic in Kafka:

      [root@hadoop1 kafka-3.6.0]# bin/kafka-topics.sh --create --bootstrap-server 10.0.49.4:9092 --replication-factor 3 --partitions 3 --topic test
- Start the import job:

      mysql> CREATE ROUTINE LOAD testdb.example_routine_load_csv ON dns_data
      COLUMNS TERMINATED BY ",",
      COLUMNS(client_ip, domain, time, target_ip, rcode, query_type,authority_record,add_msg,dns_ip)
      FROM KAFKA(
          "kafka_broker_list" = "10.0.49.4:9092",
          "kafka_topic" = "test",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
- Write test data into the Kafka topic (using a simple program you write yourself):

      # 5 million rows to import
      [root@hadoop3 dns_data]# wc -l sample.csv
      5000000 sample.csv
      # Run the program to write the test data to topic test
      [root@hadoop3 dns_data]# java -jar mock_data.jar 10.0.49.2:9092,10.0.49.3:9092,10.0.49.4:9092 test sample.csv
- Check the import job:

      mysql> show routine load\G;
      *************************** 1. row ***************************
      Id: 16361
      Name: example_routine_load_csv
      CreateTime: 2025-01-13 08:17:23
      PauseTime: NULL
      EndTime: NULL
      DbName: default_cluster:testdb
      TableName: dns_data
      IsMultiTable: false
      State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 3
      JobProperties: {"max_batch_rows":"200000","timezone":"Europe/London","send_batch_parallelism":"1","load_to_single_tablet":"false","column_separator":"','","line_delimiter":"\n","current_concurrent_number":"3","delete":"*","partial_columns":"false","merge_type":"APPEND","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","max_batch_interval":"10","max_batch_size":"104857600","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"client_ip,domain,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip","whereExpr":"*","desired_concurrent_number":"5","precedingFilter":"*","format":"csv","max_error_number":"0","max_filter_ratio":"1.0","json_root":"","strip_outer_array":"false","num_as_string":"false"}
      DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.49.4:9092"}
      CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"example_routine_load_csv_df6ba034-d178-4d05-bfb1-ad3635da7231"}
      Statistic: {"receivedBytes":0,"runningTxns":[2339,2340,2341],"errorRows":0,"committedTaskNum":3,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":30309}
      Progress: {"0":"OFFSET_BEGINNING","1":"OFFSET_BEGINNING","2":"OFFSET_BEGINNING"}
      Lag: {"0":2,"1":2,"2":2}
      ReasonOfStateChanged:
      ErrorLogUrls:
      OtherMsg:
      User: admin
      Comment:
      1 row in set (0.07 sec)
      ERROR: No query specified
- Check the import result:

      mysql> select count(*) from dns_data;
      +----------+
      | count(*) |
      +----------+
      |  5000000 |
      +----------+
      1 row in set (0.04 sec)
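The walkthrough above does not show the source of `mock_data.jar`, so the following is only a Python stand-in for that kind of program, not the author's code. `replay_lines` is a hypothetical helper that pushes each CSV line to a topic through an injected `send(topic, value_bytes)` callable, which keeps the sketch free of third-party dependencies. `total_lag` sums the per-partition values of the Lag field shown by `show routine load`, as a quick check of how far the job is behind.

```python
import json


def replay_lines(lines, send, topic="test"):
    """Send each non-empty line to `topic` via `send(topic, value_bytes)`."""
    sent = 0
    for line in lines:
        record = line.rstrip("\r\n")
        if not record:
            continue
        send(topic, record.encode("utf-8"))
        sent += 1
    return sent


def total_lag(lag_json):
    """Sum the per-partition values of the Lag field from SHOW ROUTINE LOAD."""
    return sum(json.loads(lag_json).values())


# Demo with an in-memory sink instead of a real Kafka producer.
# Assumption: with the kafka-python package, `send` could instead be
#   lambda t, v: producer.send(t, value=v)
# for a KafkaProducer instance, followed by producer.flush().
collected = []
count = replay_lines(["a,b\n", "\n", "c,d\n"], lambda t, v: collected.append((t, v)))
print(count, total_lag('{"0":2,"1":2,"2":2}'))
```

Injecting `send` keeps the replay logic testable without a broker; the Kafka client library is a deployment choice the original text leaves open.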
II. Doris Data Export
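Feature | SELECT INTO OUTFILE | EXPORT |
---|---|---|
Synchronous/asynchronous | Synchronous | Asynchronous (after submitting an EXPORT job, check its progress with SHOW EXPORT) |
Arbitrary SQL | Supported | Not supported |
Export specific partitions | Supported | Supported |
Export specific tablets | Supported | Not supported |
Concurrent export | Supported, with high concurrency (depending on whether the SQL contains operators such as ORDER BY that must run on a single node) | Supported, with high concurrency (at tablet granularity) |
Supported export formats | Parquet, ORC, CSV | Parquet, ORC, CSV |
Export external tables | Supported | Partially supported |
Export views | Supported | Supported |
Supported export destinations | S3, HDFS | S3, HDFS |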
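SELECT INTO OUTFILE
Suitable for the following scenarios:
- The exported data needs complex computation, such as filtering, aggregation, or joins.
- The task should run synchronously.
EXPORT
Suitable for the following scenarios:
- Exporting a single table with a large volume of data, with only simple filter conditions.
- The task needs to be submitted asynchronously.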
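The comparison above can be condensed into a small decision helper. This is only a sketch of the table's constraints: `choose_export_method` and its parameters are hypothetical names, and falling back to SELECT INTO OUTFILE when no constraint applies is an arbitrary choice made here, since both methods would work in that case.

```python
def choose_export_method(needs_arbitrary_sql=False, by_tablets=False, needs_async=False):
    """Pick an export statement from the comparison table's constraints."""
    # Arbitrary SQL and tablet-level export are only listed for SELECT INTO OUTFILE.
    outfile_only = needs_arbitrary_sql or by_tablets
    if outfile_only and needs_async:
        raise ValueError("no single method is both asynchronous and supports this query")
    if outfile_only:
        return "SELECT INTO OUTFILE"
    if needs_async:
        return "EXPORT"
    # Neither constraint applies: both methods work; default to the synchronous one.
    return "SELECT INTO OUTFILE"


print(choose_export_method(needs_arbitrary_sql=True))
print(choose_export_method(needs_async=True))
```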
1. SELECT INTO OUTFILE
- Check the data in the table to be exported:

      mysql> select count(*) from dns_data;
      +----------+
      | count(*) |
      +----------+
      |  5000000 |
      +----------+
      1 row in set (0.04 sec)
- Start the export job:

      mysql> SELECT * FROM dns_data
      INTO OUTFILE "hdfs://10.0.49.4:9000/doris/result_"
      FORMAT AS CSV
      PROPERTIES
      (
          "fs.defaultFS" = "hdfs://10.0.49.4:9000",
          "hadoop.username" = "root",
          "column_separator" = ","
      );
      +------------+-----------+-----------+-----------------------------------------------------------------------+
      | FileNumber | TotalRows | FileSize  | URL                                                                   |
      +------------+-----------+-----------+-----------------------------------------------------------------------+
      |          1 |   5000000 | 660546528 | hdfs://10.0.49.4:9000/doris/result_c1f1268d26b547f6-a906f9570e7bfa04_ |
      +------------+-----------+-----------+-----------------------------------------------------------------------+
      1 row in set (23.04 sec)
- Check the export result:

      [root@hadoop3 dns_data]# hdfs dfs -get /doris/result_c1f1268d26b547f6-a906f9570e7bfa04_0.csv a.csv
      [root@hadoop3 dns_data]# wc -l a.csv
      5000000 a.csv
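The statement returns only a URL prefix, while the file fetched afterwards has `0.csv` appended to it. The sketch below derives the HDFS paths to download from the result row, assuming files are named `<prefix><index>.<extension>` with indexes starting at 0. Only the single-file case is actually observed in this walkthrough, so the multi-file naming is an assumption, and `outfile_paths` is a hypothetical helper.

```python
from urllib.parse import urlparse


def outfile_paths(url_prefix, file_number, extension="csv"):
    """List HDFS paths, assuming files are named <prefix><index>.<extension>."""
    base = urlparse(url_prefix).path
    return [f"{base}{i}.{extension}" for i in range(file_number)]


paths = outfile_paths(
    "hdfs://10.0.49.4:9000/doris/result_c1f1268d26b547f6-a906f9570e7bfa04_", 1
)
print(paths)
```

Each returned path can then be passed to `hdfs dfs -get`, as in the step above.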
2. EXPORT
- Check the data in the table to be exported:

      mysql> select count(*) from dns_data;
      +----------+
      | count(*) |
      +----------+
      |  5000000 |
      +----------+
      1 row in set (0.04 sec)
- Start the export job:

      mysql> EXPORT TABLE dns_data TO "hdfs://10.0.49.4:9000/doris/export_"
      PROPERTIES
      (
          "line_delimiter" = "\n",
          "column_separator" = ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://10.0.49.4:9000",
          "hadoop.username" = "root"
      );
- Check the export jobs. The output is truncated; the first rows appear to come from earlier failed attempts (dated 2025-01-12) whose path omitted the NameNode address, which produced the Wrong FS error:

      mysql> show export \G;
      *************************** 1. row ***************************
      JobId: 15099
      Label: export_39314fdb-3962-40a5-aefa-463a56b33675
      State: CANCELLED
      Progress: 0%
      TaskInfo: {"partitions":["*"],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","broker":"HDFS","column_separator":"\t","line_delimiter":",","db":"default_cluster:testdb","tbl":"tbl","tablet_num":0}
      Path: hdfs://doris/export/export_
      CreateTime: 2025-01-12 15:44:57
      StartTime: 2025-01-12 15:45:21
      FinishTime: 2025-01-12 15:45:51
      Timeout: 7200
      ErrorMsg: type:RUN_FAIL; msg:errCode = 2, detailMessage = (10.0.49.2)[INTERNAL_ERROR]create dir failed. (BE: 10.0.49.2) namenode: hdfs://hadoop1:9000 path: hdfs://doris/export, err: (22), Invalid argument), reason: IllegalArgumentException: Wrong FS: hdfs://doris/export, expected: hdfs://hadoop1:9000
      OutfileInfo: NULL
      *************************** 2. row ***************************
      JobId: 15100
      Label: export_7e593913-3369-4138-91b1-b219d011e55c
      State: CANCELLED
      Progress: 0%
      TaskInfo: {"partitions":["*"],"max_file_size":"","delete_existing_files":"","columns":"","format":"csv","broker":"HDFS","column_separator":"\t","line_delimiter":",","db":"default_cluster:testdb","tbl":"tbl","tablet_num":0}
      Path: hdfs://doris/export/export_
      CreateTime: 2025-01-12 15:49:12
      StartTime: 2025-01-12 15:49:21
      FinishTime: 2025-01-12 15:49:49
      Timeout: 7200
      ErrorMsg: type:RUN_FAIL; msg:errCode = 2, detailMessage = (10.0.49.4)[INTERNAL_ERROR]create dir failed. (BE: 10.0.49.4) namenode: hdfs://10.0.49.4:9000 path: hdfs://doris/export, err: (22), Invalid argument), reason: IllegalArgumentException: Wrong FS: hdfs://doris/export, expected: hdfs://10.0.49.4:9000
      OutfileInfo: NULL
      *************************** 3. row ***************************
      ...
- Check the export result:

      [root@hadoop3 dns_data]# hdfs dfs -ls /doris
      Found 1 items
      -rw-r--r--   3 root supergroup  660546528 2025-01-13 08:47 /doris/export_27d22e086e5e4ba8-b7d27eb2428844e2_0.csv
      [root@hadoop3 dns_data]# hdfs dfs -get /doris/export_27d22e086e5e4ba8-b7d27eb2428844e2_0.csv a.csv
      [root@hadoop3 dns_data]# wc -l a.csv
      5000000 a.csv
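The `Wrong FS` errors in the `show export` output above arise when the export path is `hdfs://doris/export/export_`, so that `doris` is read as the NameNode host instead of a directory. A simple pre-submission check, sketched below, compares the scheme and authority of the export path with `fs.defaultFS`. `check_export_path` is a hypothetical helper; the comparison is purely textual, so a host name and an IP address for the same NameNode would be reported as a mismatch.

```python
from urllib.parse import urlparse


def check_export_path(export_path, default_fs):
    """Return None if scheme and authority match fs.defaultFS, else a message."""
    path, fs = urlparse(export_path), urlparse(default_fs)
    if (path.scheme, path.netloc) != (fs.scheme, fs.netloc):
        return f"Wrong FS: {path.scheme}://{path.netloc}, expected: {default_fs}"
    return None


# The failing path from the earlier attempts, then the corrected one.
print(check_export_path("hdfs://doris/export/export_", "hdfs://10.0.49.4:9000"))
print(check_export_path("hdfs://10.0.49.4:9000/doris/export_", "hdfs://10.0.49.4:9000"))
```

Running a check like this before `EXPORT TABLE` avoids waiting for an asynchronous job to be cancelled just to discover a malformed path.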