-- Oracle Streams: preserving deleted rows
-- Goal of this exercise: capture the rows deleted from hr.employees and insert them into the emp_del table.
-- Set the following initialization parameters (instance parameters, not SQL*Plus commands):
--   AQ_TM_PROCESSES=1
--   COMPATIBLE=9.2.0
--   LOG_PARALLELISM=1
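-- Check your database name. Mine is ora9, so replace every occurrence of ora9 below with your own database name.
-- The database must be running in archive log mode.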
-- Create the emp_del table, which stores the rows deleted from EMPLOYEES.
-- The trailing timestamp column is the one emp_dml_handler fills with SYSDATE.
CONNECT hr/hr

CREATE TABLE emp_del (
    employee_id     NUMBER(6),
    first_name      VARCHAR2(20),
    last_name       VARCHAR2(25),
    email           VARCHAR2(25),
    phone_number    VARCHAR2(20),
    hire_date       DATE,
    job_id          VARCHAR2(10),
    salary          NUMBER(8,2),
    commission_pct  NUMBER(2,2),
    manager_id      NUMBER(6),
    department_id   NUMBER(4),
    timestamp       DATE
);

-- The unique index is created first, then a primary key constraint of the
-- same name is added on the same column.
CREATE UNIQUE INDEX emp_del_id_pk
    ON emp_del (employee_id);

ALTER TABLE emp_del
    ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));
-- Create the Streams administration user, set its default tablespace, and grant privileges.
CONNECT / AS SYSDBA

-- Remove any strmadmin account left behind by an earlier run.
DROP USER strmadmin CASCADE;

-- The user was just dropped, so this GRANT with IDENTIFIED BY is the statement
-- that brings strmadmin (password strmadmin) back into existence.
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE
    TO strmadmin
    IDENTIFIED BY strmadmin;

ALTER USER strmadmin DEFAULT TABLESPACE users;

-- Full access to the table that receives the deleted rows.
GRANT ALL ON hr.emp_del TO strmadmin;

-- Packages used by the rest of this script when connected as strmadmin.
GRANT EXECUTE ON DBMS_APPLY_ADM   TO strmadmin;
GRANT EXECUTE ON DBMS_AQ          TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM       TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK   TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;
-- Grant strmadmin the rule-engine system privilege CREATE_RULE_SET_OBJ,
-- without the option to grant it onward.
BEGIN
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
        grantee      => 'strmadmin',
        grant_option => FALSE);
END;
/

-- Grant strmadmin the rule-engine system privilege CREATE_RULE_OBJ,
-- again without the grant option.
BEGIN
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ,
        grantee      => 'strmadmin',
        grant_option => FALSE);
END;
/
-- Create the Streams queue, named streams_queue, which stores the captured changes.
-- SET_UP_QUEUE is called with no arguments; the rest of this script refers to
-- the resulting queue as strmadmin.streams_queue.
CONNECT strmadmin/strmadmin

BEGIN
    DBMS_STREAMS_ADM.SET_UP_QUEUE();
END;
/
-- Configure the tablespace used by LogMiner; this script uses TOOLS.
CONNECT / AS SYSDBA

BEGIN
    DBMS_LOGMNR_D.SET_TABLESPACE('TOOLS');
END;
/

-- Enable supplemental logging on hr.employees: an ALWAYS log group on employee_id.
ALTER TABLE hr.employees
    ADD SUPPLEMENTAL LOG GROUP log_group_employees_pk (employee_id) ALWAYS;
-- Configure the capture process.
-- Adds table rules for hr.employees to the capture process capture_emp,
-- which uses strmadmin.streams_queue. DML changes are included, DDL is not.
CONNECT strmadmin/strmadmin

BEGIN
    DBMS_STREAMS_ADM.ADD_TABLE_RULES(
        table_name   => 'hr.employees',
        streams_type => 'capture',
        streams_name => 'capture_emp',
        queue_name   => 'strmadmin.streams_queue',
        include_dml  => TRUE,
        include_ddl  => FALSE);
END;
/
-- Set the instantiation SCN.
-- The current system change number is read through DBMS_FLASHBACK and recorded
-- as the instantiation SCN of hr.employees for source database ora9.
BEGIN
    DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
        source_object_name   => 'hr.employees',
        source_database_name => 'ora9',
        instantiation_scn    => DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER());
END;
/
-- Configure the AQ agent named emp_agent.
-- Step 1: drop any existing emp_agent so the script can be re-run.
-- NOTE(review): on a first run, when the agent does not exist yet, this block
-- presumably reports an error that can be ignored. Confirm.
BEGIN
    DBMS_AQADM.DROP_AQ_AGENT(
        agent_name => 'emp_agent');
END;
/

-- Step 2: create emp_agent and allow database user strmadmin to act as it.
BEGIN
    DBMS_AQADM.CREATE_AQ_AGENT(
        agent_name => 'emp_agent');

    DBMS_AQADM.ENABLE_DB_ACCESS(
        agent_name  => 'emp_agent',
        db_username => 'strmadmin');
END;
/
-- Create the queue subscriber.
-- emp_agent (no address, no protocol) subscribes to strmadmin.streams_queue
-- with no rule and no transformation.
BEGIN
    SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name     => 'strmadmin.streams_queue',
        subscriber     => SYS.AQ$_AGENT('emp_agent', NULL, NULL),
        rule           => NULL,
        transformation => NULL);
END;
/
-- Create the stored procedure enq_row_lcr.
-- Purpose: enqueue the given ANYDATA payload into strmadmin.streams_queue,
-- with emp_agent as both the sender and the only recipient.
-- Parameter in_any: the payload to enqueue, passed through unchanged.
CREATE OR REPLACE PROCEDURE enq_row_lcr(in_any IN SYS.ANYDATA) IS
    -- The same agent value is used for sender and recipient.
    emp_agent      SYS.AQ$_AGENT := SYS.AQ$_AGENT('emp_agent', NULL, NULL);
    enqueue_opts   DBMS_AQ.ENQUEUE_OPTIONS_T;
    msg_props      DBMS_AQ.MESSAGE_PROPERTIES_T;
    recipient_list DBMS_AQ.AQ$_RECIPIENT_LIST_T;
    -- Receives the message id returned by ENQUEUE; not used afterwards.
    msg_handle     RAW(16);
BEGIN
    msg_props.SENDER_ID := emp_agent;

    recipient_list(1) := emp_agent;
    msg_props.RECIPIENT_LIST := recipient_list;

    DBMS_AQ.ENQUEUE(
        queue_name         => 'strmadmin.streams_queue',
        enqueue_options    => enqueue_opts,
        message_properties => msg_props,
        payload            => in_any,
        msgid              => msg_handle);
END enq_row_lcr;
/
-- Create the DML handler stored procedure.
-- Purpose: every row LCR passed in is first re-enqueued through enq_row_lcr.
-- A DELETE is additionally rewritten into an INSERT against EMP_DEL, carrying
-- the deleted row's old values plus a TIMESTAMP column, and then executed.
-- Parameter in_any: the ANYDATA wrapper holding the row LCR.
CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.ANYDATA) IS
    row_change     SYS.LCR$_ROW_RECORD;
    -- Return value of GETOBJECT; captured but not inspected.
    unwrap_status  PLS_INTEGER;
    deleted_values SYS.LCR$_ROW_LIST;
BEGIN
    -- Hand the unmodified LCR on for explicit dequeue by another application.
    enq_row_lcr(in_any);

    -- Unwrap the row LCR from the ANYDATA payload.
    unwrap_status := in_any.GETOBJECT(row_change);

    -- Only DELETE commands are archived; anything else needs no further work.
    IF row_change.GET_COMMAND_TYPE() = 'DELETE' THEN
        -- Retarget the LCR: it becomes an INSERT into EMP_DEL.
        row_change.SET_COMMAND_TYPE('INSERT');
        row_change.SET_OBJECT_NAME('EMP_DEL');

        -- Move the old column values into the new-values slot and clear the old ones.
        deleted_values := row_change.GET_VALUES('old');
        row_change.SET_VALUES('new', deleted_values);
        row_change.SET_VALUES('old', NULL);

        -- Stamp the archived row with the current date.
        row_change.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));

        -- Apply the rewritten LCR, i.e. perform the INSERT into EMP_DEL.
        row_change.EXECUTE(TRUE);
    END IF;
END emp_dml_handler;
/
-- Configure the DML handler for hr.employees.
-- strmadmin.emp_dml_handler is registered, as a regular DML handler rather than
-- an error handler, for INSERT, UPDATE and DELETE on the table, in that order.
DECLARE
    TYPE operation_list_t IS TABLE OF VARCHAR2(10);
    handled_operations operation_list_t :=
        operation_list_t('INSERT', 'UPDATE', 'DELETE');
BEGIN
    FOR i IN 1 .. handled_operations.COUNT LOOP
        DBMS_APPLY_ADM.SET_DML_HANDLER(
            object_name         => 'hr.employees',
            object_type         => 'TABLE',
            operation_name      => handled_operations(i),
            error_handler       => FALSE,
            user_procedure      => 'strmadmin.emp_dml_handler',
            apply_database_link => NULL);
    END LOOP;
END;
/
-- Create the stored procedure that dequeues the re-enqueued events.
-- Purpose: dequeue messages from strmadmin.streams_queue on behalf of the given
-- consumer until none are left. Each dequeue is committed, and for each row
-- LCR payload the command type is printed through DBMS_OUTPUT.
-- Parameter consumer: consumer name placed in the dequeue options.
CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
    dequeue_opts       DBMS_AQ.DEQUEUE_OPTIONS_T;
    msg_props          DBMS_AQ.MESSAGE_PROPERTIES_T;
    msg_handle         RAW(16);
    event              SYS.AnyData;
    dequeued_lcr       SYS.LCR$_ROW_RECORD;
    -- Return value of GetObject; captured but not inspected.
    unwrap_status      PLS_INTEGER;
    -- ORA-25235: handled by moving navigation to the next transaction.
    end_of_transaction EXCEPTION;
    -- ORA-25228: handled as "nothing left to dequeue".
    dequeue_timeout    EXCEPTION;
    PRAGMA EXCEPTION_INIT (end_of_transaction, -25235);
    PRAGMA EXCEPTION_INIT (dequeue_timeout, -25228);
BEGIN
    dequeue_opts.consumer_name := consumer;
    dequeue_opts.wait := 1;

    LOOP
        BEGIN
            DBMS_AQ.DEQUEUE(
                queue_name         => 'strmadmin.streams_queue',
                dequeue_options    => dequeue_opts,
                message_properties => msg_props,
                payload            => event,
                msgid              => msg_handle);
            COMMIT;

            -- After a successful dequeue, continue with the next message.
            dequeue_opts.navigation := DBMS_AQ.NEXT;

            IF event.GetTypeName = 'SYS.LCR$_ROW_RECORD' THEN
                unwrap_status := event.GetObject(dequeued_lcr);
                DBMS_OUTPUT.PUT_LINE(dequeued_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
            END IF;
        EXCEPTION
            WHEN end_of_transaction THEN
                dequeue_opts.navigation := DBMS_AQ.NEXT_TRANSACTION;
            WHEN dequeue_timeout THEN
                DBMS_OUTPUT.PUT_LINE('No more events');
                EXIT;
        END;
    END LOOP;
END emp_dq;
/
-- Configure the apply process.
-- Adds table rules for hr.employees to the apply process apply_emp on
-- strmadmin.streams_queue, for changes originating in database ora9.
-- DML changes are included, DDL is not.
BEGIN
    DBMS_STREAMS_ADM.ADD_TABLE_RULES(
        table_name      => 'hr.employees',
        streams_type    => 'apply',
        streams_name    => 'apply_emp',
        queue_name      => 'strmadmin.streams_queue',
        include_dml     => TRUE,
        include_ddl     => FALSE,
        source_database => 'ora9');
END;
/
-- Start the apply process.
-- First set the apply parameter disable_on_error to 'n' for apply_emp.
BEGIN
    DBMS_APPLY_ADM.SET_PARAMETER(
        apply_name => 'apply_emp',
        parameter  => 'disable_on_error',
        value      => 'n');
END;
/

-- Then start apply_emp.
BEGIN
    DBMS_APPLY_ADM.START_APPLY(
        apply_name => 'apply_emp');
END;
/
-- Start the capture process capture_emp.
BEGIN
    DBMS_CAPTURE_ADM.START_CAPTURE(
        capture_name => 'capture_emp');
END;
/
-- Insert, update and delete rows in hr.employees to exercise capture and apply.
CONNECT hr/hr

-- The column list is explicit so the statement does not depend on column order.
-- The hire date goes through TO_DATE with a numeric mask because the implicit
-- conversion of '07-JUN-94' depends on the session NLS_DATE_FORMAT and
-- NLS_DATE_LANGUAGE settings. It fails when the date language is not English
-- and yields the wrong century under a YY mask.
INSERT INTO hr.employees (
    employee_id,
    first_name,
    last_name,
    email,
    phone_number,
    hire_date,
    job_id,
    salary,
    commission_pct,
    manager_id,
    department_id)
VALUES (
    207,
    'JOHN',
    'SMITH',
    'JSMITH@MYCOMPANY.COM',
    NULL,
    TO_DATE('1994-06-07', 'YYYY-MM-DD'),
    'AC_ACCOUNT',
    777,
    NULL,
    NULL,
    110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

-- Deleting the row inserted above is the change that should end up in emp_del.
DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;
-- Inspect the results as the Streams administrator.
CONNECT strmadmin/strmadmin

-- emp_dq reports what it dequeues only through DBMS_OUTPUT, and SQL*Plus
-- displays that output only while SERVEROUTPUT is on.
SET SERVEROUTPUT ON SIZE 100000

-- NOTE(review): capture and apply presumably run asynchronously, so the deleted
-- row may take a moment to show up. Re-run these queries if the result is empty.
SELECT * FROM hr.emp_del;

-- Queue contents before and after dequeuing on behalf of emp_agent.
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;
EXEC emp_dq('emp_agent');
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;
-- Display the apply process errors.
-- Column headings and widths for the report below.
COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A8
COLUMN SOURCE_DATABASE HEADING 'Source|Database' FORMAT A8
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A50

SELECT
    APPLY_NAME,
    SOURCE_DATABASE,
    LOCAL_TRANSACTION_ID,
    ERROR_MESSAGE
FROM DBA_APPLY_ERROR;