前言
最近和朋友讨论oracle行级安全策略(VPD)时,查看了下官方文档,看起来VPD的原理是针对应用了Oracle行级安全策略的表、视图或同义词发出的 SQL 语句动态添加where子句。通俗理解就是将行级安全策略动态添加为where 条件。那么PG中的行级安全策略是怎么处理的呢?
行级安全简介
行级安全策略(Row Level Security)是更细粒度的数据安全控制策略。行级策略可以根据每个用户限制哪些行可以通过常规查询返回,哪些行可以通过数据修改命令插入、更新或删除。默认情况下,表没有任何行级安全策略,因此如果用户根据 SQL 权限系统具有表的访问权限,则其中的所有行都可以平等地用于查询或更新。
在PG中我们可以创建行级策略,在SQL执行时行级策略表达式将作为查询的一部分运行。
https://www.postgresql.org/docs/16/ddl-rowsecurity.html
行级安全演示
创建3个用户
postgres=# create user admin;
CREATE ROLE
postgres=# create user peter;
CREATE ROLE
postgres=# create user bob;
CREATE ROLE
创建一个rlsdb数据库
postgres=# create database rlsdb owner admin;
CREATE DATABASE
在rlsdb中使用admin用户创建表employee,并插入3个用户对应的数据
postgres=# \c rlsdb admin
You are now connected to database "rlsdb" as user "admin".
rlsdb=> create table employee ( empno int, ename text, address text, salary int, account_number text );
CREATE TABLE
rlsdb=> insert into employee values (1, 'admin', '2 down str', 80000, 'no0001' );
INSERT 0 1
rlsdb=> insert into employee values (2, 'peter', '132 south avn', 60000, 'no0002' );
INSERT 0 1
rlsdb=> insert into employee values (3, 'bob', 'Down st 17th', 60000, 'no0003' );
INSERT 0 1
rlsdb=>
授权后,三个用户都能看到employee表的所有数据
rlsdb=> grant select on table employee to peter;
GRANT
rlsdb=> grant select on table employee to bob;
GRANT
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
1 | admin | 2 down str | 80000 | no0001
2 | peter | 132 south avn | 60000 | no0002
3 | bob | Down st 17th | 60000 | no0003
(3 rows)
rlsdb=>
rlsdb=> \c rlsdb peter
You are now connected to database "rlsdb" as user "peter".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
1 | admin | 2 down str | 80000 | no0001
2 | peter | 132 south avn | 60000 | no0002
3 | bob | Down st 17th | 60000 | no0003
(3 rows)
rlsdb=> \c rlsdb bob
You are now connected to database "rlsdb" as user "bob".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
1 | admin | 2 down str | 80000 | no0001
2 | peter | 132 south avn | 60000 | no0002
3 | bob | Down st 17th | 60000 | no0003
(3 rows)
使用admin用户创建行级安全策略,对于peter和bob就只能看到自己的数据了。
rlsdb=> \c rlsdb admin
You are now connected to database "rlsdb" as user "admin".
rlsdb=> CREATE POLICY emp_rls_policy ON employee FOR ALL TO PUBLIC USING (ename=current_user);
CREATE POLICY
rlsdb=> ALTER TABLE employee ENABLE ROW LEVEL SECURITY;
ALTER TABLE
rlsdb=> \c rlsdb peter
You are now connected to database "rlsdb" as user "peter".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+---------------+--------+----------------
2 | peter | 132 south avn | 60000 | no0002
(1 row)
rlsdb=> \c rlsdb bob
You are now connected to database "rlsdb" as user "bob".
rlsdb=> select * from employee;
empno | ename | address | salary | account_number
-------+-------+--------------+--------+----------------
3 | bob | Down st 17th | 60000 | no0003
(1 row)
rlsdb=>
行级安全原理
先看下行级安全策略在数据库中的呈现是什么样的。
查看pg_policy表,可以看到我们创建的emp_rls_policy这个策略,具体的策略polqual是一串字符,熟悉parsetree结构的朋友能关注到这是一个OPEXPR node。我们常见的where 条件也是类似的结构。
我们可以使用函数让polqual以更适合人阅读的方式来展示。
创建策略时,其实是将策略转换为where子句存到pg_policy表中。
ObjectAddress
CreatePolicy(CreatePolicyStmt *stmt)
{
/*省略部分代码行*/
/*将策略转化为where子句*/
qual = transformWhereClause(qual_pstate,
stmt->qual,
EXPR_KIND_POLICY,
"POLICY");
with_check_qual = transformWhereClause(with_check_pstate,
stmt->with_check,
EXPR_KIND_POLICY,
"POLICY");
/* Fix up collation information */
assign_expr_collations(qual_pstate, qual);
assign_expr_collations(with_check_pstate, with_check_qual);
/* 将转换后的子句写入pg_policy*/
/* Open pg_policy catalog */
pg_policy_rel = table_open(PolicyRelationId, RowExclusiveLock);
/* Set key - policy's relation id. */
ScanKeyInit(&skey[0],
Anum_pg_policy_polrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(table_id));
/* Set key - policy's name. */
ScanKeyInit(&skey[1],
Anum_pg_policy_polname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(stmt->policy_name));
sscan = systable_beginscan(pg_policy_rel,
PolicyPolrelidPolnameIndexId, true, NULL, 2,
skey);
policy_tuple = systable_getnext(sscan);
/* Complain if the policy name already exists for the table */
if (HeapTupleIsValid(policy_tuple))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("policy \"%s\" for table \"%s\" already exists",
stmt->policy_name, RelationGetRelationName(target_table))));
policy_id = GetNewOidWithIndex(pg_policy_rel, PolicyOidIndexId,
Anum_pg_policy_oid);
values[Anum_pg_policy_oid - 1] = ObjectIdGetDatum(policy_id);
values[Anum_pg_policy_polrelid - 1] = ObjectIdGetDatum(table_id);
values[Anum_pg_policy_polname - 1] = DirectFunctionCall1(namein,
CStringGetDatum(stmt->policy_name));
values[Anum_pg_policy_polcmd - 1] = CharGetDatum(polcmd);
values[Anum_pg_policy_polpermissive - 1] = BoolGetDatum(stmt->permissive);
values[Anum_pg_policy_polroles - 1] = PointerGetDatum(role_ids);
/* Add qual if present. */
if (qual)
values[Anum_pg_policy_polqual - 1] = CStringGetTextDatum(nodeToString(qual));
else
isnull[Anum_pg_policy_polqual - 1] = true;
/* Add WITH CHECK qual if present */
if (with_check_qual)
values[Anum_pg_policy_polwithcheck - 1] = CStringGetTextDatum(nodeToString(with_check_qual));
else
isnull[Anum_pg_policy_polwithcheck - 1] = true;
policy_tuple = heap_form_tuple(RelationGetDescr(pg_policy_rel), values,
isnull);
CatalogTupleInsert(pg_policy_rel, policy_tuple);
/* Record Dependencies */
target.classId = RelationRelationId;
target.objectId = table_id;
target.objectSubId = 0;
myself.classId = PolicyRelationId;
myself.objectId = policy_id;
myself.objectSubId = 0;
recordDependencyOn(&myself, &target, DEPENDENCY_AUTO);
recordDependencyOnExpr(&myself, qual, qual_pstate->p_rtable,
DEPENDENCY_NORMAL);
recordDependencyOnExpr(&myself, with_check_qual,
with_check_pstate->p_rtable, DEPENDENCY_NORMAL);
/* Register role dependencies */
target.classId = AuthIdRelationId;
target.objectSubId = 0;
for (i = 0; i < nitems; i++)
{
target.objectId = DatumGetObjectId(role_oids[i]);
/* no dependency if public */
if (target.objectId != ACL_ID_PUBLIC)
recordSharedDependencyOn(&myself, &target,
SHARED_DEPENDENCY_POLICY);
}
InvokeObjectPostCreateHook(PolicyRelationId, policy_id, 0);
/* Invalidate Relation Cache */
CacheInvalidateRelcache(target_table);
/* Clean up. */
heap_freetuple(policy_tuple);
free_parsestate(qual_pstate);
free_parsestate(with_check_pstate);
systable_endscan(sscan);
relation_close(target_table, NoLock);
table_close(pg_policy_rel, RowExclusiveLock);
return myself;
}
在SQL执行时,查询重写阶段会将对应的安全策略拼接到parsetree里,最后生成执行计划去执行。
从执行计划来看SQL没有where条件,但是执行计划中存在 Filter: (ename = CURRENT_USER),证明了这个过程。
rlsdb=> explain analyze select * from employee ;
QUERY PLAN
-----------------------------------------------------------------------------------------------------
Seq Scan on employee (cost=0.00..19.15 rows=3 width=104) (actual time=0.010..0.012 rows=1 loops=1)
Filter: (ename = CURRENT_USER)
Rows Removed by Filter: 2
Planning Time: 0.416 ms
Execution Time: 0.036 ms
(5 rows)
rlsdb=>
再debug验证下这个过程。
给fireRIRrules函数设置断点,进入断点后从stack可以看到目前是在QueryRewrite阶段,结合一些规则进行查询重写。
观察这个时候的parsetree,可以看到还没有将安全策略对应的OPEXPR拼接进来。
等执行到get_row_security_policies函数已获取到表对应安全策略securityQuals。
打印securityQuals可以看到和我们查询pg_policy中的OPEXPR是一致的。
接着将securityQuals加入到rte的list中,这样我们再去打印parsetree就可以看到安全策略securityQuals对应的OPEXPR已经被拼接进来。
然后就是去生成执行计划并执行。
小结
PG的RLS也是将对应的策略动态转换为where子句,在查询重写阶段将安全策略拼接到parsetree,生成执行计划去执行。
行级安全策略,可以提供更精细粒度的表数据权限管理,在一定的场景下,比如只让用户看到自己对应的数据,能做到更安全的权限把控。