Canal 安装与入门

news2025/7/13 5:06:22

MySQL Binlog 简介

https://blog.csdn.net/weixin_44371237/article/details/127904514

MySQL 主从复制过程

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;

2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝到它的中继日志(relay log);

3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

Canal 的工作原理

把自己伪装成 Slave,假装从 Master 复制数据

使用场景:同步数据库到redis

在这里插入图片描述

MySQL 的准备

启动MySQL
service mysqld start

登录 mysql
mysql -uroot -proot123456

创建数据库canal,表 test

赋权限

mysql> set global validate_password_length=4; 
mysql> set global validate_password_policy=0; 
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

查看mysql数据库,可以看到用户已生效
在这里插入图片描述

Canal 的下载和安装

下载并解压 Jar 包
https://github.com/alibaba/canal/releases

上传jar包到software,在/opt/module/创建canal文件夹,再解压
tar -zxvf /software/canal.deployer-1.1.2.tar.gz -C /opt/module/canal

修改 canal.properties 的配置(不用改)
vim /opt/module/canal/conf/canal.properties

进入
cd /opt/module/canal/conf/example

修改配置文件 vim instance.properties

## 只要跟/etc/my.cnf的server.id=1不一样就行
canal.instance.mysql.slaveId=20
canal.instance.master.address=hadoop100:3306

启动
/opt/module/canal/bin/startup.sh

jps查看有CanalLauncher进程
在这里插入图片描述

TCP 模式测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.chen</groupId>
    <artifactId>canal</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>
</project>

CanalClient

package com.chen;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        //连接器
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop100", 11111), "example", "", "");

        while (true){
            //连接
            canalConnector.connect();
            //订阅数据库
            canalConnector.subscribe("canal.*");
            //获取数据
            Message message = canalConnector.get(100);
            //获取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();
            //判断集合是否为空,为空则等待一会继续拉取
            if (entries.size()<=0){
                System.out.println("当次抓去没有数据,休息一会儿。。。");
                Thread.sleep(1000);
            }else {
                //遍历entries,单条解析
                for (CanalEntry.Entry entry : entries) {
                    //获取表名
                    String tableName = entry.getHeader().getTableName();
                    //获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    //获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();
                    //判断entry类型是否为ROWDATA类型
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                        //反序列化
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        //获取当前事件操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        //获取数据集
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        //遍历
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            //改变前数据
                            JSONObject jsonObjectBefore = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                jsonObjectBefore.put(column.getName(),column.getValue());
                            }
                            //改变后数据
                            JSONObject jsonObjectAfter = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                jsonObjectAfter.put(column.getName(),column.getValue());
                            }
                            System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
                        }
                    }else {
                        System.out.println("当前操作类型为:"+entryType);
                    }
                }
            }
        }
    }
}

运行程序

然后向canal数据库的test表插入数据

insert into canal.test values(1,'aaa'); 

控制台输出如下
在这里插入图片描述

Kafka 模式测试

修改 canal.properties 的配置
vim /opt/module/canal/conf/canal.properties

canal.serverMode = kafka
canal.mq.servers = hadoop100:9092,hadoop101:9092,hadoop102:9092

修改instance.properties
vim /opt/module/canal/conf/example/instance.properties

canal.mq.topic=canal

启动zookeeper和kafka
/home/zk.sh start
/home/kafka.sh start

启动canal
/opt/module/canal/bin/startup.sh

启动kafka消费者canal

bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic canal

向canal数据库的test表插入数据

insert into canal.test values(8,'aaa'); 

可以看到kafka消费者接收到如下
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/17873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于QT的考试管理系统设计与实现

目录 一、项目概要 4 1.1项目名称 4 1.2项目目标 4 1.3软件概要 4 1.4功能描述 5 1.5开发环境 5 1.6关键技术 6 1.7开发体制 6 1.8开发阶段 6 二、软件详细需求 7 2.1学生登陆主界面 7 2.2管理员登陆主界面 8 2.3 学生考试系统实现 9 2.4学生练习系统实现 10 2.5试题管理系统实…

mongoDB mapreduce使用总结

大家都知道&#xff0c;mongodb是一个非关系型数据库&#xff0c;也就是说&#xff0c;mongodb数据库中的每张表是独立存在的&#xff0c;表与表之间没有任何依赖关系。在mongodb中&#xff0c;除了各种CRUD语句之外&#xff0c;还给我们提供了聚合和mapreduce统计的功能&#…

JVM 彻底搞懂JVM内存区域及直接内存

面试题&#xff1a;直接内存会导致OOM么&#xff1f; 程序计数器 代表当前线程所执行的字节码所在的行号&#xff0c;配合字节码解释器获取下一条需要执行的字节码指令。 代码中的分支、循环、跳转、异常处理、线程恢复都要依靠它来实现。 程序计数器是线程私有的&#xff0…

进程控制的一些具体操作

目录进程控制进程终止进程退出的方式进程等待进程等待的方法wait使用方法waitpid使用方法进程程序替换替换函数execl函数execv函数execlp函数execvp函数execle函数execve函数---->只有这一个是系统调用&#xff0c;其他都是库函数execvpe函数补充几个知识: %s/被替换的文件…

代码随想录——冗余连接II(并查集)

题目 在本问题中&#xff0c;有根树指满足以下条件的 有向 图。该树只有一个根节点&#xff0c;所有其他节点都是该根节点的后继。该树除了根节点之外的每一个节点都有且只有一个父节点&#xff0c;而根节点没有父节点。 输入一个有向图&#xff0c;该图由一个有着 n 个节点&am…

vb.net自定义白板

希沃白板在学校里基本上是一直使用的&#xff0c;但是在非希沃电脑里面是没有启动白板的 简答介绍思路和具体的功能 1、背景颜色和画笔颜色自由切换、画笔粗细1~20可以调节。 2、画笔样式&#xff1a;虚线、点线、短线 3、基本图形&#xff1a;矩形&#xff0c;正方形&…

程序员级别分析,看看你是哪个级别

关于程序员的工资众说纷纭&#xff0c;有说开七八千的&#xff0c;也有人说每月上万的&#xff0c;但不管怎么说&#xff0c;程序员的工资是真的比一些文职、行政人员岗位挣得多&#xff0c;大家都是靠自己的能力赚钱&#xff0c;这没什么可比的&#xff0c;况且大家都是在学习…

JAVASE零基础到高级教程(1)------ 集成开发环境安装使用

一 什么是环境变量 环境变量是在操作系统中⼀个具有特定名字的对象&#xff0c;它包含了⼀个或者多个应⽤程序所将使⽤到的 信息。例如Windows和DOS操作系统中的path环境变量&#xff0c;当要求系统运⾏⼀个程序⽽没有告诉它程 序所在的完整路径时&#xff0c;系统除了在当前⽬…

前端框架 Electron 使用总结

目录 一、基础搭建 通过脚手架搭建 1、Electron官方案例搭建环境 2、查看调试 3、菜单的使用 4、图标配置 5、项目打包 web应用相信每位程序员都不陌生&#xff0c;PC端应用可能会底层开发的就不是太多了&#xff0c;下面的这套技术栈就是为前端程序员快速一键搭建windo…

Linux学习——网络编程基础及TCP服务器

目录 一、网络采用分层的思想&#xff1a; 二、各层典型的协议&#xff1a; 三、网络的封包和拆包&#xff1a; 四、网络编程的预备知识 4.1.SOCKET 4.2 IP地址 4.3 端口号 4.4 字节序 五、TCP编程API TCP协议分成了两个不同的协议&#xff1a;可靠传输&#xff1a;用来检测网络…

读书笔记-学习GNU Emacs-3终篇

学习本书目的&#xff1a; emacs的学习一直是陆陆续续看博客和上手实践&#xff0c;这次想通过阅读"学习GNU Emacs"这本书好好系统的再复习下emacs。 yps:读技术书应该是带着一定的目的去读的&#xff0c;最简单的目的可能就是为了学好某一项技术或者复习下某一项技…

[附源码]java毕业设计社区健康服务平台管理系统lunwen

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

IDEA利用maven建立javaWeb项目(IDEA 2021.3.3)

1、在Idea中配置maven (1)、打开Idea&#xff0c;点击File&#xff0c;然后点击Settings&#xff0c;进入设置&#xff0c;或者直接按CtrlAltS进入设置 (2)、先在左上角的搜索框输入maven&#xff0c;找到maven后单击&#xff0c;然后在右边的maven home path的右边选择你的m…

置信度--学习笔记

置信区间是衡量测量精度的一个指标&#xff0c;也能显示出估算有多稳定&#xff0c;也就是说如果重复做某项实验&#xff0c;得到的结果与最初的估计有多接近。步骤&#xff1a; 确定要测试的情况&#xff1a;如“A大学男生的平均体重是80公斤”&#xff0c;则后续就是要测试在…

最新最全面的Spring详解(三)——Resources,验证、数据绑定和类型转换与Spring表达式语言(SpEL)

前言 本文为 【Spring】Resources与Spring表达式语言&#xff08;SpEL&#xff09; 等相关知识&#xff0c;下边将对Resources&#xff08;包含&#xff1a;Resource接口、内置的 Resource的实现、ResourceLoader接口、应用环境和资源路径&#xff09;&#xff0c;验证、数据绑…

浅谈化工生产制造企业软件系统的选择

现在大家都在讨论全球COVID流行和经济衰退对企业的影响&#xff0c;以及一个有作为的企业&#xff0c;在当下的环境下如何求生存和谋发展的问题。“埃森哲的一份报告发现&#xff0c;99%的首席运营官都认为&#xff0c;使用实时数据运营对于应对Covid或经济衰退威胁等至关重要。…

Java的JDBC编程

1. 数据库编程的必备条件 编程语言&#xff0c;如Java&#xff0c;C、C、Python等数据库&#xff0c;如Oracle&#xff0c;MySQL&#xff0c;SQL Server等数据库驱动包&#xff1a;不同的数据库&#xff0c;对应不同的编程语言提供了不同的数据库驱动包&#xff0c;如&#xf…

Telnet SMTP协议关于“535 Error: authentication failed“解决思路

计算机网络中应用层的SMTP(Simple Mail Transfer Protocol)协议可用来发送邮件&#xff0c;在Telnet使用SMTP登陆账号密码时出现“535 Error: authentication failed”问题。现记录解决步骤。 1. 确认在邮箱中已开启SMTP服务。 2. 按照扫码流程&#xff0c;获得授权密码&…

第六章第二节:图的遍历(广度优先遍历和深度优先遍历)和应用(最小生成树、最短路径、有向无环图的描述表达式、拓扑排序、关键路径)

文章目录1. 图的遍历1.1 广度优先搜索&#xff08;BFS&#xff09;1.1.1 遍历序列的可变性1.1.2 复杂度的分析1.1.3 广度优先生成树1..1.4 广度优先生成森林1.2 深度优先搜索&#xff08;DFS&#xff09;1.2.1 树的深度优先遍历1.2.2 图的深度优先遍历1.2.2 复杂度的分析1.2.4 …

Servlet | 域对象、request对象其它常用的方法

目录 一&#xff1a;域对象 1、应用域对象 2、请求域对象 二&#xff1a;request对象其它常用的方法 一&#xff1a;域对象 1、应用域对象 &#xff08;1&#xff09;应用域对象是什么&#xff1f; ServletContext &#xff08;Servlet上下文对象。&#xff09; 什么情况…