树结构的权限控制分两种:1、逐层加载;2、一次性加载
一、逐层加载
涉及的表结构
表名 T_PLAN | |||||||||||||||||||||||||||
表字段 | 字段类型 | 是否必 须字段 | 说明 | ||||||||||||||||||||||||
ID | VARCHAR2(50) | Y | 主键 | ||||||||||||||||||||||||
PARENT_ID | VARCHAR2(50) | Y | 父项节点ID,默认根节点的父节点ID=’-1’ | ||||||||||||||||||||||||
TREEPATH | VARCHAR2(4000) | Y | 从根节点到当前节点的ID路径,用“/”号分隔 | ||||||||||||||||||||||||
LEVEL_SN | NUMBER | 当前节点所在整个树结构的层次 | |||||||||||||||||||||||||
备注说明: LEVEL_SN 字段可以不需要,因为可以通过 TREEPATH的值来计算出来 | |||||||||||||||||||||||||||
示例数据如下:
LEVEL_SN字段可以通过 LENGTH(TREEPATH)-LENGTH(REPLACE(TREEPATH,’/’,NULL) 获取 也可以通过 REGEXP_COUNT(TREEPATH,’/’)获取,但此函数必须在11.2版本之后才能正常使用,建议还是用第一种方式得到 |
权限表:T_POWER | ||
表字段 | 字段类型 | 说明 |
ID | VARCHAR2(50) | 主键 |
PLAN_ID | VARCHAR2(50) | 树结构表T.ID |
USER_ID | VARCHAR2(50) | 用户id |
表T_PLAN 与表T_POWER 的关系是一对多关系
- 逐层加载的算法
逐层加载,顾名思义,就是每层每层的加载数据,通过传递PARENT_ID=当前节点id 来获取当前节点的下层节点数据
参数
L_PARENT_ID = 当前需要展开的节点id
L_POLICY_FLAG=伪列(值是枚举类型 DOWN_UP 或者UP_DOWN )
DOWN_UP 表示 节点(A)是 因为下层节点(B)有权限看到,为了显示完整的树结构,才需要显示的节点,本身节点(A)是无权看到的
UP_DOWN 表示 节点(A)本身是有权限看到的,那么按树的权限规则,节点(A)下属的所有子节点都应该可以看到,那么节点(A)在进行展开的时候,就不需要与权限表有关系运算了,省略权限表的参与,减少表与表的关联
L_LEVEL_SN=当前节点在整个树的层次,默认根节点的层次=1
L_USER_ID=当前登录人
第一层 根节点 参数 (L_PARENT_ID, L_POLICY_FLAG, L_LEVEL_SN) 规则: 根节点默认所有人都能看见,但需要创建一个伪列,用来标识节点的获取通道 | ||||||||||||||||||||||||||||||||||||||||
语法: SELECT M.ID,M.PARENT_ID, M.TREEPATH,M.LEVEL_SN, CASE WHEN P.USER IS NULL THEN 'DOWN_UP' ELSE 'UP_DOWN' END POLICY_FLAG FROM T_PLAN M,T_POWER P WHERE M.PARENT_ID = '-1' --L_PARENT_ID AND P.USER_ID = '登录人ID' -- L_USER_ID AND T.ID=P.PLAN_ID(+) | ||||||||||||||||||||||||||||||||||||||||
递归的节点 参数 (PARENT_ID,POLICY_FLAG,LEVEL_SN) 规则 1.判断页面传递过来的 POLICY_FLAG参数值,如果=UP_DOW 那么,表明展开的节点对登录人而言是有权限看见的,后续的所有节点都可以直接查询T_PLAN表了 如果=DOWN_UP那么表明展开的节点对登录人来说,是因为更底层的节点的权限反向查询出来的,再度展开此节点的时候,需要关联权限表T_POWER进行判断。 IF POLICY_FLAG =’UP_DOWN’ THEN SELECT PM.ID, PM.PARENT_ID, PM.TREEPATH, PM.LEVEL_SN,'UP_DOWN' POLICY_FLAG FROM T_PLAN PM WHERE PM.LEVEL_SN= 'L_LEVEL_SN+1 ' // =当前节点的层次+1 ,注意, AND PM.PARENT_ID='L_PARENT_ID' // --当前节点ID 1.此处 PM.LEVEL_SN= ' L_LEVEL_SN+1 ' 与 PM.PARENT_ID='L_PARENT_ID' 条件的组合其实只需要 PM.PARENT_ID='L_PARENT_ID' 即可, PM.LEVEL_SN= 'L_LEVEL_SN+1 '条件在此处可以拿掉 2.注意此处的伪列 POLICY_FLAG 是固定的值 UP_DOWN 为了从当前节点把展开下层的权限一直继承下去 3. ORDER BY 条件 如果树结构有排序规则,那么请增加排序字段 ELSE WITH T AS ( SELECT /*DISTINCT*/ REGEXP_SUBSTR(TREEPATH,'[^/]+',1, L_LEVEL_SN+1) SHOW_ID, DECODE(REGEXP_SUBSTR(TREEPATH,'[^/]+',1,L_LEAVEL_SN+1),J.ID,'UP_DOWN','DOWN_UP') POLICY_FLAG, ROW_NUMBER() OVER(PARTITION BY REGEXP_SUBSTR(TREEPATH,'[^/]+',1,L_LEVEL_SN+1) ORDER BY NULL) SN FROM T_POWER X,T_PLAN J WHERE X.PLAN_ID=J.ID AND X.USER_ID='USER_ID' //当前用户 AND J.LEVEL_SN > ='L_LEVEL_SN+1' //当前节点层级+1 ) SELECT PM.ID, PM.PARENT_ID, PM.TREEPATH, PM.LEVEL_SN,T.POLICY_FLAG FROM T,T_PLAN PM WHERE PM.LEVEL_SN=L_LEVEL_SN+1 AND T.SN = 1 AND PM.PARENT_ID='L_PARENT_ID' AND PM.ID=SHOW_ID ORDER BY
红色 c2 表示登录人有权限的节点 如上的数据格式,那么能看到的数据应该是除去 蓝色c1 ,其它都应该看到,每个数据的伪列值 DECODE( REGEXP_SUBSTR(TREEPATH,'[^/]+',1,L_LEAVEL_SN+1), J.ID,'UP_DOWN', 'DOWN_UP' ) 需要仔细读懂此处的代码含义; J.ID是登录人拥有的权限节点,根据路径和需要展示的层级获取显示的节点id,判断显示的节点id是否=J.ID,如果等于,那么显示的节点就是拥有权限的节点,给予 UP_DOWN的值,否则不是,还是需要给予 DOWN_UP值
ROW_NUMBER() OVER(PARTITION BY REGEXP_SUBSTR(TREEPATH,'[^/]+',1,L_LEVEL_SN+1) ORDER BY NULL) SN 与后面的条件 AND T.SN = 1 用来排除重复的节点id, 也可以直接使用distinct进行排重 END IF; | ||||||||||||||||||||||||||||||||||||||||
二、一次性加载
一次性加载的意思是,通过sql直接查询出登录入拥有的数据,然后向下显示所有孩子节点,向上显示父亲节点, 数据逻辑很简单,要写出高效率的sql确不是很容易的,
这里面使用了oracle的model函数,
Model函数的用法和讲解见model语法
这里做具体的解法说明
原理:
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
WITH T AS ( SELECT * FROM T_TEST MODEL ---此处的排序规则定义了从子节点开始进行深度优先的反向树排序 DIMENSION BY(ROW_NUMBER() OVER (ORDER BY TREE_PATH ) SN ) MEASURES( ID,PARENT_ID,NAME,BZ,TREE_PATH, CAST (NULL AS VARCHAR2(2)) HAS_CHILD, --判断当前节点是否有叶子节点 CAST (NULL AS VARCHAR2(2000)) FLAG_DOWN, --用于判断向下追溯树的权限节点tree_path CAST (NULL AS NUMBER) DOWN, CAST (NULL AS VARCHAR2(2000)) FLAG_UP, --用于判断向上追溯树的权限节点tree_path CAST (NULL AS NUMBER) UP ) --关建字 Sequential =让数据库的单元格运算先后关系以SQL的写法顺序执行,不需要oracle自己决定先后顺序 RULES Sequential order ( --向下级寻找节点 FLAG_DOWN[ANY] = --上一个节点没有权限,当前节点有权限,获取当前节点权限 CASE WHEN FLAG_DOWN[CV()- 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TREE_PATH[CV()] --上一个节点有权限继承,当前节点也有权限 需要判断权限的继承 WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN --如果当前tree_path包含了上一个节点的flag_down值 那么取上一个节点flag_down值 CASE WHEN INSTR(TREE_PATH[CV()],FLAG_DOWN[CV()-1])>0 THEN FLAG_DOWN[CV()-1] ELSE TREE_PATH[CV()] END --上一个节点有权限继承,当前节点无权限 继承上一节点权限 WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN FLAG_DOWN[CV()- 1] ELSE NULL END ,DOWN[ANY] = CASE WHEN INSTR(TREE_PATH[CV()],FLAG_DOWN[CV()])>0 THEN 1 ELSE 0 END --向上寻找节点,注意此处是倒序输出的 ,FLAG_UP[ANY] ORDER BY TREE_PATH DESC --上一个节点没有权限,当前节点有权限,获取当前节点权限 = CASE WHEN FLAG_UP[CV()+ 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TREE_PATH[CV()] --上一个节点有权限继承,当前节点也有权限 需要判断权限的继承 WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN --如果当前tree_path包含了上一个节点的flag_down值 那么取上一个节点flag_down值 CASE WHEN INSTR(TREE_PATH[CV()],FLAG_UP[CV()+1])>0 THEN FLAG_UP[CV()+1] ELSE TREE_PATH[CV()] END --上一个节点有权限继承,当前节点无权限 继承上一节点权限 WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN FLAG_UP[CV()+ 1] ELSE NULL END ,UP[ANY] = CASE WHEN INSTR(FLAG_UP[CV()],TREE_PATH[CV()])>0 THEN 1 ELSE 0 END --表示当前节点是下一个节点的父亲节点 ,HAS_CHILD[ANY]=CASE WHEN ID[CV()]=PARENT_ID[CV()+1] THEN 'Y' ELSE 'N' END ) ) SELECT * FROM T WHERE DOWN+UP>0; |
常规树权限查询与model树权限查询的测试场景
三、性能测试
测试数据
执行t_task_info.sql
清除数据库的共享池和数据缓冲区
ALTER SYSTEM FLUSH SHARED_POOL;
ALTER SYSTEM FLUSH BUFFER_CACHE;
数据量较少
--注意看下每个SQL的执行计划
没有tree_path的语法
select * from t_test connect by prior id=parent_id start with id in (select id from t_test where bz =1)
union
select * from t_test connect by prior parent_id =id start with id in (select id from t_test where bz =1)
;
有tree_path的语法
select * from t_test t where exists (select 1 from t_test z where z.bz =1 and instr(z.tree_path,t.name)>0)
union
select * from t_test t1 where exists (select 1 from t_test z where z.bz =1 and instr(t1.tree_path,z.name)>0)
;
--验证下 对treepath的排序,是否影响整个树结构的显示
select lpad(' ',(level-1)*10,' ')||name ,name,id,parent_id,tree_path,
sys_connect_by_path(name,'/') scbp from T_TEST
connect by prior id=parent_id start with parent_id='-1'
;
model解法
WITH T AS (
SELECT * FROM T_TEST
MODEL
---此处的排序规则定义了从子节点开始进行深度优先在广度的反向树排序
DIMENSION BY(ROW_NUMBER() OVER (ORDER BY TREE_PATH ) SN )
MEASURES(
ID,PARENT_ID,NAME,BZ,TREE_PATH,
CAST (NULL AS VARCHAR2(2)) HAS_CHILD, --判断当前节点是否有叶子节点
CAST (NULL AS VARCHAR2(2000)) FLAG_DOWN, --用于判断向下追溯树的权限节点tree_path
CAST (NULL AS NUMBER) DOWN,
CAST (NULL AS VARCHAR2(2000)) FLAG_UP, --用于判断向上追溯树的权限节点tree_path
CAST (NULL AS NUMBER) UP
)
--关建字 Sequential =让数据库的单元格运算先后关系以SQL的写法顺序执行,不需要oracle自己决定先后顺序
RULES Sequential order (
--向下级寻找节点
FLAG_DOWN[ANY] = --上一个节点没有权限,当前节点有权限,获取当前节点权限
CASE WHEN FLAG_DOWN[CV()- 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TREE_PATH[CV()]
--上一个节点有权限继承,当前节点也有权限 需要判断权限的继承
WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN
--如果当前tree_path包含了上一个节点的flag_down值 那么取上一个节点flag_down值
CASE WHEN INSTR(TREE_PATH[CV()],FLAG_DOWN[CV()-1])>0 THEN FLAG_DOWN[CV()-1]
ELSE TREE_PATH[CV()] END
--上一个节点有权限继承,当前节点无权限 继承上一节点权限
WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN
FLAG_DOWN[CV()- 1]
ELSE NULL END
,DOWN[ANY] = CASE WHEN INSTR(TREE_PATH[CV()],FLAG_DOWN[CV()])>0 THEN 1 ELSE 0 END
--向上寻找节点,注意此处是倒序输出的
,FLAG_UP[ANY] ORDER BY TREE_PATH DESC
--上一个节点没有权限,当前节点有权限,获取当前节点权限
= CASE WHEN FLAG_UP[CV()+ 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TREE_PATH[CV()]
--上一个节点有权限继承,当前节点也有权限 需要判断权限的继承
WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN
--如果当前tree_path包含了上一个节点的flag_down值 那么取上一个节点flag_down值
CASE WHEN INSTR(TREE_PATH[CV()],FLAG_UP[CV()+1])>0 THEN FLAG_UP[CV()+1]
ELSE TREE_PATH[CV()] END
--上一个节点有权限继承,当前节点无权限 继承上一节点权限
WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN
FLAG_UP[CV()+ 1]
ELSE NULL END
,UP[ANY] = CASE WHEN INSTR(FLAG_UP[CV()],TREE_PATH[CV()])>0 THEN 1 ELSE 0 END
--表示当前节点是下一个节点的父亲节点
,HAS_CHILD[ANY]=CASE WHEN ID[CV()]=PARENT_ID[CV()+1] THEN 'Y' ELSE 'N' END
)
)
SELECT * FROM T WHERE DOWN+UP>0
;
数据量很多,但符合条件的记录很少
select count(1) from t_task_info
select * from t_task_info
where task_name in ('工作包13233','工作包11113','工作包253');
没有 TASK_PATH 的语法
select * from t_task_info connect by prior id=parent_id
start with id in
(select id from t_task_info where task_name in ('工作包13233','工作包11113','工作包253'))
union
select * from t_task_info connect by prior parent_id =id
start with id in
( select id from t_task_info where task_name in ('工作包13233','工作包11113','工作包253'))
;
有tree_path的语法
select t.* from t_task_info t,sys_user su where t.user_id=su.id(+) and exists
(select 1 from t_task_info where ( task_name in ('工作包13233','工作包11113','工作包253')) and instr(TASK_PATH,t.TASK_PATH)>0)
union
select t1.* from t_task_info t1,sys_user su where t1.user_id=su.id(+) and exists
(select 1 from t_task_info where ( task_name in ('工作包13233','工作包11113','工作包253')) and instr(t1.TASK_PATH,id)>0)
;
MODEL语法
WITH T AS (
SELECT ID,PARENT_ID,TASK_NAME,TASK_PATH,HAS_CHILD,DOWN,UP FROM t_task_info i,sys_user su where user_id=su.id(+)
MODEL
DIMENSION BY(ROW_NUMBER() OVER (ORDER BY TASK_PATH ) SN ) ---此处的排序规则定义了从子节点开始进行深度优先在广度的反向树排序
MEASURES(
i.ID,i.PARENT_ID,i.TASK_NAME,i.TASK_PATH,
CASE WHEN TASK_NAME in ('工作包13233','工作包11113','工作包253') THEN 1 ELSE 0 END BZ,
CAST (NULL AS VARCHAR2(2)) HAS_CHILD, --判断当前节点是否有叶子节点
CAST (NULL AS VARCHAR2(2000)) FLAG_DOWN, --用于判断向下追溯树的权限节点tree_path
CAST (NULL AS NUMBER) DOWN,
CAST (NULL AS VARCHAR2(2000)) FLAG_UP, --用于判断向上追溯树的权限节点tree_path
CAST (NULL AS NUMBER) UP
)
RULES Sequential order (
--向下级寻找节点
FLAG_DOWN[ANY] =CASE WHEN FLAG_DOWN[CV()- 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TASK_PATH[CV()] --上一个节点没有权限,当前节点有权限,获取当前节点权限
WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN --上一个节点有权限继承,当前节点也有权限 需要判断权限的继承
CASE WHEN INSTR(TASK_PATH[CV()],FLAG_DOWN[CV()-1])>0 THEN FLAG_DOWN[CV()-1]
ELSE TASK_PATH[CV()] END
WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN --上一个节点有权限继承,当前节点无权限 继承上一节点权限
FLAG_DOWN[CV()- 1]
ELSE NULL END
,DOWN[ANY] = CASE WHEN INSTR(TASK_PATH[CV()],FLAG_DOWN[CV()])>0 THEN 1 ELSE 0 END
--向上寻找节点,注意此处是倒序输出的
,FLAG_UP[ANY] ORDER BY TASK_PATH DESC
= CASE WHEN FLAG_UP[CV()+ 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TASK_PATH[CV()] ----上一个节点没有权限,当前节点有权限,获取当前节点权限
WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN --上一个节点有权限继承,当前节点也有权限 需要判断权限的继承
CASE WHEN INSTR(TASK_PATH[CV()],FLAG_UP[CV()+1])>0 THEN FLAG_UP[CV()+1]
ELSE TASK_PATH[CV()] END
WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN --上一个节点有权限继承,当前节点无权限 继承上一节点权限
FLAG_UP[CV()+ 1]
ELSE NULL END
,UP[ANY] = CASE WHEN INSTR(FLAG_UP[CV()],TASK_PATH[CV()])>0 THEN 1 ELSE 0 END
,HAS_CHILD[ANY]=CASE WHEN ID[CV()]=PARENT_ID[CV()+1] THEN 'Y' ELSE 'N' END --表示当前节点是下一个节点的父亲节点
)
)
SELECT * FROM T WHERE DOWN+UP>0
数据量很多,但符合条件的记录很多
select count(*) from t_task_info where TASK_NAME like '工作包1%' or TASK_NAME like '工作包6%' or TASK_NAME like '工作包8%' or TASK_NAME like '工作包1%'
没有 TASK_PATH 的语法
select * from t_task_info connect by prior id=parent_id
start with id in
(select id from t_task_info where task_name like '工作包1%' or task_name like '工作包6%' or task_name like '工作包8%' or task_name like '工作包1%')
union
select * from t_task_info connect by prior parent_id =id
start with id in
( select id from t_task_info where task_name like '工作包1%' or task_name like '工作包6%' or task_name like '工作包8%' or task_name like '工作包1%')
;
有tree_path的语法
select t.* from t_task_info t,sys_user su where t.user_id=su.id(+) and exists
(select 1 from t_task_info where (task_name like '工作包1%' or task_name like '工作包6%' or task_name like '工作包8%' or task_name like '工作包1%') and instr(TASK_PATH,t.TASK_PATH)>0)
union
select t1.* from t_task_info t1,sys_user su where t1.user_id=su.id(+) and exists
(select 1 from t_task_info where (task_name like '工作包1%' or task_name like '工作包6%' or task_name like '工作包8%' or task_name like '工作包1%') and instr(t1.TASK_PATH,id)>0)
;
MODEL语法
WITH T AS (
SELECT ID,PARENT_ID,TASK_NAME,TASK_PATH,HAS_CHILD,DOWN,UP FROM t_task_info i,sys_user su where user_id=su.id(+)
MODEL
DIMENSION BY(ROW_NUMBER() OVER (ORDER BY TASK_PATH ) SN ) ---此处的排序规则定义了从子节点开始进行深度优先在广度的反向树排序
MEASURES(
i.ID,i.PARENT_ID,i.TASK_NAME,i.TASK_PATH,
CASE WHEN TASK_NAME like '工作包1%' or TASK_NAME like '工作包6%' or TASK_NAME like '工作包8%' or TASK_NAME like '工作包1%' THEN 1 ELSE 0 END BZ,
CAST (NULL AS VARCHAR2(2)) HAS_CHILD, --判断当前节点是否有叶子节点
CAST (NULL AS VARCHAR2(2000)) FLAG_DOWN, --用于判断向下追溯树的权限节点tree_path
CAST (NULL AS NUMBER) DOWN,
CAST (NULL AS VARCHAR2(2000)) FLAG_UP, --用于判断向上追溯树的权限节点tree_path
CAST (NULL AS NUMBER) UP
)
RULES Sequential order (
--向下级寻找节点
FLAG_DOWN[ANY] =CASE WHEN FLAG_DOWN[CV()- 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TASK_PATH[CV()] --上一个节点没有权限,当前节点有权限,获取当前节点权限
WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN --上一个节点有权限继承,当前节点也有权限 需要判断权限的继承
CASE WHEN INSTR(TASK_PATH[CV()],FLAG_DOWN[CV()-1])>0 THEN FLAG_DOWN[CV()-1]
ELSE TASK_PATH[CV()] END
WHEN FLAG_DOWN[CV()- 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN --上一个节点有权限继承,当前节点无权限 继承上一节点权限
FLAG_DOWN[CV()- 1]
ELSE NULL END
,DOWN[ANY] = CASE WHEN INSTR(TASK_PATH[CV()],FLAG_DOWN[CV()])>0 THEN 1 ELSE 0 END
--向上寻找节点,注意此处是倒序输出的
,FLAG_UP[ANY] ORDER BY TASK_PATH DESC
= CASE WHEN FLAG_UP[CV()+ 1] IS NULL AND NVL(BZ[CV()],0)=1 THEN TASK_PATH[CV()] ----上一个节点没有权限,当前节点有权限,获取当前节点权限
WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=1 THEN --上一个节点有权限继承,当前节点也有权限 需要判断权限的继承
CASE WHEN INSTR(TASK_PATH[CV()],FLAG_UP[CV()+1])>0 THEN FLAG_UP[CV()+1]
ELSE TASK_PATH[CV()] END
WHEN FLAG_UP[CV()+ 1] IS NOT NULL AND NVL(BZ[CV()],0)=0 THEN --上一个节点有权限继承,当前节点无权限 继承上一节点权限
FLAG_UP[CV()+ 1]
ELSE NULL END
,UP[ANY] = CASE WHEN INSTR(FLAG_UP[CV()],TASK_PATH[CV()])>0 THEN 1 ELSE 0 END
,HAS_CHILD[ANY]=CASE WHEN ID[CV()]=PARENT_ID[CV()+1] THEN 'Y' ELSE 'N' END --表示当前节点是下一个节点的父亲节点
)
)
SELECT * FROM T WHERE DOWN+UP>0
四、树结构的汇总
解法:利用树的深度和广度的遍历,建立主和关联的两个model记录,实现依次的数据累计
select id,parent_id,name,sal,tree_path,lpad(' ',regexp_count(tree_path,'/')*10,' ')||name level_name
,row_number() over (order by length(tree_path)-length(replace(tree_path,'/')) desc,parent_id desc ,name desc ) lev_sn
,row_number() over (order by tree_path desc )sn
from t_test
;
Connect By 解法
with t as (
select connect_by_root(name) g_name,sal
from t_test connect by prior id=parent_id
)
select g_name,sum(sal) from t group by g_name;
Tree_Path 解法
select regexp_substr(tree_path,'[^/]+',1,t.lev) g_name
,sum(sal) from t_test ,
(select level lev from dual connect by level<=20) t
where regexp_count(tree_path,'/')>=t.lev-1
group by regexp_substr(tree_path,'[^/]+',1,t.lev);
Model解法
select id,parent_id,name,sal,sum_total,tree_path,lev_sn,sn
from t_test
model
REFERENCE over_result ON ( --创建以深度遍历树的记录model,用来获取 越层级 的节点sn值
SELECT parent_id over_parent_id,row_number() over (order by tree_path desc ) over_sn --深度遍历优先序列
,row_number() over (order by length(tree_path)-length(replace(tree_path,'/')) desc,parent_id desc ,name desc ) over_lev_sn --广度遍历序列
FROM t_test
)
DIMENSION BY ( over_sn)
MEASURES (over_parent_id,over_lev_sn ) IGNORE NAV --- ignore nav 表示存在不满足的情况用0替换
MAIN main_model -- 主的model函数申明
dimension by ( -- 广度遍历优先的 序列
row_number() over (order by length(tree_path)-length(replace(tree_path,'/')) desc,parent_id desc ,name desc ) lev_sn
)
measures (
id,parent_id,name,sal
,cast (null as number(12,2)) sum_sal --创建的一个累计字段
,cast (null as number(12,2)) sum_total --当前节点的统计(包括所有子节点)
,tree_path
,row_number() over (order by tree_path desc ) sn --深度遍历序列
)
rules Sequential order (
--sum_sal 标识 按树结构从底层节点依次进行的累计汇总
sum_sal[any]= case when parent_id[cv()-1]=parent_id[cv()] then --当前节点和上一个节点是否同一个父节点
--从关联模型数据中取出 主模型数据的当前节点的上一个节点
--判断 当前节点id是否等于关联模型中 对应当前节点的上一个节点父项(如果是 进行sal的累计=当前节点sal+当前节点的最后子节点sum_sal,否则当前节点sal)
case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()]
end + nvl(sum_sal[cv()-1] ,0) --此处进行sal的累计
else case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()] end
end
--sum_total 标识 每个节点的汇总累计
--汇总累计=当前节点的值+当前节点的最后一个子节点的 累计值
,sum_total[any]= case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()] end
);
大数据量的效率测试
数据量较大(1000-10000)
Connect by scott
清除数据库的共享池和数据缓冲区
ALTER SYSTEM FLUSH SHARED_POOL;
ALTER SYSTEM FLUSH BUFFER_CACHE;
tree_path
with t as (
select regexp_substr(task_path,'[^/]+',1,t.lev) g_name
,sum(task_no) from t_task_info ,
(select level lev from dual connect by level<=20) t
where regexp_count(task_path,'/')>=t.lev-1
group by regexp_substr(task_path,'[^/]+',1,t.lev)
)
select * from t where g_name='40288048411a331c01411a5f9bfb5396'
;
connect by
with t as (
select connect_by_root(id) g_name,task_no sal
from t_task_info connect by prior id=parent_id
) ,
t1 as (
select g_name,sum(sal) sum_sal
from t group by g_name )
select *
from t1 where g_name='40288048411a331c01411a5f9bfb5396'
;
model
with t as (
select id,parent_id,lpad(' ',regexp_count(task_path,'/')*10,' ')||name,sal,sum_total,task_path,lev_sn,sn
from t_task_info
model
REFERENCE over_result ON (
SELECT parent_id over_parent_id,row_number() over (order by task_path desc ) over_sn
,row_number() over (order by length(task_path)-length(replace(task_path,'/')) desc,parent_id desc ,id desc ) over_lev_sn
FROM t_task_info
)
DIMENSION BY ( over_sn)
MEASURES (over_parent_id,over_lev_sn ) IGNORE NAV
MAIN main_model
dimension by ( row_number() over (order by length(task_path)-length(replace(task_path,'/')) desc,parent_id desc ,id desc ) lev_sn )
measures (
id,parent_id,task_name name,task_no sal
,cast (null as number(12,2)) sum_sal
,cast (null as number(12,2)) sum_total
,task_path
,row_number() over (order by task_path desc ) sn
)
rules Sequential order (
sum_sal[any]= case when parent_id[cv()-1]=parent_id[cv()] then
case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()]
end + nvl(sum_sal[cv()-1] ,0)
else case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()] end
end
,sum_total[any]= case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()]
end
)
) select * from t where id='40288048411a331c01411a5f9bfb5396'
数据量很大(>10000)
Connect by scott
connect by
with t as (select connect_by_root(i.id) g_id ,i.task_no from pm_task_info i
where i.pm_project_id='4028809540ddb4830140ddde7a5b009c'
connect by prior i.id=i.parent_id
),
t1 as (
select g_id,sum(task_no) from t group by g_id
)
select * from t1 where g_id='4028809540e1fe470140e22db8182537'
;
tree_path
with t as (
select regexp_substr(task_path,'[^/]+',1,t.lev) g_id
,sum(task_no) from pm_task_info ,
(select level lev from dual connect by level<=20) t
where length(task_path)-length(replace(task_path,'/'))>=t.lev-1
and pm_project_id='4028809540ddb4830140ddde7a5b009c'
group by regexp_substr(task_path,'[^/]+',1,t.lev)
)
select * from t where g_id='4028809540e1fe470140e22db8182537'
;
model
with t as (
select id,parent_id,lpad(' ',(length(task_path)-length(replace(task_path,'/')))*10,' ')||name,sal,sum_total,task_path,lev_sn,sn
from pm_task_info where pm_project_id='4028809540ddb4830140ddde7a5b009c'
model
REFERENCE over_result ON (
SELECT parent_id over_parent_id,row_number() over (order by task_path desc ) over_sn
,row_number() over (order by length(task_path)-length(replace(task_path,'/')) desc,parent_id desc ,id desc ) over_lev_sn
FROM pm_task_info where pm_project_id='4028809540ddb4830140ddde7a5b009c'
)
DIMENSION BY ( over_sn)
MEASURES (over_parent_id,over_lev_sn ) IGNORE NAV
MAIN main_model
dimension by ( row_number() over (order by length(task_path)-length(replace(task_path,'/')) desc,parent_id desc ,id desc ) lev_sn )
measures (
id,parent_id,task_name name,task_no sal
,cast (null as number(12,2)) sum_sal
,cast (null as number(12,2)) sum_total
,task_path
,row_number() over (order by task_path desc ) sn
)
rules Sequential order (
sum_sal[any]= case when parent_id[cv()-1]=parent_id[cv()] then
case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()]
end + nvl(sum_sal[cv()-1] ,0)
else case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()] end
end
,sum_total[any]= case when over_parent_id[sn[cv()]-1]=id[cv()] then sal[cv()]+ nvl(sum_sal[ over_lev_sn[ sn[cv()]-1] ],0)
else sal[cv()]
end
)
)
select * from t where id='4028809540e1fe470140e22db8182537'
;
五、model函数介绍
这个函数一般不太常见,但确是非常有用的一个函数,基本上model可以完成所有函数的功能
语法定义如下
--MODEL:MODEL语句的关键字,必须,后面可以跟 partition by
--DIMENSION BY: 维度的意思,必须,而且必须是一个主键或者是组合主键。
--MEASURES:指定作为数组的列,可以定义出许多有规则的伪列
--RULES:对数组的各列进行各种操作规则的定义,特有的函数有 any,cv(),cv(维度字段)
先从简单的了解下model函数的特性:
自循环功能
select key, m_1 from dual
model
dimension by(0 key) --定义维度 列名=key 值等于0
measures( cast(null as varchar2(100)) m_1 ) --定义一个度量 类型是 varchar2(100) 列名=m_1
rules --规则约束
iterate(5) --定义自循环次数 =5 从 0 开始循环
(m_1[0]= nvl(m_1 [0],'TEST')|| 'x'||'/'||iteration_number||'/')
利用model的循环来实现阶层的算法
select * from dual
model
dimension by (1 as c)
measures (1 as d)
rules iterate (5) --定义了循环5此,从 0开始 ITERATION_NUMBER FOR 0 TO 4
(
d[1]=d[1]*(ITERATION_NUMBER+1) --此处 ITERATION_NUMBER+1 for 1 to 5
)
当然,此处不是要真的实现 阶乘 的算法,只是为了理解model函数的用法,
再看看如下的SQL
目的:根据emp表的 mgr和empno的关系 来显示 上级的ename和job
最直接最常用的语法就是
select x.empno,x.ename,x.job,x.mgr,y.ename,y.job from emp x,emp y
where x.mgr=y.empno(+) ;
但这样的SQL的执行计划显示对EMP表进行了两次全表扫描
换成model函数执行下
select * from emp
model
dimension by (empno)
measures ( ename,job,mgr
,cast(null as varchar2(20)) mgr_ename
,cast(null as varchar2(20)) mgr_job
)
rules (
mgr_ename[any]=ename[mgr[cv()]]
--cv()代表对当前行的维度值
--mgr[cv()] 是获取当前维度下的mgr值 ,然后在对 mgr[cv()]进行维度的数据定位到度量ename 也就是当前ename的上级ename
,mgr_job[any]=job[mgr[cv()]]
)
再看看它的执行计划,如下图:
执行以下SQL,看看结果集,理解model 函数
--显示 部门,年份,当前年份汇总sal,上年汇总sal
with t as (select deptno,to_char(emp.hiredate,'yyyy') year,sum(sal) sal from emp group by deptno,to_char(emp.hiredate,'yyyy'))
select deptno,year,sal,p_sal
from t
model
dimension by (deptno,year)
measures (sal,0 p_sal)
rules
(
p_sal[any,any]=sal[cv(),cv(year)-1]
);
--分组 group by deptno 合计
select ename,sales from emp
model partition by (deptno)
dimension by (ename)
measures (sal sales)
rules
(
sales['合计']= sum(sales)[ cv(ename)='合计']
);
-- x =sal
-- y 只给deptno=30的赋予当前sum(sal)
-- z 显示 sum(sal) where deptno=20
-- m 汇总个部门的sum(sal)
select deptno,ename,sales,x,y,z,m from emp
model partition by (deptno)
dimension by (ename,deptno dep)
measures (sal sales,0 x,0 y,0 z,0 m)
rules
(
x[any,any]= sum(sales)[ cv(),cv()]
,y[any,any]=sales[cv(),30] --注意 此处是 30 可以不用sum, 而不是 cv()=30,cv()=30 存在多条记录
,z[any,any]=sum(sales) [any, cv()=20]
,m[any,any]=sum(sales) [any, any]
);
--部门号,年份,
--sum(sal) group by deptno,year
--sum(sal) group by deptno
--sum(sal) group by null
--sum(sal) group by year
--sum(sal) group by null
with t as (select deptno,to_char(emp.hiredate,'yyyy') year,sum(sal) sal from emp group by deptno,to_char(emp.hiredate,'yyyy'))
select deptno,year,sal,p_sal,x,y,m
from t
model
dimension by (deptno,year)
measures (sal,0 p_sal ,0 x,0 y ,0 m)
rules
(
p_sal[any,any]=sum(sal)[cv(),cv() is not null ] --sum(sal) group by deptno
,x[any,any]=sum(sal)[any,cv() is not null ] --sum(sal) group by null
,y[any,any]=sum(sal)[cv() is not null,cv()] --sum(sal) group by year
,m[any,any]=sum(sal)[cv() is not null,any ] --sum(sal) group by null
-- cv() 中如果没有null 的记录那么 cv() is not null 等价与 any
);
用model函数产生 行转列
字符串='adfd,bere,cf234,4d54d'
select x,y, r,z
from dual
model
dimension by (0 x)
measures ( cast ('adfd,bere,cf234,4d54d' as varchar2(200)) y
,cast(null as varchar2(1000)) z
,cast(null as varchar2(1000)) r --显示字符串列
) --申明一个字符串的伪列
rules iterate(10) --定义循环100次
--PRESENTV(cell,expr1,expr2)
--如果cell引用的记录在MODEL子句执行以前就存在,那么返回表达式expr1。如果这条记录不存在,则返回表达式expr2
until ( presentv( y[instr(y[0],',',1,iteration_number+2)],0,1) = 0 ) --循环退出的条件
( --对字符串进行循环截取操作
y[iteration_number+1]=substr(y[iteration_number],instr(y[iteration_number],',',1)+1)
,r[any]=y[0]
,z[iteration_number]=nvl(substr(y[iteration_number],1,instr(y[iteration_number],',',1)-1),y[iteration_number])
,z[iteration_number+1]=y[iteration_number+1]
)
用model函数产生 列转行
with t as (
select 'abc' x from dual
union all
select 'XTZ' from dual
union all
select 'IJM' from dual
union all
select 'KPI' from dual
)
select * from t
model
dimension by ( rownum sn)
measures( cast (x as varchar2(1000)) x)
rules
iterate (100)
until ( presentv( x[ iteration_number+1],1,0 )=0 )
(
x[0]=x[0]||','||x[iteration_number+1]
);
用model函数产生 交叉表格
select DEPTNO,CLERK_JOB,ANALYST_JOB,MANAGER_JOB,PRESIDENT_JOB,SALESMAN_JOB from emp
model partition by (deptno)
dimension by (empno,job)
measures ( ename,cast(null as varchar2(1000)) CLERK_JOB
,cast(null as varchar2(1000)) ANALYST_JOB
,cast(null as varchar2(1000)) MANAGER_JOB
,cast(null as varchar2(1000)) PRESIDENT_JOB
,cast(null as varchar2(1000)) SALESMAN_JOB
)
rules(
CLERK_JOB[ANY,ANY]= (ENAME[CV(),'CLERK'])
,ANALYST_JOB[ANY,ANY]=(ENAME[CV(),'ANALYST'])
,MANAGER_JOB[ANY,ANY]=(ENAME[CV(),'MANAGER'])
,PRESIDENT_JOB[ANY,ANY]=(ENAME[CV(),'PRESIDENT'])
,SALESMAN_JOB[ANY,ANY]=(ENAME[CV(),'SALESMAN'])
);