好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Oracle同步数据到kafka的方法

环境准备

软件准备

CentOS Linux 7.6.1810 (2台,A主机,B主机) Oracle 11.2.0.4(A主机安装) Kafka 2.13-2.6.0 (B主机安装) kafka-connect-oracle-master (B主机安装,开源程序,用于同步Oracle数据到kafka) apache-maven 3.6.3 (B主机安装,kafka-connect-oracle-master 的打包工具) jdk-8u261-linux-x64.rpm (B主机安装)

下载地址

Cenos 和Oracle 和 JDK 自行到官网下载 Kafka http://kafka.apache.org/downloads

Kafka Connect Oracle    https://github.com/tianxiancode/kafka-connect-oracle apache-maven 3.6.3 http://maven.apache.org/download.cgi

实施过程

Oracle主机(A)配置

Oracle实例配置项:

开启归档日志 开启附加日志 创建kafka-connect-oracle-master连接用户 创建测试数据生成用户及测试表

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

--开启归档日志

sqlplus / as sysdba   

SQL>shutdown immediate

SQL>startup mount

SQL> alter database archivelog;

SQL> alter database open ;

--开启附加日志

SQL> alter database add supplemental log data ( all ) columns;

--创建kafka-connect-oracle-master连接用户

create role logmnr_role;

grant create session to logmnr_role;

grant   execute_catalog_role, select any transaction , select any dictionary to logmnr_role;

create user kminer identified by kminerpass;

grant   logmnr_role to kminer;

alter user kminer quota unlimited on users;

--创建测试数据生成用户及测试表

create tablespace test_date datafile '/u01/app/oracle/oradata/zzsrc/test_date01.dbf' size 100M autoextend on next 10M;

create user whtest identified by whtest default tablespace test_date;

grant connect ,resource to whtest;

grant execute on dbms_scheduler to whtest;

grant execute on dbms_random to whtest;

grant    create   job  to   whtest;

create table t1 (

id int ,

name char (10),

createtime date default sysdate

);

alter table WHTEST.T1  add constraint PK_ID_T1 primary key (ID)  using index    tablespace TEST_DATE;

 

create table t2 (

id int ,

name char (10),

createtime date default sysdate

);

alter table WHTEST.T2  add constraint PK_ID_T2 primary key (ID)  using index    tablespace TEST_DATE;

create table t3 (

id int ,

name char (10),

createtime date default sysdate

);

alter table WHTEST.T3  add constraint PK_ID_T3 primary key (ID)  using index    tablespace TEST_DATE;

begin

dbms_scheduler.create_job(

job_name=> 't1_job' ,

job_type=> 'PLSQL_BLOCK' ,

job_action => 'declare

v_id int;

v_name char(10);

begin

   for i in 1..10 loop

     v_id := round(dbms_random.value(1,1000000000));

     v_name :=round(dbms_random.value(1,1000000000));

     insert into whtest.t1 (id,name)values(v_id,v_name);

   end loop;

end;' ,

enabled=> true ,

repeat_interval=> 'sysdate + 5/86400' ,

comments=> 'insert into t1 every 5 sec' );

end ;

/

 

begin

dbms_scheduler.create_job(

job_name=> 't2_job' ,

job_type=> 'PLSQL_BLOCK' ,

job_action => 'declare

v_id int;

v_name char(10);

begin

   for i in 1..10 loop

     v_id := round(dbms_random.value(1,1000000000));

     v_name :=round(dbms_random.value(1,1000000000));

     insert into whtest.t2 (id,name)values(v_id,v_name);

   end loop;

end;' ,

enabled=> true ,

repeat_interval=> 'sysdate + 5/86400' ,

comments=> 'insert into t1 every 5 sec' );

end ;

/

 

begin

dbms_scheduler.create_job(

job_name=> 't3_job' ,

job_type=> 'PLSQL_BLOCK' ,

job_action => 'declare

v_id int;

v_name char(10);

begin

   for i in 1..10 loop

     v_id := round(dbms_random.value(1,1000000000));

     v_name :=round(dbms_random.value(1,1000000000));

     insert into whtest.t3 (id,name)values(v_id,v_name);

   end loop;

end;' ,

enabled=> true ,

repeat_interval=> 'sysdate + 5/86400' ,

comments=> 'insert into t3 every 5 sec' );

end ;

/

--JOB创建之后,暂时先diable,待kafka配置完成之后再enable

exec DBMS_SCHEDULER.DISABLE( 'T1_JOB' );

exec DBMS_SCHEDULER.DISABLE( 'T2_JOB' );

exec DBMS_SCHEDULER.DISABLE( 'T3_JOB' );

 

exec DBMS_SCHEDULER.ENABLE( 'T1_JOB' );

exec DBMS_SCHEDULER.ENABLE( 'T2_JOB' );

exec DBMS_SCHEDULER.ENABLE( 'T3_JOB' );

Kafka主机(B)配置

​ 将下载好的Kafka 2.13-2.6.0 、kafka-connect-oracle-master、apache-maven 3.6.3、JDK 1.8.0上传至B主机/soft目录待使用。

主机hosts文件添加解析

?

1

2

3

4

[root@softdelvily ~] # cat /etc/hosts

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4

::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.20.44 softdelvily localhost

安装JDK

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

[root@softdelvily soft] # rpm -ivh jdk-8u261-linux-x64.rpm

warning: jdk-8u261-linux-x64.rpm: Header V3 RSA /SHA256 Signature, key ID ec551f03: NOKEY

Preparing...                          ################################# [100%]

Updating / installing...

    1:jdk1.8-2000:1.8.0_261-fcs        ################################# [100%]

Unpacking JAR files...

         tools.jar...

         plugin.jar...

         javaws.jar...

         deploy.jar...

         rt.jar...

         jsse.jar...

         charsets.jar...

         localedata.jar...

配置apache-maven工具

​ 将apache-maven-3.6.3-bin.tar.gz解压至/usr/local目录,并设置相应的/etc/profile环境变量。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

[root@softdelvily soft] # tar xvf apache-maven-3.6.3-bin.tar.gz -C /usr/local/

apache-maven-3.6.3 /README .txt

apache-maven-3.6.3 /LICENSE

.....

[root@softdelvily soft] # cd /usr/local/

[root@softdelvily local ] # ll

total 0

drwxr-xr-x. 6 root root  99 Sep 23 09:56 apache-maven-3.6.3

drwxr-xr-x. 2 root root   6 Apr 11  2018 bin

.....

[root@softdelvily local ] # vi /etc/profile

.......

##添加如下环境变量

MAVEN_HOME= /usr/local/apache-maven-3 .6.3

export MAVEN_HOME

export PATH=${PATH}:${MAVEN_HOME} /bin

[root@softdelvily local ] # source /etc/profile

[root@softdelvily local ] # mvn -v

Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)

Maven home: /usr/local/apache-maven-3 .6.3

Java version: 1.8.0_262, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1 .8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64 /jre

Default locale: en_US, platform encoding: UTF-8

OS name: "linux" , version: "3.10.0-957.el7.x86_64" , arch: "amd64" , family: "unix"

配置Kafka 2.13-2.6.0

​ 解压Kafka 2.13-2.6.0 至/usr/local目录。

?

1

2

3

4

5

6

7

8

9

10

[root@softdelvily soft] # tar xvf kafka_2.13-2.6.0.tgz -C /usr/local/

kafka_2.13-2.6.0/

kafka_2.13-2.6.0 /LICENSE

......

[root@softdelvily soft] # cd /usr/local/

[root@softdelvily local ] # ll

total 0

drwxr-xr-x. 6 root root 99 Sep 23 09:56 apache-maven-3.6.3

drwxr-xr-x. 6 root root 89 Jul 29 02:23 kafka_2.13-2.6.0

.....

​ 开启kafka,并创建对应同步数据库过的topic

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

--1、session1 启动ZK

[root@softdelvily kafka_2.13-2.6.0] # cd /usr/local/kafka_2.13-2.6.0/bin/

[root@softdelvily bin] # ./zookeeper-server-start.sh config/zookeeper.properties

[2020-09-23 10:06:49,158] INFO Reading configuration from: .. /config/zookeeper .properties

.......

[2020-09-23 10:06:49,311] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)

--2、session2 启动kafka

[root@softdelvily kafka_2.13-2.6.0] # cd /usr/local/kafka_2.13-2.6.0/bin/

[root@softdelvily bin] # ./kafka-server-start.sh config/server.properties

--3、session3 创建cdczztar

[root@softdelvily kafka_2.13-2.6.0] # cd /usr/local/kafka_2.13-2.6.0/bin/

[root@softdelvily bin] # ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cdczztar

Created topic cdczztar.

[root@softdelvily bin] # ./kafka-topics.sh --zookeeper localhost:2181 --list

__consumer_offsets

cdczztar

配置kafka-connect-oracle-maste

解压kafka-connect-oracle-master至/soft目录,并配置相应config文件,然后使用maven工具编译程序。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

--解压zip包

[root@softdelvily soft] # unzip kafka-connect-oracle-master.zip

[root@softdelvily soft] # ll

total 201180

-rw-r--r--. 1 root root   9506321 Sep 22 16:05 apache-maven-3.6.3-bin. tar .gz

-rw-r--r--. 1 root root 127431820 Sep  8 10:43 jdk-8u261-linux-x64.rpm

-rw-r--r--. 1 root root  65537909 Sep 22 15:59 kafka_2.13-2.6.0.tgz

drwxr-xr-x. 5 root root       107 Sep  8 15:48 kafka-connect-oracle-master

-rw-r--r--. 1 root root   3522729 Sep 22 14:14 kafka-connect-oracle-master.zip

[root@softdelvily soft] # cd kafka-connect-oracle-master/config/

[root@softdelvily config] # ll

total 4

-rw-r--r--. 1 root root 1135 Sep  8 15:48 OracleSourceConnector.properties

--调整properties配置文件

--需要调整项 db .name. alias 、topic、 db .name、 db . hostname 、 db .user、 db .user.password、table.whitelist、table.blacklist信息,具体说明参考README.md

[root@softdelvily config] # vi OracleSourceConnector.properties

name=oracle-logminer-connector

connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector

db .name. alias =zztar

tasks.max=1

topic=cdczztar

db .name=zztar

db . hostname =192.168.xx.xx

db .port=1521

db .user=kminer

db .user.password=kminerpass

db .fetch.size=1

table.whitelist=WHTEST.T1,WHTEST.T2

table.blacklist=WHTEST.T3

parse.dml.data= true

reset.offset= true

start.scn=

multitenant= false

--编译程序

[root@softdelvily ~] # cd /soft/kafka-connect-oracle-master

[root@softdelvily kafka-connect-oracle-master] # mvn clean package

[INFO] Scanning for projects...

.......

[INFO] Building jar: /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1 .0.68.jar

with assembly file : /soft/kafka-connect-oracle-master/target/kafka-connect-oracle-1 .0.68.jar

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time :  94.03 s

[INFO] Finished at: 2020-09-23T10:25:52+08:00

[INFO] ------------------------------------------------------------------------

​ 将如下文件复制到kafka工作目录。

复制kafka-connect-oracle-1.058.jar 和 lib/ojdbc7.jar 到$KAFKA_HOME/lib 复制config/OracleSourceConnector.properties 文件到$KAFKA_HOME/config

?

1

2

3

4

5

6

[root@softdelvily config] # cd /soft/kafka-connect-oracle-master/target/

[root@softdelvily target] # cp kafka-connect-oracle-1.0.68.jar /usr/local/kafka_2.13-2.6.0/libs/

[root@softdelvily lib] # cd /soft/kafka-connect-oracle-master/lib

[root@softdelvily lib] # cp ojdbc7.jar /usr/local/kafka_2.13-2.6.0/libs/

[root@softdelvily lib] # cd /soft/kafka-connect-oracle-master/config/

[root@softdelvily config] # cp OracleSourceConnector.properties /usr/local/kafka_2.13-2.6.0/config/

启动kafka-connect-oracle

?

1

2

3

4

5

[root@softdelvily kafka_2.13-2.6.0] # cd /usr/local/kafka_2.13-2.6.0/bin/

[root@softdelvily bin] # ./connect-standalone.sh config/connect-standalone.properties config/OracleSourceConnector.properties

......

(com.ecer.kafka.connect.oracle.OracleSourceTask:187)

[2020-09-23 10:40:31,375] INFO Log Miner will start at new position SCN : 2847346 with fetch size : 1 (com.ecer.kafka.connect.oracle.OracleSourceTask:188)

启动kafka消费者

?

1

2

[root@softdelvily kafka_2.13-2.6.0] # cd /usr/local/kafka_2.13-2.6.0/bin/

[root@softdelvily bin] # ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic cdczztar

启动数据库JOB

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

[oracle@oracle01 ~]$ sqlplus / as sysdba

 

SQL*Plus: Release 11.2.0.4.0 Production on Wed Sep 23 10:45:16 2020

 

Copyright (c) 1982, 2011, Oracle.  All rights reserved.

 

set pagesize 999

 

Connected to:

Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production

With the Partitioning, OLAP, Data Mining and Real Application Testing options

 

SQL> SQL> conn whtest /whtest

Connected.

SQL> exec DBMS_SCHEDULER.ENABLE( 'T1_JOB' );

 

PL /SQL procedure successfully completed.

kafka消费者界面

出现类似记录,表明同步成功,数据以key:value的形式输出。

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"SCN"},{"type":"string","optional":false,"field":"SEG_OWNER"},{"type":"string","optional":false,"field":"TABLE_NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"TIMESTAMP"},{"type":"string","optional":false,"field":"SQL_REDO"},{"type":"string","optional":false,"field":"OPERATION"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"data"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"NAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATETIME"}],"optional":true,"name":"value","field":"before"}],"optional":false,"name":"zztar.whtest.t1.row"},"payload":{"SCN":2847668,"SEG_OWNER":"WHTEST","TABLE_NAME":"T1","TIMESTAMP":1600829206000,"SQL_REDO":"insert into \"WHTEST\".\"T1\"(\"ID\",\"NAME\",\"CREATETIME\") values (557005146,'533888119 ',TIMESTAMP ' 2020-09-23 10:46:46')","OPERATION":"INSERT","data":{"ID":5.57005146E8,"NAME":"533888119","CREATETIME":1600829206000},"before":null}}

到此这篇关于Oracle同步数据到kafka的文章就介绍到这了,更多相关Oracle同步数据到kafka内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/bicewow/p/13717143.html

查看更多关于Oracle同步数据到kafka的方法的详细内容...

  阅读:26次