Tuesday, October 9, 2007

Streams

Oracle Streams enables the propagation and management of data, transactions, and events in a data stream, either within a database or from one database to another.
There are 3 important processes involved in this technology:

* Capture
* Propagation
* Apply
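Each of these processes is exposed through data dictionary views, so once the configuration below is in place their status can be checked with queries like the following (a sketch; the exact columns vary slightly between releases):

```sql
-- Status of the capture and apply processes, and the configured propagations.
SELECT capture_name, status FROM dba_capture;
SELECT propagation_name FROM dba_propagation;
SELECT apply_name, status FROM dba_apply;
```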

Instance Setup
To begin, the following parameters should be set in the SPFILEs of the participating databases:

ALTER SYSTEM SET JOB_QUEUE_PROCESSES=1;
ALTER SYSTEM SET AQ_TM_PROCESSES=1;
ALTER SYSTEM SET GLOBAL_NAMES=TRUE;
ALTER SYSTEM SET COMPATIBLE='9.2.0' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_PARALLELISM=1 SCOPE=SPFILE;
SHUTDOWN IMMEDIATE;
STARTUP;
In addition, any database involved in capture (nuc) must be in ARCHIVELOG mode.
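The current log mode can be checked, and ARCHIVELOG mode enabled if necessary, along the following lines (the instance must be cleanly restarted in MOUNT state to change modes):

```sql
-- Check the current log mode of the database.
SELECT log_mode FROM v$database;

-- If it is NOARCHIVELOG, enable archiving.
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
```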


Stream Administrator Setup

Next we create a stream administrator, a stream queue table and a database link on the source database:

CONN sys/password@nuc AS SYSDBA

CREATE USER strmadmin IDENTIFIED BY strmadminpw
DEFAULT TABLESPACE users QUOTA UNLIMITED ON users;

GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin;

GRANT EXECUTE ON DBMS_AQADM TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_PROPAGATION_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK TO strmadmin;

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/

BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/

CONNECT strmadmin/strmadminpw
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

CREATE DATABASE LINK TEST CONNECT TO strmadmin IDENTIFIED BY strmadminpw USING 'TEST';

This process must be repeated on the destination database (TEST). The reverse database link is not necessary in this example, but the following grant must be added:

GRANT ALL ON scott.dept TO strmadmin;


LogMiner Tablespace Setup
Next we create a new tablespace to hold the LogMiner tables on the source database (nuc):

CONN sys/password@nuc AS SYSDBA

CREATE TABLESPACE logmnr_ts DATAFILE '/export/u01/oradata/TEST/logmnr01.dbf'
SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('logmnr_ts');

Supplemental Logging
The apply process requires additional information for some actions, so we must configure supplemental logging of primary key information for the tables of interest:

CONN sys/password@nuc AS SYSDBA
ALTER TABLE scott.dept ADD SUPPLEMENTAL LOG GROUP log_group_dept_pk (deptno) ALWAYS;
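The presence of the new log group can be confirmed from the data dictionary, for example:

```sql
-- List supplemental log groups defined on SCOTT's tables.
SELECT log_group_name, table_name
FROM dba_log_groups
WHERE owner = 'SCOTT';
```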

Configure Propagation Process

Configure the propagation process on nuc:

CONNECT strmadmin/strmadminpw@nuc
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
table_name => 'scott.dept',
streams_name => 'nuc_to_TEST',
source_queue_name => 'strmadmin.streams_queue',
destination_queue_name => 'strmadmin.streams_queue@test',
include_dml => true,
include_ddl => true,
source_database => 'nuc');
END;
/

The propagation is performed by a job, which can be monitored using:

SELECT job,
TO_CHAR(last_date, 'DD-Mon-YYYY HH24:MI:SS') last_date,
TO_CHAR(next_date, 'DD-Mon-YYYY HH24:MI:SS') next_date,
what
FROM dba_jobs;

Configure Capture Process

Configure the capture process on nuc:

CONNECT strmadmin/strmadminpw@nuc
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'scott.dept',
streams_type => 'capture',
streams_name => 'capture_simp',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => true);
END;
/

Configure Instantiation SCN

The instantiation SCN of the source table must be set at the destination before the apply process will work. If the destination table already exists, this can be accomplished using a metadata-only export/import:

exp userid=scott/tiger@nuc FILE=dept_instant.dmp TABLES=dept OBJECT_CONSISTENT=y ROWS=n
imp userid=scott/tiger@TEST FILE=dept_instant.dmp IGNORE=y COMMIT=y LOG=import.log STREAMS_INSTANTIATION=y

During the transfer of the metadata, the supplemental logging information was also transferred. Since no capture is performed on TEST, this can be removed:

CONN sys/password@TEST AS SYSDBA
ALTER TABLE scott.dept DROP SUPPLEMENTAL LOG GROUP log_group_dept_pk;

Alternatively the instantiation SCN can be set using the DBMS_APPLY_ADM package:

CONNECT strmadmin/strmadminpw@nuc
DECLARE
v_scn NUMBER;
BEGIN
v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@TEST(
source_object_name => 'scott.dept',
source_database_name => 'nuc',
instantiation_scn => v_scn);
END;
/
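As a sanity check (not part of the original procedure), the recorded instantiation SCN can be queried on the destination database:

```sql
-- On TEST: show instantiation SCNs recorded for apply.
SELECT source_object_owner, source_object_name, instantiation_scn
FROM dba_apply_instantiated_objects;
```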


Configure Apply Process
Configure the apply process on the destination database (TEST):

CONNECT strmadmin/strmadminpw@TEST
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'scott.dept',
streams_type => 'apply',
streams_name => 'apply_simp',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => true,
source_database => 'nuc');
END;
/

Start Apply Process
Start the apply process on the destination database (TEST), preventing errors from stopping the process:

CONNECT strmadmin/strmadminpw@TEST
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_simp',
parameter => 'disable_on_error',
value => 'n');

DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_simp');
END;
/
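Because disable_on_error is set to 'n', transactions that fail are placed in the error queue rather than stopping the apply process. They can be reviewed with a query along these lines:

```sql
-- On TEST: list apply errors awaiting attention.
SELECT apply_name, local_transaction_id, error_message
FROM dba_apply_error;
```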


Start Capture Process
Start the capture process on the source database (nuc):

CONNECT strmadmin/strmadminpw@nuc
BEGIN
DBMS_CAPTURE_ADM.START_CAPTURE(
capture_name => 'capture_simp');
END;
/

Test It
With Streams active, we can see that DML changes to the source table are visible in the destination table:

CONNECT scott/tiger@nuc
INSERT INTO dept (deptno, dname, loc) VALUES (99, 'Test Dept', 'UK');
COMMIT;

SELECT * FROM dept;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
99 Test Dept UK

5 rows selected.

CONNECT scott/tiger@TEST
SELECT * FROM dept;

DEPTNO DNAME LOC
---------- -------------- -------------
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
99 Test Dept UK

5 rows selected.

We can also see that DDL changes to the source table are reflected in the destination table:

CONNECT scott/tiger@nuc
ALTER TABLE dept ADD (
new_col NUMBER(10)
)
/
DESC dept

Name Null? Type
---------------------------- -------- --------------
DEPTNO NOT NULL NUMBER(2)
DNAME VARCHAR2(14)
LOC VARCHAR2(13)
NEW_COL NUMBER(10)

CONNECT scott/tiger@TEST
DESC dept

Name Null? Type
---------------------------- -------- --------------
DEPTNO NOT NULL NUMBER(2)
DNAME VARCHAR2(14)
LOC VARCHAR2(13)
NEW_COL NUMBER(10)

The contents of the streams can be viewed using statements like:

SELECT s.user_data.getTypeName()
FROM strmadmin.streams_queue_table s;

SET SERVEROUTPUT ON
DECLARE
v_anydata SYS.ANYDATA;
v_lcr SYS.LCR$_ROW_RECORD;
v_row_list SYS.LCR$_ROW_LIST;
v_result PLS_INTEGER;
BEGIN

SELECT user_data
INTO v_anydata
FROM strmadmin.streams_queue_table
WHERE rownum < 2;

v_result := ANYDATA.GetObject(
self => v_anydata,
obj => v_lcr);

DBMS_OUTPUT.PUT_LINE('Command Type : ' || v_lcr.Get_Command_Type);
DBMS_OUTPUT.PUT_LINE('Object Owner : ' || v_lcr.Get_Object_Owner);
DBMS_OUTPUT.PUT_LINE('Object Name : ' || v_lcr.Get_Object_Name);
DBMS_OUTPUT.PUT_LINE('Source Database Name : ' || v_lcr.Get_Source_Database_Name);
END;
/

Clean Up
All rules can be identified and removed using the following statements:

BEGIN
FOR cur_rec IN (SELECT rule_owner, rule_name FROM dba_rules) LOOP
DBMS_RULE_ADM.DROP_RULE(
rule_name => cur_rec.rule_owner || '.' || cur_rec.rule_name,
force => TRUE);
END LOOP;
END;
/

All capture and apply processes can be identified, stopped and dropped using:

BEGIN
FOR cur_rec IN (SELECT capture_name FROM dba_capture) LOOP
DBMS_CAPTURE_ADM.STOP_CAPTURE(
capture_name => cur_rec.capture_name);
DBMS_CAPTURE_ADM.DROP_CAPTURE(
capture_name => cur_rec.capture_name);
END LOOP;

FOR cur_rec IN (SELECT apply_name FROM dba_apply) LOOP
DBMS_APPLY_ADM.STOP_APPLY(
apply_name => cur_rec.apply_name);
DBMS_APPLY_ADM.DROP_APPLY(
apply_name => cur_rec.apply_name);
END LOOP;
END;
/

All streams information relating to a specific object can be purged using:

BEGIN
DBMS_STREAMS_ADM.PURGE_SOURCE_CATALOG(
source_database => 'nuc',
source_object_name => 'scott.dept',
source_object_type => 'TABLE');
END;
/

Comments:


Tim... said...

Hi.

This article is a blatant copy of my original, posted here:

http://www.oracle-base.com/articles/9i/Streams9i.php

You even have all the typos, like spelling logminer as "logminor", which I corrected in my article today. Copying content is illegal. Please remove this blog post, or replace it with a link to the original article.

Regards

Tim...
