抽取oracle数据到mysql数据库的实现过程
时间:2021-02-06 10:05:23|栏目:Mysql|点击: 次
在将oracle数据库迁移至mysql数据库时,除了要把oracle的数据库模型迁移到mysql外,还有一个重要环节就是把oracle数据库中的数据迁移到mysql数据库。本人尝试过多款数据迁移程序,性能都不是很好,于是自己动手写了一个将oracle数据库数据迁移到mysql数据库的程序,其具体过程如下:
1、编写配置文件ETL_TABS.CNF,列出要抽取的oracle表、字段及过滤条件(每行一条,形如:select 字段1 字段2 from 表名 where 条件,字段之间用空格分隔)
2、建立一个oracle目录对象ETL_DIR,并把配置文件ETL_TABS.CNF放到该目录下
3、运行oracle存储过程P_ETL_ORA_DATA,生成各表的csv数据文件,同时生成一个导入mysql的脚本文件load_data.sql
4、在mysql中执行load_data.sql导入数据,该脚本文件内容如下
-- Bulk-load each exported CSV file into the MySQL table of the same name.
-- Fields are comma-separated and wrapped in "^"; every row ends with CRLF.
LOAD DATA INFILE "alarm_hist_inc.csv"
    INTO TABLE alarm_hist_inc
    FIELDS TERMINATED BY "," ENCLOSED BY "^"
    LINES TERMINATED BY "\r\n";

LOAD DATA INFILE "button_authority.csv"
    INTO TABLE button_authority
    FIELDS TERMINATED BY "," ENCLOSED BY "^"
    LINES TERMINATED BY "\r\n";

LOAD DATA INFILE "c3_sms_hist_inc.csv"
    INTO TABLE c3_sms_hist_inc
    FIELDS TERMINATED BY "," ENCLOSED BY "^"
    LINES TERMINATED BY "\r\n";

LOAD DATA INFILE "datapermisson.csv"
    INTO TABLE datapermisson
    FIELDS TERMINATED BY "," ENCLOSED BY "^"
    LINES TERMINATED BY "\r\n";
附:数据库脚本P_ETL_ORA_DATA
CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA (
  P_ORA_DIR   VARCHAR2,
  P_DATA_PATH VARCHAR2
) IS
  /*
   * Exports Oracle tables to CSV files and generates a MySQL load script.
   *
   * Parameters
   *   P_ORA_DIR   - name passed to UTL_FILE.FOPEN as the location. The
   *                 configuration file ETL_TABS.CNF is read from it, and the
   *                 CSV files plus load_data.sql are written to it.
   *   P_DATA_PATH - path prefix put in front of every CSV file name inside
   *                 the generated "load data infile" statements. A trailing
   *                 backslash is appended when missing and every backslash is
   *                 doubled so that it survives MySQL string-literal parsing.
   *
   * Configuration file format (one extraction per line):
   *   select col1 col2 ... from table_name [where ...]
   * Column names are separated by whitespace. A listed column that does not
   * exist in USER_TAB_COLUMNS is exported as NULL.
   *
   * Output format: fields separated by ",", string/date/timestamp values
   * enclosed by "^", NULL written as \N. Backslashes and "^" inside string
   * values are backslash-escaped, which matches the default ESCAPED BY of
   * LOAD DATA.
   *
   * Side effects: creates (best effort) and truncates the work table
   * MYSQL_ETL_TBS, and commits. Errors for a single table are reported via
   * DBMS_OUTPUT and the export continues with the next table.
   */
  TYPE T_REC IS RECORD(
    TBN VARCHAR2(40),
    WHR VARCHAR2(4000)
  );
  TYPE T_TABS IS TABLE OF T_REC;

  C_CNF_FILE       CONSTANT VARCHAR2(30)  := 'ETL_TABS.CNF';
  C_LOAD_FILE      CONSTANT VARCHAR2(30)  := 'load_data.sql';
  -- A new CSV file is started after this many rows.
  C_ROWS_PER_FILE  CONSTANT PLS_INTEGER   := 200000;
  -- The CSV buffer is flushed to disk after this many rows.
  C_ROWS_PER_FLUSH CONSTANT PLS_INTEGER   := 2000;
  C_LOAD_OPTS      CONSTANT VARCHAR2(200) :=
    ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';

  V_TABS      T_TABS := T_TABS();
  V_ETL_DIR   VARCHAR2(40) := P_ORA_DIR;
  V_LOAD_FILE UTL_FILE.FILE_TYPE;

  /*
   * Runs P_SQL_STMT and writes its rows to <P_TB_NAME>_<n>.csv files in
   * V_ETL_DIR. For every finished file one "load data infile" statement is
   * appended to the already opened V_LOAD_FILE.
   * Any error is reported through DBMS_OUTPUT and swallowed, so the caller
   * can continue with the next table.
   */
  PROCEDURE ETL_DATA (
    P_SQL_STMT  VARCHAR2,
    P_DATA_PATH VARCHAR2,
    P_TB_NAME   VARCHAR2
  ) IS
    V_VAR_COL      VARCHAR2(32767);
    V_NUM_COL      NUMBER;
    V_DATE_COL     DATE;
    V_TMZ          TIMESTAMP;
    V_COLS         NUMBER;
    V_COLS_DESC    DBMS_SQL.DESC_TAB;
    V_ROW_STR      VARCHAR2(32767);
    V_COL_STR      VARCHAR2(32767);
    V_SQL_ID       NUMBER;
    V_SQL_REF      SYS_REFCURSOR;
    V_EXP_FILE     UTL_FILE.FILE_TYPE;
    V_DATA_PATH    VARCHAR2(4000);
    V_ROWS_IN_FILE PLS_INTEGER := 0;
    V_FILE_NO      PLS_INTEGER := 1;
    V_FILE_NAME    VARCHAR2(200);

    -- Closes the current CSV file and registers it in the load script.
    PROCEDURE CLOSE_DATA_FILE IS
    BEGIN
      UTL_FILE.FCLOSE(V_EXP_FILE);
      UTL_FILE.PUT_LINE(
        V_LOAD_FILE,
        'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME || C_LOAD_OPTS
      );
      UTL_FILE.FFLUSH(V_LOAD_FILE);
    END CLOSE_DATA_FILE;
  BEGIN
    -- Make sure the path ends with a backslash, then double every backslash
    -- because the path ends up inside a MySQL string literal.
    V_DATA_PATH := P_DATA_PATH;
    IF REGEXP_SUBSTR(V_DATA_PATH, '\\$') IS NULL THEN
      V_DATA_PATH := V_DATA_PATH || '\';
    END IF;
    V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');

    OPEN V_SQL_REF FOR P_SQL_STMT;
    V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
    DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);

    -- Bind a fetch buffer to every column. The type codes handled here must
    -- stay in sync with the COLUMN_VALUE dispatch further down.
    FOR I IN 1 .. V_COLS LOOP
      CASE
        WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
        WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
        WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
        WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
        ELSE
          -- Report the offending column instead of an anonymous CASE_NOT_FOUND.
          RAISE_APPLICATION_ERROR(
            -20001,
            'Unsupported column type ' || V_COLS_DESC(I).COL_TYPE ||
            ' for column ' || V_COLS_DESC(I).COL_NAME
          );
      END CASE;
    END LOOP;

    WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0 LOOP
      -- Lazily open the next CSV file, so an empty result creates no file.
      IF NOT UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
        V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv';
        V_EXP_FILE := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => 'w', MAX_LINESIZE => 32767);
        V_ROWS_IN_FILE := 0;
      END IF;

      V_ROW_STR := NULL;
      FOR I IN 1 .. V_COLS LOOP
        V_COL_STR := '\N'; -- MySQL NULL marker, used when the value is NULL
        CASE
          WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
            IF V_VAR_COL IS NOT NULL THEN
              -- Escape the escape character first, then the enclosure
              -- character, so the value cannot terminate the field early.
              V_COL_STR := '^' || REPLACE(REPLACE(V_VAR_COL, '\', '\\'), '^', '\^') || '^';
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
            IF V_NUM_COL IS NOT NULL THEN
              -- Force "." as decimal separator regardless of session NLS.
              V_COL_STR := TO_CHAR(V_NUM_COL, 'TM9', 'NLS_NUMERIC_CHARACTERS=''.,''');
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
            IF V_DATE_COL IS NOT NULL THEN
              V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
            IF V_TMZ IS NOT NULL THEN
              V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
            END IF;
          ELSE
            NULL; -- unreachable: unsupported types are rejected in the define loop
        END CASE;

        IF I > 1 THEN
          V_ROW_STR := V_ROW_STR || ',';
        END IF;
        V_ROW_STR := V_ROW_STR || V_COL_STR;
      END LOOP;

      UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
      V_ROWS_IN_FILE := V_ROWS_IN_FILE + 1;

      IF V_ROWS_IN_FILE >= C_ROWS_PER_FILE THEN
        -- File is full: finish it; the next row opens a new one.
        CLOSE_DATA_FILE;
        V_FILE_NO := V_FILE_NO + 1;
      ELSIF MOD(V_ROWS_IN_FILE, C_ROWS_PER_FLUSH) = 0 THEN
        UTL_FILE.FFLUSH(V_EXP_FILE);
      END IF;
    END LOOP;

    DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);

    -- Finish the last, partially filled file.
    IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
      CLOSE_DATA_FILE;
    END IF;
  EXCEPTION
    WHEN OTHERS THEN
      -- V_SQL_ID is still NULL when OPEN itself failed (e.g. invalid SQL).
      IF V_SQL_ID IS NOT NULL AND DBMS_SQL.IS_OPEN(V_SQL_ID) THEN
        DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
      END IF;
      IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
        UTL_FILE.FCLOSE(V_EXP_FILE);
      END IF;
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
  END ETL_DATA;
BEGIN
  -- Best effort: the work table may already exist from a previous run, so a
  -- failure here is ignored on purpose. A truly missing table surfaces in
  -- the TRUNCATE right below.
  BEGIN
    EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ';
  EXCEPTION
    WHEN OTHERS THEN
      NULL;
  END;
  EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';

  -- Phase 1: parse the configuration file into V_TABS and MYSQL_ETL_TBS.
  DECLARE
    V_CI       PLS_INTEGER;
    V_CN       VARCHAR2(40);
    V_ETL_COLS VARCHAR2(32767);
    V_TBN      VARCHAR2(40);
    V_ETL_CFG  VARCHAR2(32767);
    V_CNF_FILE UTL_FILE.FILE_TYPE;
    V_FROM_POS PLS_INTEGER;
  BEGIN
    V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, C_CNF_FILE, 'r', 32767);
    LOOP
      -- GET_LINE signals end of file with NO_DATA_FOUND.
      BEGIN
        UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);
      EXCEPTION
        WHEN NO_DATA_FOUND THEN
          EXIT;
      END;

      V_TBN := UPPER(REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2));

      -- Blank or unparseable lines carry no table name and are skipped.
      IF V_TBN IS NOT NULL THEN
        -- Locate the FROM keyword with surrounding whitespace, so a column
        -- name that merely contains "from" does not cut the column list.
        V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, '\s+from\s+', 1, 1, 0, 'i');
        V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
        V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);

        V_TABS.EXTEND();
        V_TABS(V_TABS.LAST).TBN := V_TBN;
        V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where .+', 1, 1, 'i');

        -- One work-table row per whitespace-separated column, keeping the
        -- position so the export preserves the configured column order.
        V_CI := 1;
        LOOP
          V_CN := REGEXP_SUBSTR(V_ETL_COLS, '\S+', 1, V_CI);
          EXIT WHEN V_CN IS NULL;
          V_CN := UPPER(V_CN);
          EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
            USING V_TBN, V_CN, V_CI;
          V_CI := V_CI + 1;
        END LOOP;
      END IF;
    END LOOP;

    UTL_FILE.FCLOSE(V_CNF_FILE);
    COMMIT;
  EXCEPTION
    WHEN UTL_FILE.INVALID_PATH THEN
      DBMS_OUTPUT.PUT_LINE('指定的目录:"' || V_ETL_DIR || '"无效!');
      RETURN;
    WHEN UTL_FILE.INVALID_FILENAME THEN
      DBMS_OUTPUT.PUT_LINE('指定的文件:"' || C_CNF_FILE || '"无效!');
      RETURN;
    WHEN OTHERS THEN
      IF UTL_FILE.IS_OPEN(V_CNF_FILE) THEN
        UTL_FILE.FCLOSE(V_CNF_FILE);
      END IF;
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      RETURN;
  END;

  -- Phase 2: build one SELECT per configured table and export it.
  DECLARE
    -- Dynamic SQL because MYSQL_ETL_TBS may not exist when this procedure is
    -- compiled. The table name is passed as a bind variable.
    C_MATCH_SQL CONSTANT VARCHAR2(400) :=
      'select l.cn, r.column_name, l.ci' ||
      ' from mysql_etl_tbs l' ||
      ' left join user_tab_columns r' ||
      ' on l.tn = r.table_name and l.cn = r.column_name' ||
      ' where l.tn = :tbn' ||
      ' order by l.ci';

    V_CUR_MATCH   SYS_REFCURSOR;
    V_SQL_SMT     VARCHAR2(32767);
    V_CN          VARCHAR2(40);
    V_CI          PLS_INTEGER;
    V_COLUMN_NAME VARCHAR2(40);
    V_ETL_COLS    VARCHAR2(32767);
    V_TBN         VARCHAR2(40);
  BEGIN
    V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, C_LOAD_FILE, OPEN_MODE => 'w', MAX_LINESIZE => 32767);

    -- 1 .. COUNT is a no-op for an empty collection, whereas
    -- FIRST .. LAST would raise ORA-06502 on NULL bounds.
    FOR T_IX IN 1 .. V_TABS.COUNT LOOP
      V_TBN := V_TABS(T_IX).TBN;
      V_ETL_COLS := NULL;

      OPEN V_CUR_MATCH FOR C_MATCH_SQL USING V_TBN;
      LOOP
        FETCH V_CUR_MATCH INTO V_CN, V_COLUMN_NAME, V_CI;
        EXIT WHEN V_CUR_MATCH%NOTFOUND;

        IF V_ETL_COLS IS NOT NULL THEN
          V_ETL_COLS := V_ETL_COLS || ' , ';
        END IF;

        IF V_COLUMN_NAME IS NULL THEN
          -- Column is configured but missing in Oracle: export NULL for it.
          V_ETL_COLS := V_ETL_COLS || ' cast(null as number) ' || V_CN;
        ELSE
          V_ETL_COLS := V_ETL_COLS || V_CN;
        END IF;
      END LOOP;
      CLOSE V_CUR_MATCH;

      -- The lower-case name is used for the CSV file names and as the
      -- MySQL target table name in the load script.
      V_TBN := LOWER(V_TBN);
      V_SQL_SMT := 'select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR;
      ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN);
    END LOOP;

    IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN
      UTL_FILE.FCLOSE(V_LOAD_FILE);
    END IF;
  EXCEPTION
    WHEN OTHERS THEN
      -- Release the handles, then let the caller see the original error.
      IF V_CUR_MATCH%ISOPEN THEN
        CLOSE V_CUR_MATCH;
      END IF;
      IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN
        UTL_FILE.FCLOSE(V_LOAD_FILE);
      END IF;
      RAISE;
  END;
END P_ETL_ORA_DATA;
总结