
Integrating Canal data synchronization with Spring Boot

1. Compile and install MySQL 5.7.24 on CentOS 7

A step-by-step tutorial on compiling and installing MySQL 5.7.24 on CentOS 7:

Link: http://HdhCmsTesttuohang.net/article/2238.html

2. Configure the MySQL binlog

(1) Check whether the binlog is enabled.
(2) If the value shows OFF, the binlog is not enabled yet; enable it as described below.


mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF   |
+---------------+-------+
1 row in set (0.00 sec)

1) Edit the MySQL configuration file my.cnf:
vi /etc/my.cnf
Append the following lines:


log-bin=mysql-bin   # binlog file name
binlog_format=ROW   # use ROW format
server_id=1         # MySQL instance id; must not clash with canal's slaveId

2) Restart MySQL:
service mysql restart

3) Log in to the MySQL client and check the log_bin variable again:


mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

If the value shows ON, the binlog is now enabled.
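Because Canal's table filter only works with ROW-format events (see the note in step 3 below), it is also worth confirming binlog_format after the restart; this extra check is not part of the original post:

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)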

(3) Create the following user in MySQL and grant it the required privileges:


CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
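Optionally, confirm that the grants took effect (this verification step is an addition, not in the original post):

SHOW GRANTS FOR 'canal'@'%';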

3. Download and install the Canal server on Linux

Download:
https://github.com/alibaba/canal/releases
(1) After downloading, place the archive in a directory and extract it:


mkdir /usr/local/canal
cd /usr/local/canal
# place the downloaded canal.deployer-1.1.4.tar.gz in this directory, then extract it
tar zxvf canal.deployer-1.1.4.tar.gz

(2) Edit the instance configuration file:
vi conf/example/instance.properties


# change to your own database address
canal.instance.master.address=39.106.224.236:3306
# change to your own database username and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# change to the table filter rule you want to synchronize
# 1. synchronize all tables
canal.instance.filter.regex=.*\\..*
# 2. synchronize a single table in a single schema
#canal.instance.filter.regex=guli_ucenter.ucenter_member

The filter selects which tables the MySQL data parser pays attention to, written as Perl-style regular expressions. Multiple patterns are separated by commas (,), and the escape character must be a double backslash (\\).
Common examples:

All tables: .* or .*\\..*
All tables in the canal schema: canal\\..*
Tables in canal whose names start with "canal": canal\\.canal.*
A single table in the canal schema: canal.test1
Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated; see the sketch below)

Note: this filter only applies to ROW-format binlog data (mixed/statement formats do not parse the SQL, so the table name cannot be reliably extracted for filtering).
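As a concrete sketch of combining rules, the filter below would sync the single table mentioned earlier plus every table in a second schema; guli_edu is a hypothetical schema name used only for illustration:

#canal.instance.filter.regex=guli_ucenter.ucenter_member,guli_edu\\..*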

(3) Start the server from the bin directory:


./startup.sh
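After startup, you can follow the server and instance logs to confirm that Canal connected to MySQL; the paths below are the defaults for the 1.1.4 deployer installed under /usr/local/canal:

tail -f /usr/local/canal/logs/canal/canal.log
tail -f /usr/local/canal/logs/example/example.log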

4. Add the dependencies to the Spring Boot project


<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- mysql -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
        <!-- not managed by the Boot parent, so a version is needed; 1.7 is a commonly used one -->
        <version>1.7</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <!-- keep in line with the Canal server version deployed above -->
        <version>1.1.4</version>
    </dependency>
</dependencies>

5. Edit the application.properties file


# MySQL connection (the local database that the captured changes are replayed into)
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456

6. Modify the Application startup class
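The original post does not show the startup class itself, so the following is a minimal sketch of what it usually looks like: the application implements CommandLineRunner and starts the CanalClient from step 7 once the context is up. The package and class names here are assumptions.

package com.atguigu.canal;

import com.atguigu.canal.client.CanalClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.Resource;

@SpringBootApplication
public class CanalApplication implements CommandLineRunner {

    @Resource
    private CanalClient canalClient;

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // start the Canal listening loop once the application is up;
        // note that CanalClient.run() blocks, so it occupies this startup thread
        canalClient.run();
    }
}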

7. Create the Canal client class that listens for changes automatically

Note: change the Linux VM IP address inside the run() method to your own, then the class can be used as-is.


package com.atguigu.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
public class CanalClient {

    // queue of SQL statements waiting to be replayed against the local database
    private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();

    @Resource
    private DataSource dataSource;

    /**
     * Main Canal loop: pulls binlog entries and replays them locally.
     */
    public void run() {
        /*
         * The IP address here is the Linux VM running the Canal server.
         * The port is fixed at 11111; the other arguments stay as they are.
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("39.106.224.236", 11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // pull up to batchSize entries from the master; take whatever is available
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                    // once the queue holds at least one statement, replay it
                    if (SQL_QUEUE.size() >= 1) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Replay the SQL statements queued so far.
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
            this.execute(sql);
        }
    }

    /**
     * Dispatch row-data entries by event type.
     * @param entrys
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * Build and queue an UPDATE statement.
     * @param entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("update " +
                        entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    sql.append(" " + newColumnList.get(i).getName()
                            + " = '" + newColumnList.get(i).getValue() + "'");
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // only a single primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Build and queue a DELETE statement.
     * @param entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuffer sql = new StringBuffer("delete from " +
                        entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        // only a single primary key is supported for now
                        sql.append(column.getName() + "=" + column.getValue());
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Build and queue an INSERT statement.
     * @param entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuffer sql = new StringBuffer("insert into " +
                        entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append("'" + columnList.get(i).getValue() + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * Execute a single statement against the local database.
     * @param sql
     */
    public void execute(String sql) {
        Connection con = null;
        try {
            if (null == sql) return;
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
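A quick way to check the whole pipeline end to end (this test is an addition; the table and column names are hypothetical and assume the guli_ucenter.ucenter_member table from the filter example exists on the source database): run a statement against the source MySQL and watch the Spring Boot console print and replay the generated SQL.

-- run on the SOURCE database (the one Canal watches, e.g. 39.106.224.236)
UPDATE guli_ucenter.ucenter_member SET nickname = 'canal-test' WHERE id = 1;
-- the Spring Boot console should then print something like:
-- [sql]---->  update ucenter_member set ... where id=1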

That is all for this article on integrating Canal data synchronization with Spring Boot. For more on Spring Boot and Canal data synchronization, search the earlier articles or browse the related articles below. Thank you for your continued support!

Original article: https://blog.csdn.net/zhangxuchuan111/article/details/111465327
