How to Synchronize Oracle Data to Kafka
Environment preparation
Software
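- CentOS Linux 7.6.1810 (two hosts: host A and host B)
- Oracle 11.2.0.4 (installed on host A)
- Kafka 2.13-2.6.0 (installed on host B)
- kafka-connect-oracle-master (installed on host B; an open-source program that synchronizes Oracle data to Kafka)
- apache-maven 3.6.3 (installed on host B; the build tool used to package kafka-connect-oracle-master)
- jdk-8u261-linux-x64.rpm (installed on host B)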
Download locations
- CentOS, Oracle, and the JDK: download them from their official websites
- Kafka http://kafka.apache.org/downloads
- Kafka Connect Oracle https://github.com/tianxiancode/kafka-connect-oracle
- apache-maven 3.6.3 http://maven.apache.org/download.cgi
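Implementation
Oracle host (A) configuration
Oracle instance settings:
- Enable archive log mode
- Enable supplemental logging
- Create the connection user for kafka-connect-oracle-master
- Create the user that generates test data, and the test tables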
-- Enable archive log mode
sqlplus / as sysdba
SQL> shutdown immediate
SQL> startup mount
SQL> alter database archivelog;
SQL> alter database open;
-- Enable supplemental logging
SQL> alter database add supplemental log data (all) columns;
-- Create the connection user for kafka-connect-oracle-master
create role logmnr_role;
grant create session to logmnr_role;
grant execute_catalog_role, select any transaction, select any dictionary to logmnr_role;
create user kminer identified by kminerpass;
grant logmnr_role to kminer;
alter user kminer quota unlimited on users;
-- Create the test-data user and the test tables
create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100M autoextend on next 10M;
create user whtest identified by whtest default tablespace test_date;
grant connect, resource to whtest;
grant execute on dbms_scheduler to whtest;
grant execute on dbms_random to whtest;
grant create job to whtest;
-- Connect as whtest so that the tables and jobs below are created in the WHTEST schema
conn whtest/whtest
create table t1 (id int, name char(10), createtime date default sysdate);
alter table WHTEST.T1 add constraint PK_ID_T1 primary key (ID) using index tablespace TEST_DATE;
create table t2 (id int, name char(10), createtime date default sysdate);
alter table WHTEST.T2 add constraint PK_ID_T2 primary key (ID) using index tablespace TEST_DATE;
create table t3 (id int, name char(10), createtime date default sysdate);
alter table WHTEST.T3 add constraint PK_ID_T3 primary key (ID) using index tablespace TEST_DATE;
-- One scheduler job per table; each run inserts 10 rows with random values
begin
  dbms_scheduler.create_job(
    job_name        => 't1_job',
    job_type        => 'PLSQL_BLOCK',
    job_action      => 'declare v_id int; v_name char(10);
      begin
        for i in 1..10 loop
          v_id := round(dbms_random.value(1,1000000000));
          v_name := round(dbms_random.value(1,1000000000));
          insert into whtest.t1 (id,name) values (v_id,v_name);
        end loop;
      end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t1 every 5 sec');
end;
/
begin
  dbms_scheduler.create_job(
    job_name        => 't2_job',
    job_type        => 'PLSQL_BLOCK',
    job_action      => 'declare v_id int; v_name char(10);
      begin
        for i in 1..10 loop
          v_id := round(dbms_random.value(1,1000000000));
          v_name := round(dbms_random.value(1,1000000000));
          insert into whtest.t2 (id,name) values (v_id,v_name);
        end loop;
      end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t2 every 5 sec');
end;
/
begin
  dbms_scheduler.create_job(
    job_name        => 't3_job',
    job_type        => 'PLSQL_BLOCK',
    job_action      => 'declare v_id int; v_name char(10);
      begin
        for i in 1..10 loop
          v_id := round(dbms_random.value(1,1000000000));
          v_name := round(dbms_random.value(1,1000000000));
          insert into whtest.t3 (id,name) values (v_id,v_name);
        end loop;
      end;',
    enabled         => true,
    repeat_interval => 'sysdate + 5/86400',
    comments        => 'insert into t3 every 5 sec');
end;
/
-- Once the jobs are created, disable them for now
exec DBMS_SCHEDULER.DISABLE('T1_JOB');
exec DBMS_SCHEDULER.DISABLE('T2_JOB');
exec DBMS_SCHEDULER.DISABLE('T3_JOB');
-- Run these later, after the Kafka side has been configured
exec DBMS_SCHEDULER.ENABLE('T1_JOB');
exec DBMS_SCHEDULER.ENABLE('T2_JOB');
exec DBMS_SCHEDULER.ENABLE('T3_JOB');
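The repeat_interval value 'sysdate + 5/86400' relies on Oracle date arithmetic: adding a number to a date adds that many days, and a day has 86400 seconds, so 5/86400 of a day is 5 seconds. The Python sketch below only reproduces that arithmetic for illustration; the helper name next_run is hypothetical and is not part of Oracle or of the connector.

```python
from datetime import datetime, timedelta

SECONDS_PER_DAY = 24 * 60 * 60  # 86400


def next_run(now: datetime, fraction_of_day: float) -> datetime:
    """Mimic Oracle's `date + n`, where n is expressed in days."""
    return now + timedelta(days=fraction_of_day)


if __name__ == "__main__":
    start = datetime(2020, 9, 23, 10, 46, 41)
    # 5/86400 of a day after `start`, i.e. about five seconds later
    print(next_run(start, 5 / SECONDS_PER_DAY))
```

Changing the numerator changes the period in seconds, so 60/86400 would correspond to one minute.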
Kafka host (B) configuration
Upload the downloaded Kafka 2.13-2.6.0, kafka-connect-oracle-master, apache-maven 3.6.3, and JDK 1.8.0 packages to the /soft directory on host B.
Add name resolution to the hosts file
[root@softdelvily ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.20.44 softdelvily localhost
Install the JDK
[root@softdelvily soft]# rpm -ivh jdk-8u261-linux-x64.rpm
warning: jdk-8u261-linux-x64.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY
Preparing... ################################# [100%]
Updating / installing...
1:jdk1.8-2000:1.8.0_261-fcs ################################# [100%]
Unpacking JAR files...
tools.jar...
plugin.jar...
javaws.jar...
deploy.jar...
rt.jar...
jsse.jar...
charsets.jar...
localedata.jar...
Configure apache-maven
Extract apache-maven-3.6.3-bin.tar.gz into /usr/local and set the corresponding environment variables in /etc/profile.
[root@softdelvily soft]# tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/
apache-maven-3.6.3/README.txt
apache-maven-3.6.3/LICENSE
.....
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 2 root root 6 Apr 11 2018 bin
.....
[root@softdelvily local]# vi /etc/profile
.......
## Add the following environment variables
MAVEN_HOME=/usr/local/apache-maven-3.6.3
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
[root@softdelvily local]# source /etc/profile
[root@softdelvily local]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/apache-maven-3.6.3
Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-957.el7.x86_64", arch: "amd64", family: "unix"
Configure Kafka 2.13-2.6.0
Extract Kafka 2.13-2.6.0 into /usr/local.
[root@softdelvily soft]# tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/
kafka_2.13-2.6.0/
kafka_2.13-2.6.0/LICENSE
......
[root@softdelvily soft]# cd /usr/local/
[root@softdelvily local]# ll
total 0
drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3
drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0
.....
Start Kafka and create the topic that will receive the synchronized database changes.
-- 1. Session 1: start ZooKeeper
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
[2020-09-23 10:06:49,158] INFO Reading configuration from: ../config/zookeeper.properties
.......
[2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
-- 2. Session 2: start Kafka
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-server-start.sh ../config/server.properties
-- 3. Session 3: create the cdczztar topic
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar
Created topic cdczztar.
[root@softdelvily bin]# ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
cdczztar
Configure kafka-connect-oracle-master
Extract kafka-connect-oracle-master into the /soft directory, edit its config file, and then build the program with Maven.
-- Extract the zip archive
[root@softdelvily soft]# unzip kafka-connect-oracle-master.zip
[root@softdelvily soft]# ll
total 201180
-rw-r--r--. 1 root root 9506321 Sep 22 16:05 apache-maven-3.6.3-bin.tar.gz
-rw-r--r--. 1 root root 127431820 Sep 8 10:43 jdk-8u261-linux-x64.rpm
-rw-r--r--. 1 root root 65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz
drwxr-xr-x. 5 root root 107 Sep 8 15:48 kafka-connect-oracle-master
-rw-r--r--. 1 root root 3522729 Sep 22 14:14 kafka-connect-oracle-master.zip
[root@softdelvily soft]# cd kafka-connect-oracle-master/config/
[root@softdelvily config]# ll
total 4
-rw-r--r--. 1 root root 1135 Sep 8 15:48 OracleSourceConnector.properties
-- Edit the properties file
-- Items to adjust: db.name.alias, topic, db.name, db.hostname, db.user, db.user.password, table.whitelist, table.blacklist; see README.md for details
[root@softdelvily config]# vi OracleSourceConnector.properties
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=WHTEST.T1,WHTEST.T2
table.blacklist=WHTEST.T3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
-- Build the program
[root@softdelvily ~]# cd /soft/kafka-connect-oracle-master
[root@softdelvily kafka-connect-oracle-master]# mvn clean package
[INFO] Scanning for projects...
.......
[INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar with assembly file: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1.0.68.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 94.03 s
[INFO] Finished at: 2020-09-23T10:25:52+08:00
[INFO] ------------------------------------------------------------------------
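Before starting the connector it can help to sanity-check the edited properties file. The sketch below is a minimal Python example under stated assumptions: it treats the keys the article lists as "to adjust" (except table.blacklist) as mandatory, and it flags tables that appear literally in both table.whitelist and table.blacklist. Both rules are assumptions made for illustration and do not reproduce the connector's own validation; the function names are hypothetical, and wildcard entries are not expanded.

```python
# Keys the article says must be adjusted; treating them as mandatory is an assumption.
REQUIRED_KEYS = [
    "db.name.alias", "topic", "db.name", "db.hostname",
    "db.user", "db.user.password", "table.whitelist",
]


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or line.startswith("!"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def split_tables(value: str) -> set:
    """Split a comma-separated table list into upper-cased names."""
    return {item.strip().upper() for item in value.split(",") if item.strip()}


def check_connector_config(props: dict) -> list:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = [f"missing or empty: {key}" for key in REQUIRED_KEYS if not props.get(key)]
    overlap = sorted(split_tables(props.get("table.whitelist", ""))
                     & split_tables(props.get("table.blacklist", "")))
    if overlap:
        problems.append(f"listed in both whitelist and blacklist: {', '.join(overlap)}")
    return problems


# The values shown in the article's OracleSourceConnector.properties
SAMPLE = """\
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=zztar
tasks.max=1
topic=cdczztar
db.name=zztar
db.hostname=192.168.xx.xx
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=WHTEST.T1,WHTEST.T2
table.blacklist=WHTEST.T3
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
"""

if __name__ == "__main__":
    for problem in check_connector_config(parse_properties(SAMPLE)) or ["no problems found"]:
        print(problem)
```

To check a real file, read it with open(path).read() and pass the text to parse_properties.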
Copy the following files into the Kafka working directory.
- Copy kafka-connect-oracle-1.0.68.jar and lib/ojdbc7.jar to $KAFKA_HOME/libs
- Copy config/OracleSourceConnector.properties to $KAFKA_HOME/config
[root@softdelvily config]# cd /soft/kafka-connect-oracle-master/target/
[root@softdelvily target]# cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily target]# cd /soft/kafka-connect-oracle-master/lib
[root@softdelvily lib]# cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/
[root@softdelvily lib]# cd /soft/kafka-connect-oracle-master/config/
[root@softdelvily config]# cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/
Start kafka-connect-oracle
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/OracleSourceConnector.properties
...... (com.ecer.kafka.connect.oracle.OracleSourceTask:187)
[2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)
Start a Kafka consumer
[root@softdelvily kafka_2.13-2.6.0]# cd /usr/local/kafka_2.13-2.6.0/bin/
[root@softdelvily bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar
Start the database job
[oracle@oracle01 ~]$ sqlplus / as sysdba
SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020
Copyright (c) 1982, 2011, Oracle. All rights reserved.
set pagesize 999
Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options
SQL>
SQL> conn whtest/whtest
Connected.
SQL> exec DBMS_SCHEDULER.ENABLE('T1_JOB');
PL/SQL procedure successfully completed.
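Kafka consumer output
Records like the following indicate that synchronization is working. Each record is printed as a JSON document of key:value pairs.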
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}
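Each consumer line is a JSON envelope with a schema part and a payload part, and the payload carries the SCN, owner, table, operation, and the row values under data (with before set to null for an insert). As a rough illustration of how a downstream program might read such a line, here is a minimal Python sketch. The function name summarize_record is hypothetical, the sample is a trimmed copy of the record above (schema emptied and SQL_REDO omitted for brevity), and reading TIMESTAMP as epoch milliseconds is an inference from the sample, where 1600829206000 corresponds to 10:46:46 at UTC+8.

```python
import json
from datetime import datetime, timezone


def summarize_record(line: str) -> dict:
    """Extract the interesting fields from one JSON line printed by the consumer."""
    payload = json.loads(line)["payload"]
    return {
        "scn": payload["SCN"],
        "table": f'{payload["SEG_OWNER"]}.{payload["TABLE_NAME"]}',
        "operation": payload["OPERATION"],
        # Assumption: TIMESTAMP is epoch milliseconds
        "timestamp_utc": datetime.fromtimestamp(payload["TIMESTAMP"] / 1000, tz=timezone.utc),
        "data": payload["data"],      # row values after the change
        "before": payload["before"],  # row values before the change (None for inserts)
    }


# Trimmed copy of the article's sample record; "schema" is emptied for brevity.
SAMPLE_LINE = json.dumps({
    "schema": {},
    "payload": {
        "SCN": 2847668,
        "SEG_OWNER": "WHTEST",
        "TABLE_NAME": "T1",
        "TIMESTAMP": 1600829206000,
        "OPERATION": "INSERT",
        "data": {"ID": 5.57005146E8, "NAME": "533888119", "CREATETIME": 1600829206000},
        "before": None,
    },
})

if __name__ == "__main__":
    summary = summarize_record(SAMPLE_LINE)
    print(summary["table"], summary["operation"], summary["timestamp_utc"].isoformat())
    # ID arrives as a JSON double, so convert it if an integer key is needed
    print(int(summary["data"]["ID"]))
```

Note that the ID column is typed as double in the record's schema, so integer keys come through as floating-point values and may need converting on the consumer side.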
Article URL: http://www.codeinn.net/misctech/214173.html