1总体简介
1 mysql需要开启binlog
binlog分类
1.1)statement:
语句级别,binlog 会记录每一次执行写操作的语句。相对于row模式节省空间,但是会产生数据不一致性,例如:update aa set create_time=new(); 如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点:节省空间
缺点:可能造成数据不一致。(因为只记录sql语句,万一 id>5的个数不一样呢? 随机数函数等等情况)
1.2)row:
行级 binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大空间
1.3)mixed:
statement升级版,在一定程度上解决了statement模式数据不一致问题。例如:函数中包含UUID(),包含AUTO_INCREMENT字段被更新时,执行 insert ,delayed语句时,会按照row方式,进行处理。
优点:节省空间,同时兼顾一定的一致性
缺点:还是会造成数据不一致情况
2 Canal介绍
Canal:伪装成Slave,假装从Master复制数据
2 安装开始
2.1 mysql配置文件编辑
编辑mysql配置文件(看你安装方式)
vim /etc/my.cnf    [mysqld]
    #开启日志 
    log-bin = mysql-bin
    #binlog级别 (statement:只记操作命令,有可能导致主从数据不一致,row:数据一致 mixed:)
    binlog_format=row
    #设置服务id,主从不能一致  ,一般设置为ip最后一段
    server-id = 19 
   
 
   #设置需要同步的数据库   
    binlog-do-db=aa_db 
    #屏蔽系统库同步 
    binlog-ignore-db=mysql 
    binlog-ignore-db=bb_db 
 3 重启
systemctl restart mysqld4登录主库
mysql -uroot -p5 创建用户
降低密码强度(不建议使用)
SELECT @@VALIDATE_PASSWORD_POLICY;
set global validate_password_policy = 0;
set global validate_password_length=1;6 授权主从复制专用账号(给从库复制数据使用的)
GRANT REPLICATION SLAVE ON *.* TO canal'@'%' IDENTIFIED BY 'canal'; 
7 刷新权限
flush privileges;
2.2 Canal安装
下载安装包
Releases · alibaba/canal · GitHub
1解压
tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal/2编辑配置文件
example:一个mysql实例,可以有多个

cd usr/local/canal/confvim canal.properties我们不需要直接发送mq所以选择tcp。异步同步数据系列(canal+mq)
canal.properties文件
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 
canal.serverMode = tcp
# 默认的实例,可以配置多个,同时可以监控多个mysql服务。多个用逗号隔开
canal.destinations = exampleinstance.properties 编辑
 vim conf/example/instance.properties #默认注释的 放开要  不要和主库重复了 slaveId号
 canal.instance.mysql.slaveId=0
# mysql主库的连接地址
canal.instance.master.address=192.168.135.128:3306
# mysql主库自己创建的同步数据的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal开通端口号
firewall-cmd --permanent --add-port=11111/tcpfirewall-cmd --reload启动
./startup.sh查看是否连接数据成功
cd /usr/local/canal/logs/example
tail -n 1000 -f example.log
3 代码使用
3.1 代码
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>canal:
  client:
    instances:
      example:
        host: 192.168.135.128
        port: 11111
        batchSize: 100@EnableCanalClient
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;
/**
 * @创建人 赵伟
 * @创建时间 2022/12/10
 * @描述
 */
@CanalEventListener
public class BusinessListener {
    /**
     * destination = "example" mysql实例(默认:example)可以不配置
     * schema:库名
     * table:表名
     * eventType:CanalEntry.EventType.INSERT,DELETE,UPDATE。 配置了哪个他只接收哪个类型
     * @param eventType 当前操作数据库的类型
     * @param rowData   当前操作数据库的数据
     */
    @ListenPoint(destination = "example",schema = "db1", table = "t_order")
    public void listenerOrder(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
       //1新增数据: eventType:INSERT , BeforeColumnsList 将会是空
       //2 删除数据 eventType:DELETE , AfterColumnsList  将会是空
       //3 更新数据 eventType:UPDATE, BeforeColumnsList 将是旧数据   AfterColumnsList 将是新数据
       // 没有查询类型
        System.out.println("订单表数据发生改变");
        System.out.println("eventType:"+eventType);
        //获取改变之前的数据
        rowData.getBeforeColumnsList().forEach((c) ->
            System.out.println("改变前的数据:" + c.getName() + "::" + c.getValue())
        );
        //获取改变之后的数据
        rowData.getAfterColumnsList().forEach((c) ->
            System.out.println("改变之后的数据:" + c.getName() + "::" + c.getValue())
        );
    }
}3.2 坐标安装
https://github.com/chenqian56131/spring-boot-starter-cana
mvn install直接就打到了你配置好的仓库位置



















