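-- Oracle Streams data replication
-- Goal of this exercise: capture the changes made to the scott.emp1 table and apply them to the scott.emp1 table in a remote database.
-- Set the following initialization parameters (instance settings, not SQL*Plus commands):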
AQ_TM_PROCESSES=1
COMPATIBLE=9.2.0
LOG_PARALLELISM=1
GLOBAL_NAMES=true
JOB_QUEUE_PROCESSES=2
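-- Check the name of your database; mine is ora9, so replace every ora9 below with your own database name.
-- The database must be running in ARCHIVELOG mode.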
-- Build the demo table emp1 in the scott schema as a copy of emp.
CONNECT scott/tiger

-- Discard any emp1 left over from an earlier run.
DROP TABLE emp1;

CREATE TABLE emp1 AS
    SELECT * FROM emp;

-- Give emp1 a primary key on empno.
ALTER TABLE emp1
    ADD CONSTRAINT pk_emp1 PRIMARY KEY (empno);
-- Administration setup: create the tablespaces used by the Streams
-- accounts and by LogMiner, then drop any existing Streams users.
CONNECT / AS SYSDBA

CREATE TABLESPACE streamout
    DATAFILE 'F:\ORACLE\ORADATA\ORA9\streamout.dbf' SIZE 20M
    AUTOEXTEND ON;

CREATE TABLESPACE streamin
    DATAFILE 'F:\ORACLE\ORADATA\ORA9\streamin.dbf' SIZE 20M
    AUTOEXTEND ON;

CREATE TABLESPACE tslogmnr
    DATAFILE 'F:\ORACLE\ORADATA\ORA9\logmnr.dbf' SIZE 20M
    AUTOEXTEND ON;

-- Remove both Streams users together with everything they own.
DROP USER streamout CASCADE;
DROP USER streamin CASCADE;
-- Create the streamout account -----------------------------------------------
CONNECT / AS SYSDBA

-- No CREATE USER is issued; the account comes from GRANT ... IDENTIFIED BY.
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE
    TO streamout IDENTIFIED BY streamout;

ALTER USER streamout
    DEFAULT TABLESPACE streamout;

-- Packages streamout needs for queueing, capture, propagation and Streams administration.
GRANT EXECUTE ON DBMS_AQADM           TO streamout;
GRANT EXECUTE ON DBMS_CAPTURE_ADM     TO streamout;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO streamout;
GRANT EXECUTE ON DBMS_STREAMS_ADM     TO streamout;

-- Allow streamout to create rule sets (without grant option).
BEGIN
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
        grantee      => 'streamout',
        grant_option => FALSE);
END;
/

-- Allow streamout to create rules (without grant option).
BEGIN
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ,
        grantee      => 'streamout',
        grant_option => FALSE);
END;
/
-- Create the Streams queue, named streams_queue, which stores the captured
-- changes; this also creates the queue table streams_queue_table.
CONNECT streamout/streamout

BEGIN
    DBMS_STREAMS_ADM.SET_UP_QUEUE();
END;
/

-- List the objects now owned by streamout.
SELECT * FROM tab;

-- Create the database link to the remote database. Configure the network
-- (Oracle Net) alias first; the link must be named after the remote database.
CREATE DATABASE LINK ora11
    CONNECT TO streamin IDENTIFIED BY streamin
    USING 'ora11';
-- Create the streamin account ---- this must be done on the other (remote) database ----
CONNECT / AS SYSDBA

-- No CREATE USER is issued; the account comes from GRANT ... IDENTIFIED BY.
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE
    TO streamin IDENTIFIED BY streamin;

ALTER USER streamin
    DEFAULT TABLESPACE streamin;

-- Packages streamin needs for apply, queueing and Streams administration.
GRANT EXECUTE ON DBMS_APPLY_ADM   TO streamin;
GRANT EXECUTE ON DBMS_AQADM       TO streamin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO streamin;

-- Allow streamin to create rule sets (without grant option).
BEGIN
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
        grantee      => 'streamin',
        grant_option => FALSE);
END;
/

-- Allow streamin to create rules (without grant option).
BEGIN
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ,
        grantee      => 'streamin',
        grant_option => FALSE);
END;
/

-- Create the Streams queue in the streamin schema and list its objects.
CONNECT streamin/streamin

BEGIN
    DBMS_STREAMS_ADM.SET_UP_QUEUE();
END;
/

SELECT * FROM tab;
---------------------------- local database --------------------------
-- Point LogMiner at its own tablespace; tslogmnr is used here.
CONNECT / AS SYSDBA

BEGIN
    DBMS_LOGMNR_D.SET_TABLESPACE('tslogmnr');
END;
/

-- Enable supplemental logging: always log the empno key column of scott.emp1.
ALTER TABLE scott.emp1
    ADD SUPPLEMENTAL LOG GROUP log_group_emp1_pk (empno) ALWAYS;

GRANT ALL ON scott.emp1 TO streamout;
-- Configure the capture process ----------------- output account -----------------
CONNECT streamout/streamout

-- Propagation str1_to_str2: forward DML and DDL changes for scott.emp1 from
-- the local streamout queue to the streamin queue reached through link ora11.
BEGIN
    DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
        table_name             => 'scott.emp1',
        streams_name           => 'str1_to_str2',
        source_queue_name      => 'streamout.streams_queue',
        destination_queue_name => 'streamin.streams_queue@ora11',
        include_dml            => TRUE,
        include_ddl            => TRUE,
        source_database        => 'ora9');
END;
/

-- Capture process capture_simp: capture DML and DDL changes for scott.emp1
-- into the local streamout queue.
BEGIN
    DBMS_STREAMS_ADM.ADD_TABLE_RULES(
        table_name   => 'scott.emp1',
        streams_type => 'capture',
        streams_name => 'capture_simp',
        queue_name   => 'streamout.streams_queue',
        include_dml  => TRUE,
        include_ddl  => TRUE);
END;
/
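------------------------------- exp/imp, run from the DOS command prompt ----------------------------------
-- Instantiate the emp1 table; emp1 lives in the scott account.
-- Export scott.emp1, then import it into the scott account of the remote database.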
-- Export scott.emp1 with the exp utility into d:\bk\1.dmp.
-- NOTE(review): ROWS=n exports no table rows - presumably the remote
-- scott.emp1 is expected to already hold matching data; confirm.
exp scott/tiger FILE=d:\bk\1.dmp TABLES=emp1 OBJECT_CONSISTENT=y ROWS=n
-- Import the dump into the remote database.
-- NOTE(review): no connect string is given, so this presumably has to be run
-- where scott/tiger resolves to the remote database - confirm.
imp scott/tiger FILE=d:\bk\1.dmp IGNORE=y COMMIT=y LOG=import.log STREAMS_INSTANTIATION=y
----------------------------- remote database ---------------------------------
CONNECT scott/tiger

-- Remove the supplemental log group from the remote copy of emp1.
ALTER TABLE emp1
    DROP SUPPLEMENTAL LOG GROUP log_group_emp1_pk;

-- Give the streamin account full access to emp1.
GRANT ALL ON emp1 TO streamin;
-- Configure the apply process ------------------- input account ------ remote database ----------
CONNECT streamin/streamin

-- Apply process apply_simp: apply DML and DDL changes for scott.emp1 that
-- originate from database ora9 and arrive in the streamin queue.
BEGIN
    DBMS_STREAMS_ADM.ADD_TABLE_RULES(
        table_name      => 'scott.emp1',
        streams_type    => 'apply',
        streams_name    => 'apply_simp',
        queue_name      => 'streamin.streams_queue',
        include_dml     => TRUE,
        include_ddl     => TRUE,
        source_database => 'ora9');
END;
/
-- Start the apply process (input account, remote database).
CONNECT streamin/streamin

-- Set the disable_on_error parameter of apply_simp to 'n' before starting it.
BEGIN
    DBMS_APPLY_ADM.SET_PARAMETER(
        apply_name => 'apply_simp',
        parameter  => 'disable_on_error',
        value      => 'n');
END;
/

BEGIN
    DBMS_APPLY_ADM.START_APPLY(
        apply_name => 'apply_simp');
END;
/

-- To stop the apply process later, run the block below by hand. It is kept
-- commented out so that running this script from top to bottom does not stop
-- apply_simp immediately after starting it.
-- BEGIN
--     DBMS_APPLY_ADM.STOP_APPLY(
--         apply_name => 'apply_simp');
-- END;
-- /
-- Start the capture process ---------------------- output account ----- local database -------
CONNECT streamout/streamout

-- To stop the capture process, run the block below by hand. It is kept
-- commented out so that running this script from top to bottom only starts
-- capture_simp and never issues a stop against it.
-- BEGIN
--     DBMS_CAPTURE_ADM.STOP_CAPTURE(
--         capture_name => 'capture_simp');
-- END;
-- /

-- Start capture_simp.
BEGIN
    DBMS_CAPTURE_ADM.START_CAPTURE(
        capture_name => 'capture_simp');
END;
/
-- Exercise the setup: insert, delete or update rows in scott.emp1.
CONNECT scott/tiger

UPDATE emp1
SET sal = 300
WHERE empno = 7369;

COMMIT;

-- On the remote database: check the contents of emp1.
CONNECT scott/tiger

SELECT * FROM emp1;
-- Show the errors recorded for the apply process -----------------------------
CONNECT streamin/streamin

-- Report layout: '|' inside a heading starts a new heading line.
COLUMN apply_name           HEADING 'Apply|Process|Name'   FORMAT A8
COLUMN source_database      HEADING 'Source|Database'      FORMAT A8
COLUMN local_transaction_id HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN error_message        HEADING 'Error Message'        FORMAT A50

SELECT
    apply_name,
    source_database,
    local_transaction_id,
    error_message
FROM dba_apply_error;
-- Monitoring queries, run as SYSDBA.
CONNECT / AS SYSDBA

-- Queue propagation schedules.
SELECT * FROM dba_queue_schedules;

-- Source and destination queue of every propagation. GLOBAL_NAME is joined
-- without a condition to append this database's global name to the source queue.
SELECT
    prop.source_queue_owner || '.' ||
    prop.source_queue_name || '@' ||
    gn.global_name AS source_queue,
    prop.destination_queue_owner || '.' ||
    prop.destination_queue_name || '@' ||
    prop.destination_dblink AS destination_queue
FROM dba_propagation prop
CROSS JOIN global_name gn;

-- Status of the capture processes.
SELECT status FROM dba_capture;

-- Dynamic view for the capture processes.
SELECT * FROM v$streams_capture;