1、CentOS7编译安装MySQL5.7.24
CentOS7编译安装MySQL5.7.24的教程详解
链接地址 : http://www.tuohang.net/article/2238.html
2、Mysql设置binLog配置
(1)检查binlog功能是否有开启
(2)如果显示状态为OFF表示该功能未开启,开启binlog功能
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF   |
+---------------+-------+
1 row in set (0.00 sec)
1,修改 mysql 的配置文件 my.cnf
vi /etc/my.cnf
追加内容:
# 以下配置需位于 [mysqld] 段内
log-bin=mysql-bin   # binlog文件名
binlog_format=ROW   # 选择row模式
server_id=1         # mysql实例id,不能和canal的slaveId重复
2,重启 mysql:
service mysql restart
3,登录 mysql 客户端,查看 log_bin 变量
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
如果显示状态为ON,表示该功能已开启。
(3)在mysql里面添加以下的相关用户和权限
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3、Linux下载安装Canal服务
下载地址:
https://github.com/alibaba/canal/releases
(1)下载之后,放到目录中,解压文件
mkdir /usr/local/canal
cd /usr/local/canal
# 将下载的 canal.deployer-1.1.4.tar.gz 放到该目录
tar zxvf canal.deployer-1.1.4.tar.gz
(2)修改配置文件
vi conf/example/instance.properties
#需要改成自己的数据库信息
canal.instance.master.address=39.106.224.236:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#需要改成同步的数据库表规则
#1、同步所有的表
canal.instance.filter.regex=.*\\..*
#2、需要同步的那个库中的那个表
#canal.instance.filter.regex=guli_ucenter.ucenter_member
mysql 数据解析关注的表,Perl正则表达式.
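多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* 或 .*\\..*
2. canal schema下所有表:canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2(逗号分隔)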
(3)进入bin目录下启动
./startup.sh
4、Boot项目中引入依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--mysql-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-dbutils</groupId>
        <artifactId>commons-dbutils</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
    </dependency>
</dependencies>
5、修改properties配置文件
# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
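6、修改Application启动类
让启动类实现 CommandLineRunner,注入 CanalClient,并在 run(String... args) 方法中调用 canalClient.run(),这样项目启动后就会自动开始监听。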
7、创建Canal配置类自动监听
注意: 在 run() 方法中自行修改Linux虚拟机Ip地址后直接使用!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
package com.atguigu.canal.client;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.sql.DataSource;

import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component public class CanalClient { //sql队列 private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); @Resource private DataSource dataSource; /** * canal入库方法 */ public void run() { CanalConnector connector = CanalConnectors.newSingleConnector( new /* * 此处Ip地址为linux虚拟机地址 * 端口号为固定 11111 * 其他不用修改 */ InetSocketAddress( "39.106.224.236" , 11111 ), "example" , "" , "" ); int batchSize = 1000 ; try { connector.connect(); connector.subscribe( ".*\\..*" ); connector.rollback(); try { while ( true ) { //尝试从master那边拉去数据batchSize条记录,有多少取多少 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == - 1 || size == 0 ) { Thread.sleep( 1000 ); } else { dataHandle(message.getEntries()); } connector.ack(batchId); //当队列里面堆积的sql大于一定数值的时候就模拟执行 if (SQL_QUEUE.size() >= 1 ) { executeQueueSql(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace();
} } finally { connector.disconnect(); } } /** * 模拟执行队列里面的sql语句 */ public void executeQueueSql() { int size = SQL_QUEUE.size(); for ( int i = 0 ; i < size; i++) { String sql = SQL_QUEUE.poll(); System.out.println( "[sql]----> " + sql); this .execute(sql.toString()); } } /** * 数据处理 * @param entrys */ private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException { for (Entry entry : entrys) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } } /** * 保存更新语句 * @param entry */ private void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> newColumnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer( "update " + entry.getHeader().getTableName() + " set " ); for ( int i = 0 ; i < newColumnList.size(); i++) { sql.append( " " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'" ); if (i != newColumnList.size() - 1 ) { sql.append( "," ); } } sql.append( " where " ); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { //暂时只支持单一主键 sql.append(column.getName() + "=" + column.getValue()); break ; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存删除语句 * * @param entry */ private void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer( "delete from " + entry.getHeader().getTableName() + " where " ); for (Column column : columnList) { if (column.getIsKey()) { //暂时只支持单一主键 sql.append(column.getName() + "=" + column.getValue()); break ; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 保存插入语句 * * @param entry */ private void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer( "insert into " +entry.getHeader().getTableName() + " (" ); for ( int i = 0 ; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1 ) { sql.append( "," ); } } sql.append( ") VALUES (" ); for ( int i = 0 ; i < columnList.size(); i++) { sql.append( "'" + columnList.get(i).getValue() + "'" ); if (i != columnList.size() - 1 ) { sql.append( "," ); }
} sql.append( ")" ); SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } /** * 入库 * @param sql */ public void execute(String sql) { Connection con = null ; try { if ( null == sql) return ; con = dataSource.getConnection(); QueryRunner qr = new QueryRunner(); int row = qr.execute(con, sql); System.out.println( "update: " + row); } catch (SQLException e) { e.printStackTrace(); } finally { DbUtils.closeQuietly(con); } } } |
到此这篇关于SpringBoot整合Canal数据同步的文章就介绍到这了。更多相关SpringBoot Canal数据同步内容,请搜索以前的文章或继续浏览下面的相关文章,希望大家以后多多支持!
原文链接:https://blog.csdn.net/zhangxuchuan111/article/details/111465327
查看更多关于SpringBoot整合Canal数据同步的问题的详细内容...