阿里同步神器Canal原理+安装+快速使用

news2025/7/7 3:12:03

文章目录

  • 前言
  • Canal简介
    • MySQL主备复制原理
    • canal 工作原理
  • 1、MySQL配置
    • 1.1 修改MySQL配置支持binlog
        • log-bin=mysql-bin
        • binlog-format=ROW
    • 1.2 创建canal用户
    • 1.3 重启mysql服务
    • 1.4 基本的查看binlog命令
  • 2、下载安装canal
    • 2.1 解压canal
    • 2.2 配置与mysql信息
    • 2.3 启动canal
  • 3. 快速使用
    • 3.1 官方客户端
      • 创建类SimpleCanalClient
      • insert测试
      • update测试
      • delete测试
    • 3.2 第三方客户端
  • Demo下载
  • 参考


前言

最开始听说canal是从mysql与redis双写一致性解决方案,当时并没有太在意,最近由于需要实时同步数据,如果在代码对insert/update/delete做拦截也可以实现,但对代码侵入性太大了,并且后期更改时容易有遗漏,风险太高,这时就又想到了canal,canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据,这个真的是太爽爽爽了。并且实时性也能做到准实时,这也是canal为什么这么流行,因为确实很多企业会用来做数据同步的方案。

Canal简介

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

MySQL主备复制原理

在这里插入图片描述

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

1、MySQL配置

我们提前得有一台MySQL数据库,本文基于mysql 5.7,参考安装装程:window mysql 5.7安装教程
在安装Canal之前,我们需要做2件事:1). 修改MySQL配置支持binlog,2). 创建canal用户

1.1 修改MySQL配置支持binlog

修改my.ini,在[mysqld]下配置以下4项:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
# 要监控的数据库名称
binlog-do-db=my-test

在这里插入图片描述

log-bin=mysql-bin

打开binlog:表示 binlog 日 志 的 前 缀 是 mysql-bin , 以后生成的日志文件就是mysql-bin.000001,mysql-bin.000002…
文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。

binlog-format=ROW

mysql binlog 的格式有三种:binlog_format = statement | mixed | row
◼ statement
语句级,binlog 会记录每次一执行写操作的语句。
相对 row 模式节省空间,但是可能产生不一致性,比如
update tt set create_date=now()
如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据不同。
优点: 节省空间
缺点: 有可能造成数据不一致。
◼ row
行级, binlog 会记录每次操作后每行记录的变化
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,只记录执行后的效果。
缺点:占用较大空间。
◼ mixed
statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题.
默认还是 statement,在某些情况下譬如:
当函数中包含 UUID() 时; 包含 AUTO_INCREMENT 字段的表被更新时; 执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理。
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。

1.2 创建canal用户

-- 1. 使用命令登录:mysql -u root -p
-- 2. 创建用户 用户名:canal 密码:canal@123456
create user 'canal'@'%' identified by 'canal@123456';
-- 3. 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal@123456';

在这里插入图片描述

1.3 重启mysql服务

以管理员身份运行cmd
C:\Windows\system32>net stop mysql
MySQL 服务正在停止…
MySQL 服务已成功停止。
C:\Windows\system32>net start mysql
MySQL 服务正在启动 .
MySQL 服务已经启动成功。

1.4 基本的查看binlog命令

  • 查看是否打开binlog模式:show variables like 'log_bin';
    在这里插入图片描述

  • 查看binlog日志文件列表:show binary logs;
    在这里插入图片描述

  • 查看当前正在写入的binlog文件:show master status;
    在这里插入图片描述


2、下载安装canal

官网下载:https://github.com/alibaba/canal/releases
Latest: v1.1.6
在这里插入图片描述

2.1 解压canal

我解压到 E:\servers\canal\canal.deployer-1.1.6
在这里插入图片描述

2.2 配置与mysql信息

打开配置文件conf/example/instance.properties,主要配置数据库地址和用户:

# mysql数据库地址
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal@123456

注意:这里不需要设置binlog的文件名和位置,自动为空即可。

2.3 启动canal

转到bin目录,cmd运行startup.bat
在这里插入图片描述


3. 快速使用

3.1 官方客户端

创建Maven项目,不需要依赖Spring,添加maven依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.6</version>
</dependency>

创建类SimpleCanalClient

package com.tiangang;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClient {

    private final CanalConnector connector;
    private Thread thread = null;
    private final Thread.UncaughtExceptionHandler handler = (t, e) -> e.printStackTrace();
    private volatile boolean running = false;
    private final static int BATCH_SIZE = 5 * 1024;

    public SimpleCanalClient(CanalConnector connector) {
        this.connector = connector;
    }

    public static void main(String[] args) {
        // 根据ip,直接创建链接,无HA的功能
        String destination = "example";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
                destination,
                "canal",
                "canal@123456");
        final SimpleCanalClient simpleCanalClient = new SimpleCanalClient(connector);
        simpleCanalClient.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                System.out.println("## stop the canal client");
                simpleCanalClient.stop();
            } catch (Throwable e) {
                System.out.println("##something goes wrong when stopping canal:");
                e.printStackTrace();
            } finally {
                System.out.println("## canal client is down.");
            }
        }));
    }

    public void start() {
        if (this.connector == null) {
            System.out.println("connector不能为空,启动失败");
            return;
        }
        thread = new Thread(this::process);
        thread.setUncaughtExceptionHandler(handler);
        running = true;
        thread.start();
        System.out.println("canal client started...");
    }

    public void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore
            }
        }
        System.out.println("canal client stopped...");
    }

    private void process() {
        while (running) {
            try {
                //打开连接
                connector.connect();
                //订阅数据库表,全部表
                connector.subscribe(".*\\..*");
                //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
                connector.rollback();
                while (running) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    //获取批量ID
                    long batchId = message.getId();
                    //获取批量的数量
                    int size = message.getEntries().size();
                    //如果没有数据
                    if (batchId == -1 || size == 0) {
                        try {
                            //线程休眠2秒
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        //如果有数据,处理数据
                        printEntry(message.getEntries());
                    }
                    if (batchId != -1) {
                        // 提交确认
                        connector.ack(batchId);
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e1) {
                    // ignore
                }
                connector.rollback(); // 处理失败, 回滚数据
            } finally {
                connector.disconnect();
            }
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , dbName:%s, tableName:%s , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDDL: true,sql:" + rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //变更后的数据
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

insert测试

我提前创建了一个表

CREATE TABLE `canal_user`  (
  `id` bigint(0) UNSIGNED NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NULL,
  `age` int(0) UNSIGNED NULL,
  `create_time` datetime(0) NULL,
  `is_deleted` tinyint(1) NULL,
    PRIMARY KEY (`id`)
)

插入一条:

INSERT INTO `canal_user`(`name`, `age`, create_time, is_deleted) VALUES ('hello-canal', 18, '2022-11-15 00:00:00', 0);

SimpleCanalClient监听到了:

================; binlog[mysql-bin.000005:7300] , dbName:my-test, tableName:canal_user , eventType : INSERT
id : 1    update=true
name : hello-canal    update=true
age : 18    update=true
create_time : 2022-11-15 00:00:00    update=true
is_deleted : 0    update=true

update测试

update  `canal_user` set `name` = 'hello-canal-28', age = 28 where id = 1;

SimpleCanalClient监听到了:

================; binlog[mysql-bin.000005:7601] , dbName:my-test, tableName:canal_user , eventType : UPDATE
------->; before
id : 1    update=false
name : hello-canal    update=false
age : 18    update=false
create_time : 2022-11-15 00:00:00    update=false
is_deleted : 0    update=false
------->; after
id : 1    update=false
name : hello-canal-28    update=true
age : 28    update=true
create_time : 2022-11-15 00:00:00    update=false
is_deleted : 0    update=false

delete测试

delete  from `canal_user`  where id = 1;

SimpleCanalClient监听到了:

================; binlog[mysql-bin.000005:7938] , dbName:my-test, tableName:canal_user , eventType : DELETE
id : 1    update=false
name : hello-canal-28    update=false
age : 28    update=false
create_time : 2022-11-15 00:00:00    update=false
is_deleted : 0    update=false

3.2 第三方客户端

第三方客户端采用SpringBoot整合:https://github.com/chenqian56131/spring-boot-starter-canal
具体就不demo了,有兴趣的可以自行玩玩.


Demo下载

https://download.csdn.net/download/scm_2008/87017870

参考

Canal官网
超详细canal入门,看这篇就够了
阿里的数据同步神器——Canal
实时采集Canal快速入门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/6988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【文件传输】实现下载

文章目录下载&#xff1a;下载的过程&#xff1a;单文件传输&#xff1a;多文件传输&#xff1a;下载&#xff1a; 如下图&#xff1a; 如果刚发完size&#xff0c;客户端不回复ok&#xff0c;直接发送数据&#xff0c;会出现粘包问题。如果想要确保客户端收到数据&#xff0…

流行的前端开源报表工具有哪些?适合在企业级应用的

前端开源报表工具有很多&#xff0c;但是如果是企业级应用真心不建议选用。比如非要选择开源的报表工具&#xff0c;你需要投入一个或多个程序员来做这个事情&#xff0c;他们还得先学习这个开源报表工具的界面、功能使用操作等等&#xff0c;尤其是很多开源工具的学习资料还是…

dpdk Vhost 库

1、 怎么实现vhost_dev的VhostOps的vhost_set_vring_kick和vhost_set_vring_call&#xff1b; vhost_net kernel方式的vhost_set_vring_kick和vhost_set_vring_call依赖于/dev/vhost_net的ioctl。 有两种实现方式&#xff1a; 1、guest是server&#xff0c;dpdk vhost user是…

Sourcetree的实际使用开发笔记

目录 前言 一、Sourcetree是什么 二、Sourcetree使用步骤 1.创建仓库 2.拉取和推送的使用 3.创建新的自己的分支 三、使用Sourcetree来进行不提交本地的代码&#xff0c;而获取最新的代码拉取 总结 前言 本章主要是记录一下Sourcetree的基本使用功能。 一、Sourcetree是…

Android:Binder思考笔记

基础知识 进程空间划分 一个进程空间分为用户空间与内核空间。用户空间与内核空间都是虚拟内存&#xff0c;映射到物理内存。所有进程的内核空间映射到同一块物理内存&#xff0c;是共享的二者区别&#xff1a; 进程间&#xff0c;用户空间的数据不可共享&#xff0c;即用户空…

线性与树型数据结构可视化模拟器

线性与树型数据结构可视化模拟器 题目2:线性与树型数据结构可视化模拟器 [问题描述] 数据结构课程是计算机类专业的核心课程之一&#xff0c;是计算机科学与技术必修的专业基础课程。数据结构研究的范围和计算机软件有着密切的联系。课程涉及到大量的概念、定义以及数据结构的…

JS数据类型的探究

JS数据类型的探究 分思考三部曲? 是什么?为什么?怎么做? 一:什么是数据类型? 在程序设计的类型系统中&#xff0c;数据类型&#xff08;英语&#xff1a;Data type&#xff09;&#xff0c;又称资料型态、资料型别&#xff0c;是用来约束数据的解释。在编程语言中&…

我去面试聊了半天MySQL索引,结果面试官黑脸让我回家等结果...

V-xin&#xff1a;ruyuanhadeng获得600页原创精品文章汇总PDF 目录 1、面试真题2、面试官心理分析3、面试题剖析 1、面试真题 MySQ索引的原理和数据结构能介绍一下吗&#xff1f;b树和b-树有什么区别&#xff1f;MySQL聚簇索引和非聚簇索引的区别是什么&#xff1f;他们分别是…

影响网站排名的4个因素,教你提高网站排名的方法

我们优化网站的目的是&#xff0c;使网站的排名靠前&#xff0c;让更多的用户看到并访问网站&#xff0c;达到营销的效果。网站排名靠前&#xff0c;可以提高企业品牌的知名度&#xff0c;对网站是非常有益的。在网络多变的环境下&#xff0c;影响网站排名有哪些因素呢&#xf…

JAVA使用springboot整合佳博标签打印机(三)

在JAVA使用springboot整合佳博标签打印机&#xff08;二&#xff09;的文章中使用的是花生壳进行的内网穿透 花生壳内网穿透会有异常情况,因为使用的是花生壳免费版本的内网穿透,可能会有服务异常的情况出现,免费服务没有保障 优化方案使用NGINX替换花生壳来实现内网穿透 准…

从头开始实现一个留言板-README

前言 仓库地址&#xff1a;da1234cao/RestFulAPIDemo 由于我没做过C服务器的开发&#xff0c;没有这方面的工程经验&#xff0c;所以代码中&#xff0c;大概率有一些我意识不到的问题&#xff0c;欢迎提issue。 PS: 代码使用C实现restful接口&#xff0c;给前端提供数据读取…

如何封装一个实用的上传组件

前言 马上放假了&#xff0c;时间上相对宽裕&#xff0c;对最近做的东西进行一些总结。今天我们来看一个非常实用的组件&#xff0c;上传组件 我们先从组件的定位、组件的应用场景、组件的特性几个方面进行归纳 定位&#xff1a; 对于上传组件&#xff0c;基础的组件功能属…

【C++】-- 继承

目录 继承的概念及定义 继承的概念 继承的定义 定义格式 继承基类成员访问方式的变化 基类和派生类对象赋值转换 子类对象可以赋值给父类对象/指针/引用 派生类对象赋值给基类的对象 派生类对象赋值给基类的指针 派生类对象赋值给基类的引用 继承中的作用域 派生类的默认成员函…

IM开源项目OpenIM部署文档-从准备工作到nginx配置

IM开源项目OpenIM部署文档-从准备工作到nginx配置 2022-11-14 22:27OpenIM 一、准备工作 运行环境 linux系统即可&#xff0c; Ubuntu 7.5.0-3ubuntu1~18.04最优 图片视频文件存储 支持cos/MinIO https/wss协议 1. 需申请域名或者子域名&#xff08;web im端登录注册及im…

C# HTML

一 HTML 超文本标记语言 在HTML当中存在着大量的标签&#xff0c;我们用HTML提供的标签&#xff0c;将要显示在网页中的内容包含起来。就构成了我们的网页。 二 CSS CSS 控制网页内容显示的效果。 HTMLCSS静态网页。 JSJquery 动态效果。 三 开始动手写HTML页面 ① 首先在…

XSS进阶二

目录实验目的预备知识实验环境实验步骤一实例四、换一个角度&#xff0c;阳光依旧实验步骤二实例五、限制了我的左手&#xff0c;我还有右手实验步骤三实例六、大胆去思考&#xff0c;小心去求证实验目的 1.深入理解xss工作原理。 2.怎么去绕过规则实现xss。 3.培养学生的独立…

Spring Cloud(十):Spring Cloud Skywalking

链路追踪组件选型 Zipkin是Twitter开源的调用链分析工具&#xff0c;目前基于springcloud sleuth得到了广泛的使用&#xff0c;特点是轻量&#xff0c;使用部署简单。Pinpoint是韩国人开源的基于字节码注入的调用链分析&#xff0c;以及应用监控分析工具。特点是支持多种插件&…

XCTF-web Robots

场景一&#xff1a;Training-WWW-Robots 进入场景&#xff0c;提示关于robots.txt文件 访问robots.txt文件&#xff0c;目录下存在 /fl0g.php 文件&#xff0c;进一步访问得到flag 场景二&#xff1a;robots 根据题目&#xff0c;提示关于robots.txt协议 访问成功&#xff…

api股票数据接口能实现什么功能?

api股票数据接口在量化投资方面能够受到比较多交易者的开发和使用的&#xff0c;主要是得于股票量化交易数据接口的7个策略十档行情&#xff0c;可以实现一键解决炒股难题&#xff0c;和多指标辅助追踪主力&#xff0c;跟主力做强势股&#xff0c;只有在行股票数据接口一键就可…

零样本图像分类综述

零样本图像分类综述 摘要 零样本图像分类指训练集和测试集在数据的类别上没有交集的情况下进行图像分类&#xff0c;该技术是解决类别标签缺失问题的一种有效手段&#xff0c;因此受到了日益广泛的关注&#xff0c;自提出问题至今。零样本图像分类研究已经大致有十年时间啦。…