Oracle® Streams Extended Examples 11g Release 2 (11.2) Part Number E12862-01 |
|
|
This chapter illustrates an example that creates a PL/SQL procedure for constructing and enqueuing LCRs that contain LOBs.
This chapter contains this topic:
Grant the Oracle Streams Administrator EXECUTE Privilege on DBMS_STREAMS_MESSAGING
Grant the Oracle Streams Administrator Necessary Privileges on the Tables
Note:
If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line after this note to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to all of the databases in the environment./************************* BEGINNING OF SCRIPT ******************************
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL lob_construct.out /*
Explicit EXECUTE
privilege on the package is required because a procedure in the package is called in within a PL/SQL procedure in Step 8.
*/ CONNECT / AS SYSDBA; GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO strmadmin; /*
*/
SET ECHO ON SET FEEDBACK 1 SET NUMWIDTH 10 SET LINESIZE 80 SET TRIMSPOOL ON SET TAB OFF SET PAGESIZE 100 SET SERVEROUTPUT ON SIZE 100000 CONNECT strmadmin /*
*/
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'lobex_queue_table', queue_name => 'lobex_queue'); END; / /*
*/
BEGIN DBMS_APPLY_ADM.CREATE_APPLY( queue_name => 'strmadmin.lobex_queue', apply_name => 'apply_lob', apply_captured => FALSE); END; / BEGIN DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_lob', parameter => 'disable_on_error', value => 'N'); END; / BEGIN DBMS_APPLY_ADM.START_APPLY( 'apply_lob'); END; / /*
*/
CONNECT system CREATE TABLESPACE lob_user_tbs DATAFILE 'lob_user_tbs.dbf' SIZE 5M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; ACCEPT password PROMPT 'Enter password for user: ' HIDE CREATE USER lob_user IDENTIFIED BY &password DEFAULT TABLESPACE lob_user_tbs QUOTA UNLIMITED ON lob_user_tbs; GRANT ALTER SESSION, CREATE CLUSTER, CREATE DATABASE LINK, CREATE SEQUENCE, CREATE SESSION, CREATE SYNONYM, CREATE TABLE, CREATE VIEW, CREATE INDEXTYPE, CREATE OPERATOR, CREATE PROCEDURE, CREATE TRIGGER, CREATE TYPE TO lob_user; CONNECT lob_user/lob_user_pw CREATE TABLE with_clob (a NUMBER PRIMARY KEY, c1 CLOB, c2 CLOB, c3 CLOB); CREATE TABLE with_blob (a NUMBER PRIMARY KEY, b BLOB); /*
Granting these privileges enables the Oracle Streams administrator to get the LOB length for offset and to perform DML operations on the tables.
*/ GRANT ALL ON with_clob TO strmadmin; GRANT ALL ON with_blob TO strmadmin; COMMIT; /*
*/
CONNECT strmadmin CREATE OR REPLACE PROCEDURE enq_row_lcr(source_dbname VARCHAR2, cmd_type VARCHAR2, obj_owner VARCHAR2, obj_name VARCHAR2, old_vals SYS.LCR$_ROW_LIST, new_vals SYS.LCR$_ROW_LIST) AS xr_lcr SYS.LCR$_ROW_RECORD; BEGIN xr_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT( source_database_name => source_dbname, command_type => cmd_type, object_owner => obj_owner, object_name => obj_name, old_values => old_vals, new_values => new_vals); -- Enqueue a row lcr DBMS_STREAMS_MESSAGING.ENQUEUE( queue_name => 'lobex_queue', payload => ANYDATA.ConvertObject(xr_lcr)); END enq_row_lcr; / SHOW ERRORS /*
*/
-- Description of each variable: -- src_dbname : Source database name -- tab_owner : Table owner -- tab_name : Table name -- col_name : Name of the CLOB column -- new_vals : SYS.LCR$_ROW_LIST containing primary key and supplementally -- logged colums -- clob_data : CLOB that contains data to be sent -- offset : Offset from which data should be sent, default is 1 -- lsize : Size of data to be sent, default is 0 -- chunk_size : Size used for creating LOB chunks, default is 2048 CREATE OR REPLACE FUNCTION do_enq_clob(src_dbname VARCHAR2, tab_owner VARCHAR2, tab_name VARCHAR2, col_name VARCHAR2, new_vals SYS.LCR$_ROW_LIST, clob_data CLOB, offset NUMBER default 1, lsize NUMBER default 0, chunk_size NUMBER default 2048) RETURN NUMBER IS lob_offset NUMBER; -- maintain lob offset newunit SYS.LCR$_ROW_UNIT; tnewvals SYS.LCR$_ROW_LIST; lob_flag NUMBER; lob_data VARCHAR2(32767); lob_size NUMBER; unit_pos NUMBER; final_size NUMBER; exit_flg BOOLEAN; c_size NUMBER; i NUMBER; BEGIN lob_size := DBMS_LOB.GETLENGTH(clob_data); unit_pos := new_vals.count + 1; tnewvals := new_vals; c_size := chunk_size; i := 0; -- validate parameters IF (unit_pos <= 1) THEN DBMS_OUTPUT.PUT_LINE('Invalid new_vals list'); RETURN 1; END IF; IF (c_size < 1) THEN DBMS_OUTPUT.PUT_LINE('Invalid LOB chunk size'); RETURN 1; END IF; IF (lsize < 0 OR lsize > lob_size) THEN DBMS_OUTPUT.PUT_LINE('Invalid LOB size'); RETURN 1; END IF; IF (offset < 1 OR offset >= lob_size) THEN DBMS_OUTPUT.PUT_LINE('Invalid lob offset'); RETURN 1; ELSE lob_offset := offset; END IF; -- calculate final size IF (lsize = 0) THEN final_size := lob_size; ELSE final_size := lob_offset + lsize; END IF; -- The following output lines are for debugging purposes only. -- DBMS_OUTPUT.PUT_LINE('Final size: ' || final_size); -- DBMS_OUTPUT.PUT_LINE('Lob size: ' || lob_size); IF (final_size < 1 OR final_size > lob_size) THEN DBMS_OUTPUT.PUT_LINE('Invalid lob size'); RETURN 1; END IF; -- expand new_vals list for LOB column tnewvals.extend(); exit_flg := FALSE; -- Enqueue all LOB chunks LOOP -- The following output line is for debugging purposes only. DBMS_OUTPUT.PUT_LINE('About to write chunk#' || i); i := i + 1; -- check if last LOB chunk IF ((lob_offset + c_size) < final_size) THEN lob_flag := DBMS_LCR.LOB_CHUNK; ELSE lob_flag := DBMS_LCR.LAST_LOB_CHUNK; exit_flg := TRUE; -- The following output line is for debugging purposes only. DBMS_OUTPUT.PUT_LINE('Last LOB chunk'); END IF; -- The following output lines are for debugging purposes only. DBMS_OUTPUT.PUT_LINE('lob offset: ' || lob_offset); DBMS_OUTPUT.PUT_LINE('Chunk size: ' || to_char(c_size)); lob_data := DBMS_LOB.SUBSTR(clob_data, c_size, lob_offset); -- create row unit for clob newunit := SYS.LCR$_ROW_UNIT(col_name, ANYDATA.ConvertVarChar2(lob_data), lob_flag, lob_offset, NULL); -- insert new LCR$_ROW_UNIT tnewvals(unit_pos) := newunit; -- enqueue lcr enq_row_lcr( source_dbname => src_dbname, cmd_type => 'LOB WRITE', obj_owner => tab_owner, obj_name => tab_name, old_vals => NULL, new_vals => tnewvals); -- calculate next chunk size lob_offset := lob_offset + c_size; IF ((final_size - lob_offset) < c_size) THEN c_size := final_size - lob_offset + 1; END IF; -- The following output line is for debugging purposes only. DBMS_OUTPUT.PUT_LINE('Next chunk size : ' || TO_CHAR(c_size)); IF (c_size < 1) THEN exit_flg := TRUE; END IF; EXIT WHEN exit_flg; END LOOP; RETURN 0; END do_enq_clob; / SHOW ERRORS /*
The DBMS_OUTPUT
lines in the following example can be used for debugging purposes if necessary. If they are not needed, then they can be commented out or deleted.
*/ SET SERVEROUTPUT ON SIZE 100000 DECLARE c1_data CLOB; c2_data CLOB; c3_data CLOB; newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newunit4 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; big_data VARCHAR(22000); n NUMBER; BEGIN -- Create primary key for LCR$_ROW_UNIT newunit1 := SYS.LCR$_ROW_UNIT('A', ANYDATA.ConvertNumber(3), NULL, NULL, NULL); -- Create empty CLOBs newunit2 := sys.lcr$_row_unit('C1', ANYDATA.ConvertVarChar2(NULL), DBMS_LCR.EMPTY_LOB, NULL, NULL); newunit3 := SYS.LCR$_ROW_UNIT('C2', ANYDATA.ConvertVarChar2(NULL), DBMS_LCR.EMPTY_LOB, NULL, NULL); newunit4 := SYS.LCR$_ROW_UNIT('C3', ANYDATA.ConvertVarChar2(NULL), DBMS_LCR.EMPTY_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4); -- Perform an insert enq_row_lcr( source_dbname => 'MYDB.EXAMPLE.COM', cmd_type => 'INSERT', obj_owner => 'LOB_USER', obj_name => 'WITH_CLOB', old_vals => NULL, new_vals => newvals); -- construct clobs big_data := RPAD('Hello World', 1000, '_'); big_data := big_data || '#'; big_data := big_data || big_data || big_data || big_data || big_data; DBMS_LOB.CREATETEMPORARY( lob_loc => c1_data, cache => TRUE); DBMS_LOB.WRITEAPPEND( lob_loc => c1_data, amount => length(big_data), buffer => big_data); big_data := RPAD('1234567890#', 1000, '_'); big_data := big_data || big_data || big_data || big_data; DBMS_LOB.CREATETEMPORARY( lob_loc => c2_data, cache => TRUE); DBMS_LOB.WRITEAPPEND( lob_loc => c2_data, amount => length(big_data), buffer => big_data); big_data := RPAD('ASDFGHJKLQW', 2000, '_'); big_data := big_data || '#'; big_data := big_data || big_data || big_data || big_data || big_data; DBMS_LOB.CREATETEMPORARY( lob_loc => c3_data, cache => TRUE); DBMS_LOB.WRITEAPPEND( lob_loc => c3_data, amount => length(big_data), buffer => big_data); -- pk info newunit1 := SYS.LCR$_ROW_UNIT('A', ANYDATA.ConvertNumber(3), NULL, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1); -- write c1 clob n := do_enq_clob( src_dbname => 'MYDB.EXAMPLE.COM', tab_owner => 'LOB_USER', tab_name => 'WITH_CLOB', col_name => 'C1', new_vals => newvals, clob_data => c1_data, offset => 1, chunk_size => 1024); DBMS_OUTPUT.PUT_LINE('n=' || n); -- write c2 clob newvals := SYS.LCR$_ROW_LIST(newunit1); n := do_enq_clob( src_dbname => 'MYDB.EXAMPLE.COM', tab_owner => 'LOB_USER', tab_name => 'WITH_CLOB', col_name => 'C2', new_vals => newvals, clob_data => c2_data, offset => 1, chunk_size => 2000); DBMS_OUTPUT.PUT_LINE('n=' || n); -- write c3 clob newvals := SYS.LCR$_ROW_LIST(newunit1); n := do_enq_clob(src_dbname=>'MYDB.EXAMPLE.COM', tab_owner => 'LOB_USER', tab_name => 'WITH_CLOB', col_name => 'C3', new_vals => newvals, clob_data => c3_data, offset => 1, chunk_size => 500); DBMS_OUTPUT.PUT_LINE('n=' || n); COMMIT; END; / /*
Check the lob_construct.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
After you run the script, you can check the lob_user.with_clob
table to list the rows applied by the apply process. The DBMS_LOCK.SLEEP
statement is used to give the apply process time to apply the enqueued rows.
CONNECT lob_user/lob_user_pw EXECUTE DBMS_LOCK.SLEEP(10); SELECT a, c1, c2, c3 FROM with_clob ORDER BY a; SELECT a, LENGTH(c1), LENGTH(c2), LENGTH(c3) FROM with_clob ORDER BY a;