前言
针对PostgreSQL进行压缩,有很多相关的工具。有同学又要问了,为何还要再搞一个?比如,pgbench, sysbench之类的,已经很强大了。是的,它们都很强大。但有时候,在一些特殊的场景,可能自己构造一个更能接近真实的生产环境。
这里,我半写,半借助于ChatGPT,搞出一个代码片段来模拟启动一段多线程并发SQL请求,作用于PostgreSQL数据库。然后,你可以对请求执行完以后的结果进行观测,尤其是表膨胀,受影响记录条数之类的。
基于此,我们还可以进行持续改造,快速用于工作之中。
实作
需求:
实现一段代码,读取一个sql文件,然后分段分批执行,并且是以多线程(比如10个线程,go里边可能就是协程,非常高效)去执行这个SQL中的所有SQL语句。再加一个时间限制,比如持续执行120秒。
实现:
package main
import (
"bufio"
"context"
"database/sql"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
_ "github.com/lib/pq"
)
const (
host = "localhost"
port = 5555
user = "postgres"
password = "password"
dbname = "mydb"
)
func execute_sqls(ctx context.Context, sqls []string, wg *sync.WaitGroup, thread int) {
defer wg.Done()
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
panic(err)
}
defer db.Close()
start := time.Now()
for {
for _, sql := range sqls {
select {
case <-ctx.Done():
elapsed := time.Since(start)
fmt.Printf("Thread %d stopped. It executed SQLs for %s \n", thread, elapsed)
return
default:
_, err := db.Exec(sql)
if err != nil {
fmt.Println(err)
}
}
}
}
}
func read_sqls(file string) []string {
f, err := os.Open(file)
if err != nil {
panic(err)
}
defer f.Close()
sqls := make([]string, 0)
r := bufio.NewReader(f)
for {
line, err := r.ReadString(';')
if err == io.EOF {
break
} else if err != nil {
panic(err)
}
sql := strings.TrimSpace(line)
if sql != "" {
sqls = append(sqls, sql)
}
}
return sqls
}
func main() {
filepath := "file.sql" // Replace with your file path
numThreads := 10 // Number of threads
sqls := read_sqls(filepath)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // 60 seconds
for i := 0; i < numThreads; i++ {
wg.Add(1)
go execute_sqls(ctx, sqls, &wg, i)
}
wg.Wait()
cancel()
fmt.Println("All goroutines stopped")
}
上边的代码,关于输入文件:file.sql, 线程数:10, 运行时间:60秒,都是硬编码进去的。你可以根据实际情况,进行参数化。
体验:
在你的go环境已经安装了"github.com/lib/pq"等必备包之后(go get github.com/lib/pq
),就可以直接执行了。我们准备一个pg的基本环境。database: mydb, 端口:5555, 就用postgres用户及相应密码(仅用于测试目的),不缀述。
目标表的准备:
\c mydb
create table t(id int, col2 varchar(32));
file.sql文件内容如下:
insert into t values ((10000*random())::int, md5(random()::varchar));
with updates as (select (10000*random())::int as id) update t set col2 = 'update' || updates.id from updates where t.id=updates.id returning updates.id;
这个测试的代码片段,就是插入一条随机记录,并且再随机更新一条记录,使用CTE语法,把对应的id值返回来,有可能找不到对应的记录,就返回的是空值。在并发大的情况下,update语句慢慢就起作用了。这样就可以反复执行。
来看看效果:
go run ./stress.go
hread 7 stopped. It executed SQLs for 59.999324s
Thread 1 stopped. It executed SQLs for 1m0.000756208s
Thread 2 stopped. It executed SQLs for 1m0.000604792s
Thread 4 stopped. It executed SQLs for 1m0.001703583s
Thread 0 stopped. It executed SQLs for 1m0.008518875s
Thread 9 stopped. It executed SQLs for 1m0.008456083s
Thread 5 stopped. It executed SQLs for 1m0.007964375s
Thread 6 stopped. It executed SQLs for 1m0.007968292s
Thread 3 stopped. It executed SQLs for 1m0.008145042s
Thread 8 stopped. It executed SQLs for 1m0.008202209s
All goroutines stopped
1分钟跑完之后,我们看到这样的部分记录结果:
mydb=# select * from t limit 10;
id | col2
------+------------
4792 | update4792
3416 | update3416
9290 | update9290
887 | update887
8778 | update8778
7472 | update7472
4602 | update4602
3454 | update3454
2604 | update2604
1990 | update1990
(10 rows)
总记录条数:
mydb=# select count(*) from t;
count
--------
126056
(1 row)
引申:可以认为单个C+U操作,10个线程并发,1分钟入库12.6万。
表大小:
mydb=# select pg_total_relation_size('t');
pg_total_relation_size
------------------------
8372224
(1 row)
使用下边的SQL看看相关指标:
WITH cteTableInfo AS
(
SELECT
COUNT(1) AS ct
,SUM(length(t::text)) AS TextLength
,'public.t'::regclass AS TableName
FROM public.t AS t
)
,cteRowSize AS
(
SELECT ARRAY [pg_relation_size(TableName)
, pg_relation_size(TableName, 'vm')
, pg_relation_size(TableName, 'fsm')
, pg_table_size(TableName)
, pg_indexes_size(TableName)
, pg_total_relation_size(TableName)
, TextLength
] AS val
, ARRAY ['Relation Size'
, 'Visibility Map'
, 'Free Space Map'
, 'Table Included Toast Size'
, 'Indexes Size'
, 'Total Relation Size'
, 'Live Row Byte Size'
] AS Name
FROM cteTableInfo
)
SELECT
unnest(name) AS Description
,unnest(val) AS Bytes
,pg_size_pretty(unnest(val)) AS BytesPretty
,unnest(val) / ct AS bytes_per_row
FROM cteTableInfo, cteRowSize
UNION ALL SELECT '------------------------------', NULL, NULL, NULL
UNION ALL SELECT 'TotalRows', ct, NULL, NULL FROM cteTableInfo
UNION ALL SELECT 'LiveTuples', pg_stat_get_live_tuples(TableName), NULL, NULL FROM cteTableInfo
UNION ALL SELECT 'DeadTuples', pg_stat_get_dead_tuples(TableName), NULL, NULL FROM cteTableInfo;
结果:
description | bytes | bytespretty | bytes_per_row
--------------------------------+---------+-------------+---------------
Relation Size | 8339456 | 8144 kB | 66
Visibility Map | 8192 | 8192 bytes | 0
Free Space Map | 24576 | 24 kB | 0
Table Included Toast Size | 8372224 | 8176 kB | 66
Indexes Size | 0 | 0 bytes | 0
Total Relation Size | 8372224 | 8176 kB | 66
Live Row Byte Size | 2338451 | 2284 kB | 18
------------------------------ | | |
TotalRows | 126056 | |
LiveTuples | 126056 | |
DeadTuples | 12274 | |
(11 rows)
里边有涉及到的死元组为12274行。
mydb=# create extension pgstattuple;
CREATE EXTENSION
mydb=# select * from pgstattuple('public.t') \gx
-[ RECORD 1 ]------+--------
table_len | 8339456
tuple_count | 126056
tuple_len | 5125646
tuple_percent | 61.46
dead_tuple_count | 12110
dead_tuple_len | 483172
dead_tuple_percent | 5.79
free_space | 1451672
free_percent | 17.41
这两种统计结果也都比较接近。
当你针对相同的表,进行随机多次测试,发现上边的值也会不断变化(update的命中率会越来越高)。
总结:
本文的目的,只是作一个抛砖引玉,可以随时使用go, python甚至rust去构建一个小的压缩环境,对各种复杂的压力环境进行模拟,并得出相关结论。当然,作为一个团队,可以开发出使用Java之类的接近业务逻辑的工具也是可以的。也有的测试团队,原意使用JMeter + jdbc来构建测试套集,都不失为一种方式。这类工具,是介于pgbench 和 真实业务场景压测之间的一种使用方式。哪个更方便,就可以用哪个。
上边的代码片段,稍加改造,就可以用到实际的实验当中。
关于表膨胀,可以看看我前边的文章:
也聊聊PostgreSQL中的空间膨胀与AutoVacuum
PG中的一例简单的update看表膨胀