canal实现mysql数据同步

news2025/5/25 0:40:36

目录

1、canal下载

2、mysql同步用户创建和授权

3、canal admin安装和启动

4、canal server安装和启动

5、java 端集成监听canal 同步的mysql数据

6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步


1、canal下载

canal实现mysql数据同步可以直接安装canal server就可以了,但是为了方便管理(instance配置,canal server状态管理,集群等),需要安装canal admin,应用下载地址:Releases · alibaba/canal · GitHub

进入页面可以选择需要安装的版本

下载canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz

2、mysql同步用户创建和授权

登录mysql
mysql -h 127.0.0.1 -P 3306 -u root -p

创建同步用户 repl 密码设为123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';

给予同步权限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';

给予repl只读test库的权限,test库是用来同步数据的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';

canal_manager是canal admin需要的,给予repl对该库的读写权限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';



mysql my.cnf配置文件增加主从配置master数据库的配置信息
#主数据主从配置 唯一id
server_id=1
#开启logbin
log-bin=mysql-bin
#写入模式 row
binlog-format=ROW
#需要同步的库
binlog-do-db=test
#忽略的数据库
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema

在canal-admin解压文件的conf中有一个canal_manager.sql,导入到master数据库

3、canal admin安装和启动

把canal.admin-1.1.8.tar.gz上传到linux

解压 tar -zvxf canal.admin-1.1.8.tar.gz 

进入conf目录下,编辑application.yml配置文件。

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: repl
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: 123456

重点介绍以下几个参数:

address:我们需要订阅(也就是mysql master服务器)mysql所在的服务器IP和数据库端口。

database:canal.admin web系统必须的几张表,需要在mysql master服务器上初始化conf/canal_manager.sql文件。

sername和password就是mysql master服务器创建的用于复制的用户和密码,也就是我们在canal server中配置的repl 和 123456。

driver-class-name:mysql的驱动,默认是MYSQL5的驱动,如果你的MYSQL是8的(我的就是),要将驱动改为com.mysql.cj.jdbc.Driver。

另外,还需要在mysql连接后面加上allowPublicKeyRetrieval=true,不然启动时,有可能报错。

启动canal.admin

进入bin目录,执行如下命令,启动canal.admin:

./startup.sh

查看 admin 日志

2022-12-10 03:13:58.995 [main] INFO  o.s.jmx.export.annotation.AnnotationMBeanExporter - 
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)

如果出现上述日志,说明启动成功!

登录admin

通过http://127.0.0.1:8089/访问,默认密码:admin/123456。

注意,IP和密码需要改成你自己配置的。如果是在服务器上配置的,别忘记放开8089端口。

输入用户名和密码之后,出现上述页面说明配置成功!

如果需要修改密码,直接通过执行 select upper(sha1(unhex(sha1('1234567')))) 这个sql得到结果,然后复制到canal_manager库的canal_user表的password字段中就可以了,其中1234567是明文密码,执行上述sql会得到一个密码。

4、canal server安装和启动

把canal.deployer-1.1.8.tar.gz上传到linux

解压 tar -zvxf ccanal.deployer-1.1.8.tar.gz

进入conf目录下,编辑canal.properties配置文件。

注意,如果直接编辑canal.properties,可能无法启动,报如下错误:

可以通过如下方式修改

mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties
# register ip
canal.register.ip =

# canal admin config  canalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server启动后自动注入到canal admin管理模块
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

一般只需要修改下面这3个

canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB

启动canal.server

进入bin目录,执行如下命令,启动canal.server:

./startup.sh

查看canal日志

启动后,canalAdmin的server管理模块,对应创建的canal server会动态识别到,状态变为启动

5、java 端集成监听canal 同步的mysql数据

1、引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>

2、编写测试代码

package com.hy.das.config;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient implements InitializingBean{

    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接 此处的11111为tcp端口 在canal admin Server管理模块可以查看
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "test", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                System.out.println(message.getEntries().size());
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("----------------");
                    //如果有数据,处理数据
                    //遍历entries,单条解析
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        //获取表名
                        String tableName = entry.getHeader().getTableName();
                        //获取类型
                        CanalEntry.EntryType entryType = entry.getEntryType();
                        //获取序列化后的数据
                        ByteString storeValue = entry.getStoreValue();
                        //判断entry类型是否为ROWDATA类型
                        if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                            //反序列化
                            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                            //获取当前事件操作类型
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            //获取数据集
                            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                            //遍历
                            for (CanalEntry.RowData rowData : rowDatasList) {
                                //改变前数据
                                JSONObject jsonObjectBefore = new JSONObject();
                                List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    jsonObjectBefore.put(column.getName(),column.getValue());
                                }
                                //改变后数据
                                JSONObject jsonObjectAfter = new JSONObject();
                                List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                                for (CanalEntry.Column column : afterColumnsList) {
                                    jsonObjectAfter.put(column.getName(),column.getValue());
                                }
                                System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                            }
                        }else {
                            System.out.println("当前操作类型为:"+entryType);
                        }
                    }
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}

newSingleConnector方法里面的test是一个instance实列,定义了需要同步的master库的信息(ip、端口、用户名、密码、binlog文件名称、同步位置、需要同步的库、不需要同步的库等)

在canal admin web管理界面的Instance 管理模块,点击新建Instance进行创建,新建页面的Instance名称就是test,这个可以随便填写,代码对应修改就行,所属集群/主机,因为我这里是单机部署,直接选择自动注入的canal server就行,点击载入模板,获取配置初始信息,下图中标出的信息按照实际的修改填入就行,点击保存后,启动这个Instance。

3、启动服务,对test库的sys_user表进行数据更新,可以看到后台已经收到变更数据

6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步

注意上面需要提供对外访问的端口需要开通安全组,比如8089、11111等端口。

参考文章:

【CanalAdmin部署文档】_canal-admin-CSDN博客

https://zhuanlan.zhihu.com/p/590705531

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2384979.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

易境通专线散拼系统:全方位支持多种专线物流业务!

在全球化电商快速发展的今天&#xff0c;跨境电商物流已成为电商运营中极为重要的环节。为了确保物流效率、降低运输成本&#xff0c;越来越多的电商卖家选择专线物流服务。专线物流作为五大主要跨境电商物流模式之一&#xff0c;通过固定的运输路线和流程&#xff0c;极大提高…

06 如何定义方法,掌握有参无参,有无返回值,调用数组作为参数的方法,方法的重载

1.调用方法 2.掌握有参函数 3.调用数组作为参数 一个例题&#xff1a;数组参数&#xff0c;返回值 方法的重载 两个例题&#xff1a;冒泡排序和九九乘法表的格式学习

使用vscode MSVC CMake进行C++开发和Debug

使用vscode MSVC CMake进行C开发和Debug 前言软件安装安装插件构建debuug方案一debug方案二其他 前言 一般情况下我都是使用visual studio来进行c开发的&#xff0c;但是由于python用的是vscode&#xff0c;所以二者如果统一的话能稍微提高一点效率。 软件安装 需要安装的软…

提升开发运维效率:原力棱镜游戏公司的 Amazon Q Developer CLI 实践

引言 在当今快速发展的云计算环境中&#xff0c;游戏开发者面临着新的挑战和机遇。为了提升开发效率&#xff0c;需要更智能的工具来辅助工作流程。Amazon Q Developer CLI 作为亚马逊云科技推出的生成式 AI 助手&#xff0c;为开发者提供了一种新的方式来与云服务交互。 Ama…

@Column 注解属性详解

提示&#xff1a;文章旨在说明 Column 注解属性如何在日常开发中使用&#xff0c;数据库类型为 MySql&#xff0c;其他类型数据库可能存在偏差&#xff0c;需要注意。 文章目录 一、name 方法二、unique 方法三、nullable 方法四、insertable 方法五、updatable 方法六、column…

基于 ESP32 与 AWS 全托管服务的 IoT 架构:MQTT + WebSocket 实现设备-云-APP 高效互联

目录 一、总体架构图 二、设备端(ESP32)低功耗设计(适配 AWS IoT) 1.MQTT 设置(ESP32 连接 AWS IoT Core) 2.低功耗策略总结(ESP32) 三、云端架构(基于 AWS Serverless + IoT Core) 1.AWS IoT Core 接入 2.云端 → APP:WebSocket 推送方案 流程: 3.数据存…

unity在urp管线中插入事件

由于在urp下&#xff0c;打包后传统的相机事件有些无法正确执行&#xff0c;这时候我们需要在urp管线中的特定时机进行处理一些事件&#xff0c;需要创建继承ScriptableRenderPass和ScriptableRendererFeature的脚本&#xff0c;示例如下&#xff1a; PluginEventPass&#xf…

docker安装es连接kibana并安装分词器

使用Docker部署Elasticsearch、Kibana并安装分词器有以下主要优点&#xff1a; 1. 快速部署与一致性 一键式部署&#xff1a;通过Docker Compose可以快速搭建完整的ELK栈环境 环境一致性&#xff1a;确保开发、测试和生产环境完全一致&#xff0c;避免"在我机器上能运行…

线性回归中涉及的数学基础

线性回归中涉及的数学基础 本文详细地说明了线性回归中涉及到的主要的数学基础。 如果数学基础很扎实可以直接空降博文: 线性回归&#xff08;一&#xff09;-CSDN博客 一、概率、似然与概率密度函数 1. 概率&#xff08;Probability&#xff09; 定义&#xff1a;概率是描述…

如何计算VLLM本地部署Qwen3-4B的GPU最小配置应该是多少?多人并发访问本地大模型的GPU配置应该怎么分配?

本文一定要阅读我上篇文章&#xff01;&#xff01;&#xff01; 超详细VLLM框架部署qwen3-4B加混合推理探索&#xff01;&#xff01;&#xff01;-CSDN博客 本文是基于上篇文章遗留下的问题进行说明的。 一、本文解决的问题 问题1&#xff1a;我明明只部署了qwen3-4B的模型…

Attu下载 Mac版与Win版

通过Git地址下载 Mac 版选择对于的架构进行安装 其中遇到了安装不成功&#xff0c;文件损坏等问题 一般是两种情况导致 1.安装版本不对 2.系统权限限制 https://www.cnblogs.com/similar/p/11280162.html打开terminal执行以下命令 sudo spctl --master-disable安装包Git下载地…

V2X协议|如何做到“车联万物”?【无线通信小百科】

1、什么是V2X V2X&#xff08;Vehicle-to-Everything&#xff09;即“车联万物”&#xff0c;是一项使车辆能够与周围环境实现实时通信的前沿技术。它允许车辆与其他交通参与者和基础设施进行信息交互。通过V2X&#xff0c;车辆不仅具备“远程感知”能力&#xff0c;还能在更大…

[测试_3] 生命周期 | Bug级别 | 测试流程 | 思考

目录 一、软件测试的生命周期&#xff08;重点&#xff09; 1、软件测试 & 软件开发生命周期 &#xff08;1&#xff09;需求分析 &#xff08;2&#xff09;测试计划 &#xff08;3&#xff09;测试设计与开发 &#xff08;4&#xff09;测试执行 &#xff08;5&am…

RabbitMQ ⑤-顺序性保障 || 消息积压 || 幂等性

幂等性保障 幂等性&#xff08;Idempotency&#xff09; 是计算机科学和网络通信中的一个重要概念&#xff0c;指的是某个操作无论被执行多少次&#xff0c;所产生的效果与执行一次的效果相同。 应用程序的幂等性&#xff1a; 在应用程序中&#xff0c;幂等性就是指对一个系统…

java基础知识回顾1(可用于Java基础速通)考前,面试前均可用!

目录 一、初识java 二、基础语法 1.字面量 2.变量 3.关键字 4.标识符 声明&#xff1a;本文章根据黑马程序员b站教学视频做的笔记&#xff0c;可对应课程听&#xff0c;课程链接如下: 02、Java入门&#xff1a;初识Java_哔哩哔哩_bilibili 一、初识java Java是美国 sun 公…

云原生CICD-Tekton入门到精通

文章目录 一、Tekton介绍二、Tekton组件介绍三、执行流程四、安装Tekton管道五、安装Tekton Dashboard六、安装Tekton Cli七、运行单Task八、运行流水线九、在流水线中使用secret十、taskSpec、taskRef、pipelineRef、pipelineSpec使用pipelineRef与taskRef结合使用(推荐)pipel…

opencv 图像的平移和旋转

warpAffine函数讲解,图片可自行下载&#xff0c;也可用自己的图片 原图im 平移im_shifted 旋转im_rotated # 图像仿射变换 # 步骤&#xff1a; 读取图像 -> 创建仿射变换矩阵 -> 仿射变换计算 # 平移变换矩阵&#xff1a;一种写法&#xff0c;直接写死 # 旋转变…

IDEA2025版本使用Big Data Tools连接Linux上Hadoop的HDFS

目录 Windows的准备 1. 将与Linux上版本相同的hadoop压缩包解压到本地 ​编辑2.设置$HADOOP HOME环境变量指向:E:\hadoop-3.3.4 3.下载hadoop.dll和winutils.exe文件 4.将hadoop.dll和winutils.exe放入$HADOOP HOME/bin中 IDEA中操作 1.下载Big Data Tools插件 2.添加并连…

hysAnalyser特色的TS流编辑、剪辑和转存MP4功能说明

摘要 hysAnalyser 是一款特色的 MPEG-TS 数据分析工具&#xff0c;融合了常规TS文件的剪辑&#xff0c;转存功能&#xff0c;可用于平常的视频开发和测试。 本文详细阐述了对MPEG-TS 流的节目ID&#xff0c;名称&#xff0c;PID&#xff0c;时间戳&#xff0c;流类型&#xff…

Google机器学习实践指南(学习速率篇)

&#x1f525;Google机器学习核心概念精讲&#xff08;学习速率&#xff09; Google机器学习实战(7)-5分钟掌握学习速率。 学习速率&#xff1a;模型训练的关键超参数 学习速率是指在训练模型时用于梯度下降的一个标量。在每次迭代期间&#xff0c;梯度下降法都会将学习速率…