相关
《Postgresql源码(125)游标恢复执行的原理分析》
《Postgresql游标使用介绍(cursor)》
总结
- 开源PG中使用tuple store来缓存tuple集,默认使用work_mem空间存放,超过可以落盘。
- 在PL的returns setof场景 和 loop内commit的场景 会使用tuple store暂存元组。
- tuple store的使用方法
- 配置dest reveiver为tuple store:CreateDestReceiver / SetTuplestoreDestReceiverParams
- 执行:ExecutorRun
- 使用tuple store:
- PL返回的tuplestore拿tuple:FunctionNext → tuplestore_gettupleslot
- 游标hold后从tuplestore拿tuple:RunFromStore → tuplestore_gettupleslot
TupleStore使用场景一:RETURNS SETOF函数
这个场景的惯用法如下:
// 1. 创建Dest Receiver
treceiver = CreateDestReceiver(DestTuplestore);
// 2 关键函数,配置TupleStore
SetTuplestoreDestReceiverParams(treceiver, tStore, .... ...)
// 3 treceiver传入执行器,调用执行器,结果进入tStore
// 4 清理DestReceiver
// 5 从tStore取出结果使用
用例
create table users(id int, name text, active bool);
insert into users values(1, 'a', true);
insert into users values(2, 'b', false);
insert into users values(3, 'c', true);
CREATE OR REPLACE FUNCTION get_active_users() RETURNS SETOF users AS $$
BEGIN
RETURN QUERY SELECT * FROM users WHERE active = true;
END;
$$ LANGUAGE plpgsql;
select * from get_active_users();
执行结果
postgres=# select * from get_active_users();
id | name | active
----+------+--------
1 | a | t
3 | c | t
(2 rows)
1 exec_stmt_return_query
执行时,会走到exec_stmt_return_query中获取执行结果:
调用SPI_execute_plan_extended执行后,可以看到tstore中有了两条结果。
当前堆栈:
(gdb) bt
#0 exec_stmt_return_query (estate=0x7ffc569f6950, stmt=0x17fe708) at pl_exec.c:3655
#1 0x00007fbaa37bc602 in exec_stmts (estate=0x7ffc569f6950, stmts=0x17fec18) at pl_exec.c:2079
#2 0x00007fbaa37bc229 in exec_stmt_block (estate=0x7ffc569f6950, block=0x17fec68) at pl_exec.c:1942
#3 0x00007fbaa37bba2b in exec_toplevel_block (estate=0x7ffc569f6950, block=0x17fec68) at pl_exec.c:1633
#4 0x00007fbaa37b99be in plpgsql_exec_function (func=0x16d72f0, fcinfo=0x17fb7d0, simple_eval_estate=0x0, simple_eval_resowner=0x0, procedure_resowner=0x0, atomic=true) at pl_exec.c:622
#5 0x00007fbaa37d4151 in plpgsql_call_handler (fcinfo=0x17fb7d0) at pl_handler.c:277
#6 0x000000000075c75d in ExecMakeTableFunctionResult (setexpr=0x1763e68, econtext=0x1763d38, argContext=0x17fb6d0, expectedDesc=0x1764138, randomAccess=false) at execSRF.c:234
#7 0x0000000000777ca7 in FunctionNext (node=0x1763b28) at nodeFunctionscan.c:94
#8 0x000000000075df9f in ExecScanFetch (node=0x1763b28, accessMtd=0x777bf5 <FunctionNext>, recheckMtd=0x777fff <FunctionRecheck>) at execScan.c:131
#9 0x000000000075e014 in ExecScan (node=0x1763b28, accessMtd=0x777bf5 <FunctionNext>, recheckMtd=0x777fff <FunctionRecheck>) at execScan.c:180
#10 0x0000000000778049 in ExecFunctionScan (pstate=0x1763b28) at nodeFunctionscan.c:269
#11 0x0000000000759f16 in ExecProcNodeFirst (node=0x1763b28) at execProcnode.c:464
#12 0x000000000074dccf in ExecProcNode (node=0x1763b28) at ../../../src/include/executor/executor.h:274
#13 0x00000000007507cc in ExecutePlan (estate=0x1763900, planstate=0x1763b28, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x17df418, execute_once=true) at execMain.c:1646
#14 0x000000000074e367 in standard_ExecutorRun (queryDesc=0x17dfb90, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:363
#15 0x000000000074e17b in ExecutorRun (queryDesc=0x17dfb90, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:304
#16 0x00000000009e6e62 in PortalRunSelect (portal=0x1787ba0, forward=true, count=0, dest=0x17df418) at pquery.c:924
#17 0x00000000009e6b20 in PortalRun (portal=0x1787ba0, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x17df418, altdest=0x17df418, qc=0x7ffc569f7180) at pquery.c:768
#18 0x00000000009e06a2 in exec_simple_query (query_string=0x16dd080 "select * from get_active_users();") at postgres.c:1274
#19 0x00000000009e4cfb in PostgresMain (dbname=0x16d7910 "postgres", username=0x1715bf8 "mingjie") at postgres.c:4680
#20 0x00000000009dd07f in BackendMain (startup_data=0x7ffc569f748c "", startup_data_len=4) at backend_startup.c:101
#21 0x000000000090be72 in postmaster_child_launch (child_type=B_BACKEND, startup_data=0x7ffc569f748c "", startup_data_len=4, client_sock=0x7ffc569f74b0) at launch_backend.c:265
#22 0x0000000000911702 in BackendStartup (client_sock=0x7ffc569f74b0) at postmaster.c:3593
#23 0x000000000090ebf3 in ServerLoop () at postmaster.c:1674
#24 0x000000000090e5ad in PostmasterMain (argc=1, argv=0x16d58c0) at postmaster.c:1372
#25 0x00000000007d20ac in main (argc=1, argv=0x16d58c0) at main.c:197
2 exec_stmt_return
继续执行到exec_stmt_return,返回值是SET不需要干活。
3 plpgsql_exec_function
在plpgsql_exec_function最后,处理刚才保存在tstore里面的元组:
注意这里的estate->rsi指向的是fcinfo->resultinfo,在这里配置的:
这样结果数据就可以通过function调用框架返回给SQL层了。
TupleStore使用场景二:游标持久化(TupleStore作为Dest Receiver)
两条路径会使用到游标持久化的功能:
- 第一种是创SQL层游标时,使用with hold语法,游标可以跨多个事务存在。
- 第二种是循环体内执行commit,这时候循环游标正常会跟着最近一层事务被删掉,但循环还需要继续执行,所以需要hold给循环游标续命。
后面分析下第二种场景,循环游标提交hold。
用例
drop procedure tproc1;
CREATE OR REPLACE PROCEDURE tproc1() AS $$
DECLARE
rec1 record;
BEGIN
for rec1 in select t.a from generate_series(1,10) t(a)
loop
raise notice '%', rec1.a;
commit;
raise notice '%', rec1.a;
end loop;
END;
$$ LANGUAGE plpgsql;
call tproc1();
结果
postgres=# call tproc1();
NOTICE: 1
NOTICE: 1
NOTICE: 2
NOTICE: 2
NOTICE: 3
NOTICE: 3
NOTICE: 4
NOTICE: 4
NOTICE: 5
NOTICE: 5
NOTICE: 6
NOTICE: 6
NOTICE: 7
NOTICE: 7
NOTICE: 8
NOTICE: 8
NOTICE: 9
NOTICE: 9
NOTICE: 10
NOTICE: 10
CALL
1 HoldPinnedPortals
2 HoldPortal
3 HoldPortal→PortalCreateHoldStore
- 创建Context,切换过去。
- 在新上下文中,执行tuplestore的初始化。
void
PortalCreateHoldStore(Portal portal)
{
MemoryContext oldcxt;
portal->holdContext =
AllocSetContextCreate(TopPortalContext,
"PortalHoldContext",
ALLOCSET_DEFAULT_SIZES);
oldcxt = MemoryContextSwitchTo(portal->holdContext);
portal->holdStore =
tuplestore_begin_heap(portal->cursorOptions & CURSOR_OPT_SCROLL,
true, work_mem);
MemoryContextSwitchTo(oldcxt);
}
3 HoldPortal→PersistHoldablePortal
void
PersistHoldablePortal(Portal portal)
{
...
tuplestore需要再自己的上下文中保存一份desc。
oldcxt = MemoryContextSwitchTo(portal->holdContext);
portal->tupDesc = CreateTupleDescCopy(portal->tupDesc);
MemoryContextSwitchTo(oldcxt);
标记portal正在干活:READY → ACTIVE
MarkPortalActive(portal);
...
...
PG_TRY();
{
ScanDirection direction = ForwardScanDirection;
ActivePortal = portal;
if (portal->resowner)
CurrentResourceOwner = portal->resowner;
PortalContext = portal->portalContext;
MemoryContextSwitchTo(PortalContext);
PushActiveSnapshot(queryDesc->snapshot);
...
创建tuple store的dest receiver并开始执行:
queryDesc->dest = CreateDestReceiver(DestTuplestore);
SetTuplestoreDestReceiverParams(queryDesc->dest,
portal->holdStore,
portal->holdContext,
true,
NULL,
NULL);
注意这里ExecutorRun的第三个参数是0,表示拿完为止。和游标fetch是有区别的,fetch一次这里会传入1,只拿一条。
参考这篇:《Postgresql源码(125)游标恢复执行的原理分析》
继续分析:
ExecutorRun(queryDesc, direction, 0, false);
queryDesc->dest->rDestroy(queryDesc->dest);
queryDesc->dest = NULL;
portal->queryDesc = NULL; /* prevent double shutdown */
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
MemoryContextSwitchTo(portal->holdContext);
执行后tuples=9,之前已经查出来一条,现在把剩下的9条都拿到了。
(gdb) p *portal->holdStore $6 = {status = TSS_INMEM, eflags = 4, backward = false, interXact = true, truncated = false, availMem = 8371768, allowedMem = 8388608, tuples = 9, myfile = 0x0, context = 0x1855da0, resowner = 0x1717530, copytup = 0xc0051f <copytup_heap>, writetup = 0xc0056a <writetup_heap>, readtup = 0xc00635 <readtup_heap>, memtuples = 0x1821e68, memtupdeleted = 0, memtupcount = 9, memtupsize = 2048, growmemtuples = true, readptrs = 0x1855fb0, activeptr = 0, readptrcount = 1, readptrsize = 8, writepos_file = 0, writepos_offset = 0}
...
...
}
...
PG_END_TRY();
MemoryContextSwitchTo(oldcxt);
portal用完了把状态改回来:
/* Mark portal not active */
portal->status = PORTAL_READY;
...
...
}
至此,游标内容已经全部进入tuple store。
SPI_cursor_fetch
commit后,在循环中会继续fetch游标,这里一个较大的区别时,不在执行portal拿结果了,从tuple store拿一行即可:
这里使用tuplestore_gettupleslot从tuple里面拿数据:
static uint64
RunFromStore(Portal portal, ScanDirection direction, uint64 count,
DestReceiver *dest)
{
...
...
ok = tuplestore_gettupleslot(portal->holdStore, forward, false, slot);
...
...
return current_tuple_count;
}