MySQL数据实时同步到ES或者HBase canal简介 canal工作原理 canal使用
canal
主要用途是对MySQL
数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL
的增量数据进行实时同步,支持同步到MySQL
、Elasticsearch
、HBase
等数据存储中去。
canal工作原理
canal
会模拟MySQL
主库和从库的交互协议,从而伪装成MySQL
的从库,然后向MySQL
主库发送dump
协议,MySQL
主库收到dump
请求会向canal
推送binlog
,canal
通过解析binlog
将数据同步到其他存储中去。
canal使用
接下来我们来学习下canal
的使用,以MySQL
实时同步数据到Elasticsearch
为例。
组件下载
首先我们需要下载canal
的各个组件canal-server、canal-adapter、canal-admin
,下载地址:https://github.com/alibaba/canal/releases
canal
的各个组件的用途各不相同,下面分别介绍下:
-
canal-server(即现在的canal-deployer)
:可以直接监听MySQL
的binlog
,把自己伪装成MySQL
的从库,只负责接收数据,并不做处理。 -
canal-adapter
:相当于canal
的客户端,会从canal-server
中获取数据,然后对数据进行同步,可以同步到MySQL
、Elasticsearch
和HBase
等存储中去。 -
canal-admin
:为canal
提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI
操作界面,方便更多用户快速和安全的操作。
由于不同版本的MySQL
、Elasticsearch
和canal
会有兼容性问题,所以我们先对其使用版本做个约定。
MySQL配置
由于canal
是通过订阅MySQL
的binlog
来实现数据同步的,所以我们需要开启MySQL
的binlog
写入功能,并设置binlog-format
为ROW
模式,我的配置文件为/mydata/mysql/conf/my.cnf
,改为如下内容即可;
[mysqld]
## 设置server_id,同一局域网中需要唯一
server_id=101
## 指定不需要同步的数据库名称
binlog-ignore-db=mysql
## 开启二进制日志功能
log-bin=mall-mysql-bin
## 设置二进制日志使用内存大小(事务)
binlog_cache_size=1M
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row
## 二进制日志过期清理时间。默认值为0,表示不自动清理。
expire_logs_days=7
## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致
slave_skip_errors=1062
- 配置完成后需要重新启动
MySQL
,重启成功后通过如下命令查看binlog
是否启用;
show variables like '%log_bin%'
+---------------------------------+-------------------------------------+
| Variable_name | Value |
+---------------------------------+-------------------------------------+
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mall-mysql-bin |
| log_bin_index | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+-------------------------------------+
- 再查看
下MySQL
的binlog
模式;
show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
- 接下来需要创建一个拥有从库权限的账号,用于订阅
binlog
,这里创建的账号为canal:canal
;
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
- 创建好测试用的数据库
canal-test
,之后创建一张商品表product
,建表语句如下。
CREATE TABLE `product` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`price` decimal(10, 2) NULL DEFAULT NULL,
`pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
canal-server使用
将我们下载好的压缩包canal.deployer-1.1.5-SNAPSHOT.tar.gz
上传到Linux
服务器,然后解压到指定目录/mydata/canal-server
,可使用如下命令解压;
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz
- 解压完成后目录结构如下;
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── canal_local.properties
│ ├── canal.properties
│ └── example
│ └── instance.properties
├── lib
├── logs
│ ├── canal
│ │ └── canal.log
│ └── example
│ ├── example.log
│ └── example.log
└── plugin
- 修改配置文件
conf/example/instance.properties
,按如下配置即可,主要是修改数据库相关配置;
# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\..*
- 使用
startup.sh
脚本启动canal-server
服务;
sh bin/startup.sh
- 启动成功后可使用如下命令查看服务日志信息;
tail -f logs/canal/canal.log
2020-10-26 16:18:13.354 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.1(172.17.0.1):11111]
2020-10-26 16:18:19.978 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is r
- 启动成功后可使用如下命令查看
instance
日志信息;
tail -f logs/example/example.log
2020-10-26 16:18:16.056 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-26 16:18:16.061 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-26 16:18:18.259 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-10-26 16:18:18.282 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*..*$
2020-10-26 16:18:18.282 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql.slave_.*$
2020-10-26 16:18:19.543 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-10-26 16:18:19.578 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-10-26 16:18:19.912 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{