本文字数:6782;估计阅读时间:17 分钟
作者:Dale McDiarmid
审校:庄晓东(魏庄)
本文在公众号【ClickHouseInc】首发
本月初,Decodable 公司的 Gunnar Morling 提出了一项为期一月挑战,引起了广泛关注 - 编写一个 Java 程序,从一个包含十亿行信息的文本文件中检索温度测量值,并计算每个气象站的最低、平均和最高温度。虽然我们并非 Java 专家,但作为一家热衷于大数据和速度基准测试的公司,我们决定正式组织一个 ClickHouse 的团队来加入这一挑战!
尽管最初的挑战是使用 Java,Gunnar 开设了一个名为 "Show & Tell" 的 Github 讨论,以允许更广泛的技术贡献。我们也要感谢我们的社区成员,他们也响应了这一挑战。
遵循规则
在回应这一挑战时,我们尽量保持对最初挑战的认同。因此,我们在最终提交中包括了所有处理时间或数据加载时间。例如,仅提供表加载完成后的查询响应时间,而不考虑插入时间,感觉有点像……嗯,作弊 :)
Gunnar 在 Hetzner AX161 上执行测试,限制执行核心数为 8。尽管我很想为这项互联网挑战,专门购置一台专用的裸金属服务器,但我们感觉这可能有点过了。为了尽可能保持可比性,我们的示例使用 Hetzer 虚拟实例(专用 CPU),具有 8 个核心和 32GB 的内存。虽然是虚拟实例,但这些实例使用了 AMD EPYC-Milan 处理器,采用了 Zen3 架构 - 晚于 Hetzner AX161 提供的 AMD EPYC-Rome 7502P 处理器。
生成数据「下载」
用户可以按照原始说明生成 10 亿行的数据集。这需要 Java 21 并且需要运行一些命令。
在编写这篇博客时,我发现了 sdkman(https://sdkman.io/jdks),它简化了 Java 的安装过程,适用于那些没有预安装Java的人。
然而,生成 13GB 的 measurements.txt 文件相当慢,如下所示:
# clone and build generation tool. Output omitted.
git clone git@github.com:gunnarmorling/1brc.git
./mvnw clean verify
./create_measurements.sh 1000000000
Created file with 1,000,000,000 measurements in 395955 ms
出于 ClickHouse Local 在生成此文件方面速度的好奇,我们查看了源代码。站点及其平均温度的列表编译进了代码中,通过从均值和方差为 10 的高斯分布中采样来产生随机点。将原始站点数据提取到 CSV 文件中,并将其托管在 s3 上,使我们能够使用 INSERT INTO FUNCTION FILE 复制此逻辑。请注意使用 s3 函数将我们的站点读入 CTE,然后使用随机函数对这些结果进行采样。
INSERT INTO FUNCTION file('measurements.csv', CustomSeparated)
WITH (
SELECT groupArray((station, avg)) FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/1brc/stations.csv')
) AS averages
SELECT
averages[floor(randUniform(1, length(averages)))::Int64].1 as city,
round(averages[floor(randUniform(1, length(averages)))::Int64].2 + (10 * SQRT(-2 * LOG(randCanonical(1))) * COS(2 * PI() * randCanonical(2))), 2) as temperature
FROM numbers(1_000_000_000)
SETTINGS format_custom_field_delimiter=';', format_custom_escaping_rule='Raw'
0 rows in set. Elapsed: 57.856 sec. Processed 1.00 billion rows, 8.00 GB (17.28 million rows/s., 138.27 MB/s.)
Peak memory usage: 36.73 MiB.
以6.8倍的速度,这似乎值得分享!
一个经验丰富的 ClickHouse 用户可能会在这里使用 randNormal 函数。不幸的是,目前这需要均值和方差是常数。因此,我们使用 randCanonical 函数,并使用它来通过 Muller 变换对 Guassian 分布进行采样。
或者,用户可以直接从这里下载我们生成的文件的 gzip 压缩版本 :)「https://datasets-documentation.s3.eu-west-3.amazonaws.com/1brc/measurements.txt.gz」
仅限 ClickHouse Local
虽然许多用户熟悉 ClickHouse 作为部署在服务器上的实时数据仓库,但它也可以作为本地二进制文件 "Clickhouse Local" 用于对文件进行按需的数据分析查询。自从我们在一年多前的博客中描述了这个用例以来,这已经成为 ClickHouse 越来越受欢迎的应用。
ClickHouse Local 有控制台模式(通过运行 clickhouse local 可以访问),从中可以创建表并提供交互式查询反馈,还有一个专为与脚本和外部工具集成而设计的命令行界面。我们使用后者来对我们的 measurements.txt 进行采样。设置 `format_csv_delimiter=';'` 允许指定 CSV 文件的分隔符。
clickhouse local --query "SELECT city, temperature FROM file('measurements.txt', CSV, 'city String, temperature DECIMAL(8,1)') LIMIT 5 SETTINGS format_csv_delimiter=';'"
Mexicali 44.8
Hat Yai 29.4
Villahermosa 27.1
Fresno 31.7
Ouahigouya 29.3
计算每个城市的温度的最低值、最高值和平均值需要一个简单的 GROUP BY 查询。我们使用 -t 来确保包含时间信息。挑战要求以特定格式输出:
{Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3, Abéché=-10.0/29.4/69.0, Accra=-10.1/26.4/66.4, Addis Ababa=-23.7/16.0/67.0, Adelaide=-27.8/17.3/58.5, ...}
为了实现这一点,我们可以使用 CustomSeparated 输出格式和 format 函数。这避免了使用任何函数,比如 groupArray,它将行折叠成一行。下面,我们使用 ClickHouse Local 的控制台模式。
SELECT format('{}={}/{}/{}', city, min(temperature), round(avg(temperature), 2), max(temperature))
FROM file('measurements.txt', CSV, 'city String, temperature DECIMAL(8,1)')
GROUP BY city
ORDER BY city ASC
FORMAT CustomSeparated
SETTINGS
format_custom_result_before_delimiter = '{',
format_custom_result_after_delimiter = '}',
format_custom_row_between_delimiter = ', ',
format_custom_row_after_delimiter = '',
format_csv_delimiter = ';'
{Abha=-34.6/18/70.3, Abidjan=-22.8/25.99/73.5, Abéché=-25.3/29.4/80.1, Accra=-25.6/26.4/76.8, Addis Ababa=-38.3/16/67, Adelaide=-33.4/17.31/65.5, …}
413 rows in set. Elapsed: 27.671 sec. Processed 1.00 billion rows, 13.79 GB (36.14 million rows/s., 498.46 MB/s.)
Peak memory usage: 47.46 MiB.
27.6秒代表我们的基准测试。与 Java 基准相比,在相同的硬件上完成几乎需要 3 分钟。
./calculate_average_baseline.sh
real 2m59.364s
user 2m57.511s
sys 0m3.372s
提高性能
我们可以通过观察到我们的 CSV 文件不使用值转义来提高上述性能。因此,CSV 读取器是不必要的 - 我们可以简单地将每一行作为字符串读取,并使用分隔符 ; 访问相关的子字符串。
SELECT format('{}={}/{}/{}', city, min(temperature), round(avg(temperature), 2), max(temperature))
FROM
(
SELECT
substringIndex(line, ';', 1) AS city,
substringIndex(line, ';', -1)::Decimal(8, 1) AS temperature
FROM file('measurements.txt', LineAsString)
)
GROUP BY city
ORDER BY city ASC FORMAT CustomSeparated
SETTINGS
format_custom_result_before_delimiter = '{',
format_custom_result_after_delimiter = '}',
format_custom_row_between_delimiter = ', ',
format_custom_row_after_delimiter = '',
format_csv_delimiter = ';'
413 rows in set. Elapsed: 19.907 sec. Processed 1.00 billion rows, 13.79 GB (50.23 million rows/s., 692.86 MB/s.)
Peak memory usage: 132.20 MiB.
这将我们的执行时间减少到不到 20 秒!
测试其他方法
我们的 ClickHouse Local 方法对文件执行完整的线性扫描。在这里的另一种替代方法可能是在运行文件上的查询之前,先将文件加载到表中。也许并不奇怪,这并没有真正提高性能,因为查询实际上对数据进行了第二次扫描。因此,总加载和查询时间超过了 19 秒。
CREATE TABLE weather
(
`city` String,
`temperature` Decimal(8, 1)
)
ENGINE = Memory
INSERT INTO weather SELECT
city,
temperature
FROM
(
SELECT
splitByChar(';', line) AS vals,
vals[1] AS city,
CAST(vals[2], 'Decimal(8, 1)') AS temperature
FROM file('measurements.txt', LineAsString)
)
0 rows in set. Elapsed: 21.219 sec. Processed 1.00 billion rows, 13.79 GB (47.13 million rows/s., 650.03 MB/s.)
Peak memory usage: 26.16 GiB.
SELECT
city,
min(temperature),
avg(temperature),
max(temperature)
FROM weather
GROUP BY city
ORDER BY city ASC
SETTINGS max_threads = 8
413 rows in set. Elapsed: 2.997 sec. Processed 970.54 million rows, 20.34 GB (323.82 million rows/s., 6.79 GB/s.)
Peak memory usage: 484.27 KiB.
请注意,我们在这里使用的是 Memory 表而不是经典的 MergeTree。鉴于数据集适应内存,查询不包含筛选条件(因此无法从 MergeTree 的稀疏索引中受益),我们可以使用这种引擎类型避免 I/O。
以上的明显好处是一旦数据加载到表中,用户就可以对数据发出任意查询。
最后,如果我们的目标查询计算最低值、最高值和平均值不够高效,我们可以将这项工作移到插入时使用 Materialized View。在这种情况下,一个名为 weather_mv 的 Materialized View 在数据插入时计算我们的统计信息。更具体地说,我们先前的聚合查询在数据块被插入时执行,结果(实际上是聚合状态)被发送到目标表 "weather_results",使用 AggregatingMergeTree 表引擎。对该表的查询将利用结果已经预先计算,从而实现了显着的更快的执行时间。
作为一种优化,接收我们数据的 weather 表可以使用 Null 引擎。这将导致行被丢弃,节省内存。
CREATE TABLE weather
(
`city` String,
`temperature` Decimal(8, 1)
)
ENGINE = Null
CREATE TABLE weather_results(
city String,
max AggregateFunction(max, Decimal(8, 1)),
min AggregateFunction(min, Decimal(8, 1)),
avg AggregateFunction(avg, Decimal(8, 1))
) ENGINE = AggregatingMergeTree ORDER BY tuple()
CREATE MATERIALIZED VIEW weather_mv TO weather_results
AS SELECT city, maxState(temperature) as max, minState(temperature) as min, avgState(temperature) as avg
FROM weather
GROUP BY city
INSERT INTO weather SELECT
city,
temperature
FROM
(
SELECT
splitByChar(';', line) AS vals,
vals[1] AS city,
CAST(vals[2], 'Decimal(8, 1)') AS temperature
FROM file('measurements.txt', LineAsString)
)
0 rows in set. Elapsed: 26.569 sec. Processed 2.00 billion rows, 34.75 GB (75.27 million rows/s., 1.31 GB/s.)
我们对 weather_results 的后续查询需要使用 merge- 函数来组合我们的聚合状态。
SELECT format('{}={}/{}/{}', city, minMerge(min), round(avgMerge(avg), 2), maxMerge(max))
FROM weather_results
GROUP BY city
ORDER BY city ASC
FORMAT CustomSeparated
SETTINGS format_custom_result_before_delimiter = '{', format_custom_result_after_delimiter = '}', format_custom_row_between_delimiter = ', ', format_custom_row_after_delimiter = '', format_csv_delimiter = ';'
413 rows in set. Elapsed: 0.014 sec.
这给我们提供了一个正常的执行时间,这已经被其他实验报告过。然而,当结合我们的 26 秒加载时间时,我们仍然无法超过简单的用 ClickHouse Local 查询。
结论
我们已经官方回应了这个十亿行挑战。我们已经证明了:在与挑战规则相当的硬件上,ClickHouse Local 可以在约 19 秒内解决问题。虽然它并不能与专业解决方案相比,但这只需要你写几行 SQL。我们要借此机会感谢 Gunnar 在这个挑战中投入的努力和时间。
联系我们
手机号:13910395701
邮箱:Tracy.Wang@clickhouse.com
满足您所有的在线分析列式数据库管理需求