Doris是一个开源的分布式分析型数据库,最初由阿里巴巴开发并开源,目前隶属于Apache基金会。
Doris基于大规模并行处理(MPP)架构,提供高性能和实时的数据分析能力。它以极速易用的特点被广泛使用,能够应对高并发的点查询场景,同时也支持高吞吐的复杂分析场景,如用户行为分析、日志检索、用户画像分析和订单分析等。Doris具有亚秒级的响应时间,能够处理大规模数据存储和分析的需求。
一、架构原理
Doris整体架构
BE / FE
- Doris主要分为FE、BE两个组件,FE主要负责查询的编译,分发和元数据管理(基于内存,类似HDFS NN);BE主要负责查询的执行和存储系统。
索引结构
- Sorted Compound Key Index,可以最多指定三个列组成复合排序键,通过该索引,能够有效进行数据裁剪,从而能够更好支持高并发的报表场景
- Z-order Index :使用 Z-order 索引,可以高效对数据模型中的任意字段组合进行范围查询
- Min/Max :有效过滤数值类型的等值和范围查询
- Bloom Filter :对高基数列的等值过滤裁剪非常有效
- Invert Index :能够对任意字段实现快速检索
存储模型
- Aggregate Key 模型:相同 Key 的 Value 列合并(求和、替换、最大值、最小值)。通过提前聚合大幅提升性能
- Unique Key 模型:Key 唯一,相同 Key 的数据覆盖,实现行级别数据更新。(Unique 模型等同于聚合模型Aggregate Key 中的 REPLACE 方式替代)
- Duplicate Key 模型:明细数据模型,满足事实表的明细存储。(相同的两行数据会被保留)
存储引擎
- 列式存储,按列进行数据的编码压缩和读取,能够实现极高的压缩比,同时减少大量非相关数据的扫描,从而更加有效利用 IO 和 CPU 资源。
查询引擎
- 采用 MPP 的模型,节点间和节点内都并行执行,也支持多个大表的分布式 Shuffle Join,从而能够更好应对复杂查询。
二、数据划分
一张“表”的数据会被拆分到多个分区Partition。Partition 可以视为是逻辑上最小的管理单元。数据的导入与删除,都可以或仅能针对一个 Partition 进行。而一个 Partition 包含若干个 Tablet,每个 Tablet 包含若干数据行。各个 Tablet 之间的数据没有交集,并且在物理上是独立存储的。
我们以一个建表操作来说明 Doris 的数据划分。
-- Range Partition
CREATE TABLE IF NOT EXISTS example_db.example_range_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY RANGE(`date`)
(
PARTITION `p201701` VALUES LESS THAN ("2017-02-01"),
PARTITION `p201702` VALUES LESS THAN ("2017-03-01"),
PARTITION `p201703` VALUES LESS THAN ("2017-04-01")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
-- 或者 List Partition
CREATE TABLE IF NOT EXISTS example_db.example_list_tbl
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
ENGINE=olap
AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`)
PARTITION BY LIST(`city`)
(
PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"),
PARTITION `p_usa` VALUES IN ("New York", "San Francisco"),
PARTITION `p_jp` VALUES IN ("Tokyo")
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
分区和分桶
Doris 支持两层的数据划分。
- 第一层称为 Partition,即分区,支持 Range 和 List 的划分方式。用户可以指定某一维度列作为分区列,并指定每个分区的取值范围。Range分区列通常为时间列,以方便的管理新旧数据。List分区值为枚举值。只有当数据为目标分区枚举值其中之一时,才可以命中分区。
- 第二层称为 Bucket(Tablet),即分桶,仅支持Hash 的划分方式。即将同一行数据的不同列分别存储在不同的桶中。这种方式可以将经常一起查询的列存储在同一个桶中,提高查询效率。
官方关于 Partition 和 Bucket 的数量和数据量的建议
- 一个表的 Tablet 总数量等于 (Partition num * Bucket num)。
- 一个表的 Tablet 数量,在不考虑扩容的情况下,推荐略多于整个集群的磁盘数量。
- 单个 Tablet 的数据量理论上没有上下界,但建议在 1G - 10G 的范围内。如果单个 Tablet 数据量过小,则数据的聚合效果不佳,且元数据管理压力大。如果数据量过大,则不利于副本的迁移、补齐,且会增加 Schema Change 或者 Rollup 操作失败重试的代价(这些操作失败重试的粒度是 Tablet)。
- 当 Tablet 的数据量原则和数量原则冲突时,建议优先考虑数据量原则。
- 在建表时,每个分区的 Bucket 数量统一指定。但是在动态增加分区时(ADD PARTITION),可以单独指定新分区的 Bucket 数量。可以利用这个功能方便的应对数据缩小或膨胀。
- 一个 Partition 的 Bucket 数量一旦指定,不可更改。所以在确定 Bucket 数量时,需要预先考虑集群扩容的情况。比如当前只有 3 台 host,每台 host 有 1 块盘。如果 Bucket 的数量只设置为 3 或更小,那么后期即使再增加机器,也不能提高并发度。
- 举一些例子:假设在有10台BE,每台BE一块磁盘的情况下。如果一个表总大小为 500 MB,则可以考虑4-8个分片。5 GB: 8-16个分片。50 GB: 32个分片。500 GB:建议分区,每个分区大小在 50 GB 左右,每个分区16-32个分片。5 TB:建议分区,每个分区大小在 50 GB 左右,每个分区16-32个分片。
动态分区
Doris支持动态分区,对表级别的分区实现生命周期管理(TTL),减少用户的使用负担。
dynamic_partition.参数
三、数据导入导出
数据导入
导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过Mysql 客户端查询数据。为适配不同的数据导入需求,Doris 系统提供了 6 种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。
所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 和 orc 数据格式。
- Broker load:通过 Broker 进程访问并读取外部数据源(如 HDFS)导入到 Doris。用户通过 Mysql协议提交导入作业后,异步执行。通过 SHOW LOAD 命令查看导入结果。
- Stream load:用户通过 HTTP 协议提交请求并携带原始数据创建导入。主要用于快速将本地文件或数据流中的数据导入到 Doris。导入命令同步返回导入结果。目前 Stream Load 支持两个数据格式:CSV(文本)和 JSON。
- Insert:类似 MySQL 中的 Insert 语句,Doris 提供 INSERT INTO tbl SELECT …; 的方式从Doris 的表中读取数据并导入到另一张表。或者通过 INSERT INTO tbl VALUES(…); 插入单条数据。
- Multi load:用户通过 HTTP 协议提交多个导入作业。Multi Load 可以保证多个导入作业的原子生效。
- Routine load:用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如Kafka)中读取数据并导入到 Doris 中。
- 通过S3 协议直接导入:用户通过 S3 协议直接导入数据,用法和 Broker Load 类似。Broker load 是一个异步的导入方式,支持的数据源取决于 Broker 进程支持的数据源。用户需要通过 MySQL 协议创建 Broker load 导入,并通过查看导入命令检查导入结果。
- Binlog Load:提供了一种使Doris增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。需要依赖canal作为中间媒介。
数据导出
数据导出是 Doris 提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据以文本的格式,通过 Broker 进程导出到远端存储上,如 HDFS/BOS 等。
基本原理:用户提交一个 Export 作业后。Doris 会统计这个作业涉及的所有 Tablet。然后对这些Tablet 进行分组,每组生成一个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中,也可以通过 S3 协议直接导出到支持 S3 协议的远端存储上。
四、Rollup
Rollup概念
- 在 Doris 中,我们将用户通过建表语句创建出来的表成为 Base 表(Base Table),Base
- 表中保存着按用户建表语句指定的方式存储的基础数据。
- 在 Base 表之上,我们可以创建任意多个 ROLLUP 表。这些 ROLLUP 的数据是基于 Base
- 表产生的,并且在物理上是独立存储的。
- ROLLUP 最根本的作用是提高某些查询的查询效率(无论是通过聚合来减少数据量,还是修改列顺序以匹配前缀索引)。
- Schema 中的字段顺序也可与 Base Table 不同。ROLLUP 创建完成之后的触发是程序自动的,不需要任何其他指定或者配置。
- ROLLUP 的数据更新与 Base 表示完全同步的。用户无需关心这个问题。ROLLUP 中列的聚合方式,与 Base 表完全相同。
- 在创建 ROLLUP 无需指定,也不能修改。查询能否命中 ROLLUP 的一个必要条件(非充分条件)是,查询所涉及的所有列(包括
- select list 和 where 中的查询条件列等)都存在于该 ROLLUP 的列中。否则,查询只能命中 Base 表。
Rollup使用
ALTER TABLE example_log ADD ROLLUP rollup1(type, error_code);
# 命中
explain select type, error_code from example_log;
# 不命中
explain select type, timestamp from example_log;
Rollup调整前缀索引
Doris会将一行数据的前36个字节作为这行数据的前缀索引。
Rollup还有一个功能是可以调整索引。
因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求。因此,我们可以通过创建 ROLLUP 来人为的调整列顺序。
CREATE TABLE IF NOT EXISTS test_db.example_rollup_index
(
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`age` SMALLINT COMMENT "用户年龄",
`message` varchar(100) COMMENT "信息",
`max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `age`, `message`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
-- 创建Rollup
ALTER TABLE test_db.example_rollup_index ADD ROLLUP rollup_city(age, user_id,message,max_dwell_time,min_dwell_time);
-- 通过命令查看完成状态
SHOW ALTER TABLE ROLLUP;
-- 查看是否命中ROLLUP
explain SELECT * FROM test_db.example_rollup_index where age=20 and message LIKE "%error%";
五、物化视图
使用场景:
在实际的业务场景中,通常存在两种场景并存的分析需求:对固定维度的聚合分析 和 对原始明细数据任意维度的分析。
例如,在销售场景中,每条订单数据包含这几个维度信息(item_id, sold_time, customer_id, price)。在这种场景下,有两种分析需求并存:
- 业务方需要获取某个商品在某天的销售额是多少,那么仅需要在维度(item_id, sold_time)维度上对 price 进行聚合即可。
- 分析某个人在某天对某个商品的购买明细数据。
物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询的需求。
相关命令:
1、查看该database下的所有物化视图
- SHOW MATERIALIZED VIEW [IN|FROM db_name]
2、查看指定物化视图的表结构
- DESC table_name all
3、查看物化视图处理进度
- SHOW ALTER MATERIALIZED VIEW FROM db_name
4、取消正在创建的物化视图
- CANCEL ALTER MATERIALIZED VIEW FROM db_name.table_name
六、Doris查询
Doris 的 Join 性能十分优异,当 Doris 集群完成业务数据的全同步后,我们对 1亿 和近百亿的两张表进行 Join 操作,可以在 5 秒内输出数据结果,相较之前有接近 300 倍的查询速度提升。为了验证其是否偶然,我们接着对 10 张分别有 4000万数据的表进行 Join,Doris 可以在 10s 内返回查询结果。而在物化视图加持下,可以达到毫秒级别的查询响应。这充分说明了 Doris 在处理大规模数据的优势。
作为分布式的MPP数据库,在Join的过程中是需要进行数据的Shuffle,数据需要拆分调度,才能保证最终的Join结果是正确的。
目前Doris支持的Join方式有 Broadcast Join、Shuffle Join、Bucket Shuffle Join 和Colocate Join 这4种,这4种方式灵活度和适用性是从高到低的,对数据分布的要求越来越严,但Join计算的性能则通过降低网络开销而越来越好。
Join 方式的选择是FE生成分布式计划阶段会考虑的事项之一。在 FE 进行分布式计划时,优先选择的顺序为:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。很明显,Colocate 以及 Bucket Shuffle 是可遇不可求的。当无法使用它们时,Doris会自动尝试进行 Broadcast Join(大表join 小表),如果预估小表过大则会自动切换至 Shuffle Join。
Broadcast Join:通过将B表的数据全量广播到A表的机器上,在A表的机器上进行Join操作,相比较于Shuffle join 节省了A表数据Shuffle,但是B表的数据是全量广播,适合B表是个小表的场景。
Shuffle Join:根据表A和表B执行join的列进行hash,相同hash值的数据分发到同一个节点上。它的网络开销会比较大。
Bucket Shuffle Join:在Broadcast的基础上进一步优化,将B表按照A表的分布方式,Shuffle到A表机器上进行Join操作,B表Shuffle的数据量全局只有一份,比Broadcast少传输了很多倍数量。
Colocation Join:Bucket Shuffle Join相似,通过建表时指定 A 表和B表是同一个 Colocate Group,确保 A、B 表的数据分布完全一致,那么,计算节点只需做本地 Join,减少跨节点的数据移动和网络传输开销,提高查询性能。Colocate Join 十分适合几张大表按照相同字段分桶的场景,这样可以将数据预先存储到相同的分桶中,实现本地计算。所以它的网络开销是0,数据已经预先分区,直接在本地进行Join 计算。
七、Hive外表
Hive是基于Hadoop的数据仓库工具,它使用Hadoop分布式文件系统(HDFS)来存储数据。Doris也是基于Hadoop的,但它不使用HDFS来存储数据,而是使用自己的列式存储引擎。这样可以避免HDFS的复杂性和低效性,提高数据存储和查询的效率。
然而,Doris与Hive并不是互斥的关系。实际上,我们可以将Doris作为Hive的外部表来使用。这样一来,我们既可以享受Doris高效的查询能力,又可以利用Hive的生态系统来进行数据处理和分析。