Overview
Canal is a real-time database synchronization tool developed and maintained by Alibaba.
Canal works by pretending to be a MySQL slave: it speaks the MySQL replication protocol, sends a dump request to the master, and parses the binary log the master pushes back.
```mermaid
graph LR;
MySQL -->|push binary log| Canal[Canal: Pretend to be a MySQL slave];
Canal -->|dump| MySQL;
Canal -->|Push| MQ[Kafka/RocketMQ/RabbitMQ/...];
```
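To make the "dump" arrow more concrete, here is a minimal Python sketch of the request a replica sends to start binlog streaming: the COM_BINLOG_DUMP command (0x12) of the MySQL client/server protocol. It only builds the packet bytes; the handshake, authentication and event parsing that Canal also has to do are left out. The file name, position and server id are simply the values that show up later in this post, and the helper name is hypothetical.

```python
import struct

COM_BINLOG_DUMP = 0x12  # command byte: "start streaming the binlog to me"


def binlog_dump_packet(binlog_file: str, position: int, server_id: int,
                       flags: int = 0, sequence_id: int = 0) -> bytes:
    """Build a COM_BINLOG_DUMP packet, including its 4-byte packet header."""
    # Payload (little-endian): command byte, 4-byte start position, 2-byte flags,
    # 4-byte server id of the replica, then the binlog file name up to the end.
    payload = (bytes([COM_BINLOG_DUMP])
               + struct.pack("<IHI", position, flags, server_id)
               + binlog_file.encode("ascii"))
    # Header: 3-byte little-endian payload length + 1-byte sequence id.
    header = struct.pack("<I", len(payload))[:3] + bytes([sequence_id])
    return header + payload


if __name__ == "__main__":
    pkt = binlog_dump_packet("master-bin.000006", 4747, server_id=2)
    print(pkt.hex())
```

The server id in the request identifies the replica, which for Canal is canal.instance.mysql.slaveId; that is why it has to differ from the master's own server_id.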
Plan:

| Node | Service |
|---|---|
| hadoop001 | MySQL |
| hadoop002 | Canal / Kafka server |
| hadoop003 | Kafka consumer |
1. Install Canal
Download Canal 1.1.5 (canal.deployer-1.1.5.tar.gz) from its official website.
Extract it to /opt/module/canal on hadoop002:
```
[root@hadoop002 ~]# mkdir /opt/module/canal/
[root@hadoop002 ~]# tar xvf canal.deployer-1.1.5.tar.gz -C /opt/module/canal/
```
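Before editing anything, it can help to confirm that the archive unpacked where the later steps expect it. A small Python sketch, assuming the file list below (it is derived from the paths used later in this post, not from an official description of the canal.deployer package); the helper name is hypothetical.

```python
from pathlib import Path

# Files this post touches later; an assumption, not the complete package layout.
EXPECTED = [
    "bin/startup.sh",
    "conf/canal.properties",
    "conf/logback.xml",
    "conf/example/instance.properties",
]


def missing_files(canal_home: str) -> list:
    """Return the expected files that do not exist under canal_home."""
    root = Path(canal_home)
    return [rel for rel in EXPECTED if not (root / rel).is_file()]


if __name__ == "__main__":
    missing = missing_files("/opt/module/canal")
    print("layout OK" if not missing else f"missing: {missing}")
```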
Notice: the latest version at the time of writing, 1.1.6, kept throwing the following error even though there is no table named BASE TABLE in my database, which is why this guide uses 1.1.5:
```
com.alibaba.otter.canal.parse.exception.CanalParseException: \
java.io.IOException: ErrorPacket [errorNumber=1146, fieldCount=-1, \
message=Table 'mall.BASE TABLE' doesn't exist, sqlState=42S02, sqlStateMarker=#]
```
2. Start Zookeeper on all cluster nodes
```
[root@hadoop001 ~]# zkServer.sh start
[root@hadoop001 ~]# jps
26711 Jps
24999 QuorumPeerMain
```

```
[root@hadoop002 ~]# zkServer.sh start
[root@hadoop002 ~]# jps
17846 Jps
15979 QuorumPeerMain
```

```
[root@hadoop003 ~]# zkServer.sh start
[root@hadoop003 ~]# jps
29364 QuorumPeerMain
29178 Jps
```
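jps only shows that a QuorumPeerMain process exists; zkServer.sh status, or ZooKeeper's "ruok" four-letter word, tells you whether it actually serves clients. A minimal Python sketch of the latter, assuming the default client port 2181; on ZooKeeper 3.5 and later, ruok may first have to be allowed through 4lw.commands.whitelist. The helper names are hypothetical.

```python
import socket


def is_imok(reply: bytes) -> bool:
    """A healthy ZooKeeper server answers 'ruok' with 'imok'."""
    return reply.strip() == b"imok"


def zk_ruok(host: str, port: int = 2181, timeout: float = 2.0) -> bool:
    """Send 'ruok' to one ZooKeeper node; any connection error counts as down."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"ruok")
            reply = sock.recv(16)
    except OSError:  # refused, unresolvable host, timeout, ...
        return False
    return is_imok(reply)


if __name__ == "__main__":
    for node in ("hadoop001", "hadoop002", "hadoop003"):
        print(node, "OK" if zk_ruok(node) else "not responding")
```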
3. Start MySQL as Master on hadoop001
Install MySQL 5.x or MariaDB on node hadoop001. On CentOS 7.x the default is MariaDB (5.5), so install it directly with yum:
```
[root@hadoop001 ~]# yum install -y mariadb-server
```
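Add the following options to /etc/my.cnf. Binary logging must be enabled with binlog-format=ROW so that Canal receives row-level changes, and server_id must differ from the slaveId Canal will use: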
```
[root@hadoop001 ~]# cat /etc/my.cnf
[mysqld]
log-bin=/var/log/mysql/master-bin
binlog-format=ROW
server_id=1
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
sync_binlog=1
```
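A mistyped option name in my.cnf is easy to miss. The Python sketch below reads the [mysqld] section with the standard configparser module and reports the three settings Canal depends on. It assumes a simple my.cnf: lines starting with "!" (such as !includedir) are skipped, so included files are not checked. The helper name is hypothetical.

```python
import configparser
from typing import List


def check_binlog_config(text: str) -> List[str]:
    """Return a list of problems with the binlog settings in a my.cnf text."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None,
        comment_prefixes=("#", ";", "!"))  # '!include' lines are skipped
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent; normalise to '_'.
    opts = {k.replace("-", "_"): v for k, v in parser["mysqld"].items()}
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set, binary logging is disabled")
    if (opts.get("binlog_format") or "").upper() != "ROW":
        problems.append("binlog-format should be ROW for Canal")
    if not opts.get("server_id"):
        problems.append("server_id is not set")
    return problems


if __name__ == "__main__":
    with open("/etc/my.cnf") as f:
        print(check_binlog_config(f.read()) or "binlog settings look fine")
```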
Start the service:
```
[root@hadoop001 ~]# systemctl start mariadb
```
Create a user canal for replication. Its password is also "canal", which matches the default credentials in Canal's instance.properties (canal.instance.dbUsername and canal.instance.dbPassword).
```
[root@hadoop001 ~]# mysql
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MariaDB connection id is 563
Server version: 5.5.68-MariaDB MariaDB Server
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MariaDB [(none)]> grant replication client,replication slave on *.* to 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.00 sec)
MariaDB [(none)]> flush privileges;
Query OK, 0 rows affected (0.00 sec)
```
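To double-check the grant, run SHOW GRANTS FOR 'canal'@'%'; and inspect the rows it returns. The Python sketch below is a hypothetical helper that pattern-matches those textual GRANT rows for the two privileges granted above; it ignores roles and database-level grants. Canal's own QuickStart also grants SELECT, so add it to the set if you follow that guide.

```python
import re

# Global privileges granted to 'canal' in this post.
REQUIRED_PRIVS = {"REPLICATION SLAVE", "REPLICATION CLIENT"}


def missing_privileges(grant_rows: list) -> set:
    """Return required privileges not granted ON *.* in SHOW GRANTS output."""
    granted = set()
    for row in grant_rows:
        m = re.match(r"GRANT (.+?) ON \*\.\* TO ", row, re.IGNORECASE)
        if not m:
            continue  # grants on specific databases or tables are ignored
        privs = {p.strip().upper() for p in m.group(1).split(",")}
        if "ALL PRIVILEGES" in privs:
            return set()
        granted |= privs
    return REQUIRED_PRIVS - granted
```

An empty result means the replication privileges are in place.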
4. Start Kafka server on hadoop002
Kafka is already installed in the cluster, so start the broker directly:
```
[root@hadoop002 kafka_2.11-2.4.1]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop002 kafka_2.11-2.4.1]# jps
17462 Jps
19592 Kafka
15979 QuorumPeerMain
```
5. Configure and Start Canal on hadoop002
Edit the instance config file, conf/example/instance.properties:
```
[root@hadoop002 ~]# cd /opt/module/canal/conf/
[root@hadoop002 conf]# vim example/instance.properties
```
Change the following options and keep the others at their defaults. The slaveId must not clash with the server_id of hadoop001 (1):
```properties
canal.instance.mysql.slaveId=2
# position info
canal.instance.master.address=hadoop001:3306
```
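These two settings are easy to get subtly wrong. Here is a Python sketch that reads instance.properties and checks that the slaveId differs from the master's server_id and that the master address looks like host:port. The parser is deliberately minimal (plain key=value lines and "#"/"!" comments, no escapes or line continuations), so it is not a full java.util.Properties implementation; the helper names are hypothetical.

```python
def parse_properties(text: str) -> dict:
    """Minimal key=value parser that skips blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_instance(text: str, master_server_id: int) -> list:
    """Return a list of problems in an instance.properties text."""
    props = parse_properties(text)
    problems = []
    # Canal registers with the master as a replica, so its id must not clash.
    if props.get("canal.instance.mysql.slaveId") == str(master_server_id):
        problems.append("slaveId must differ from the master's server_id")
    host, _, port = props.get("canal.instance.master.address", "").rpartition(":")
    if not host or not port.isdigit():
        problems.append("canal.instance.master.address should look like host:port")
    return problems


if __name__ == "__main__":
    with open("/opt/module/canal/conf/example/instance.properties") as f:
        print(check_instance(f.read(), master_server_id=1) or "instance config looks fine")
```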
Edit conf/canal.properties:
- Uncomment the option "canal.instance.parser.parallelThreadSize = 16";
- Change canal.serverMode to kafka;
- Set kafka.bootstrap.servers to the Kafka broker address.
```properties
canal.instance.parser.parallelThreadSize = 16
...
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
...
kafka.bootstrap.servers = hadoop002:9092
```
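The three canal.properties edits can also be scripted, which helps when the same setup is repeated on several machines. A minimal Python sketch, assuming each key appears at most once as "key = value", possibly commented out with "#"; keys that are not found are appended. The helper name is hypothetical, and the file should be backed up before writing it back.

```python
import re


def apply_settings(text: str, settings: dict) -> str:
    """Set each key in a .properties text, uncommenting its line if needed."""
    lines = text.splitlines()
    for key, value in settings.items():
        # Matches "key = ..." or "#key = ...", but not a longer key with the same prefix.
        pattern = re.compile(r"^\s*#?\s*" + re.escape(key) + r"\s*=")
        new_line = f"{key} = {value}"
        for i, line in enumerate(lines):
            if pattern.match(line):
                lines[i] = new_line
                break
        else:
            lines.append(new_line)  # key not present at all
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    path = "/opt/module/canal/conf/canal.properties"
    with open(path) as f:
        updated = apply_settings(f.read(), {
            "canal.instance.parser.parallelThreadSize": "16",
            "canal.serverMode": "kafka",
            "kafka.bootstrap.servers": "hadoop002:9092",
        })
    print(updated)
```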
Start the Canal service:
```
[root@hadoop002 canal]# ./bin/startup.sh
cd to /opt/module/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/module/canal/bin/../conf/logback.xml
canal conf : /opt/module/canal/bin/../conf/canal.properties
CLASSPATH :/opt/module/canal/bin/../conf:/opt/module/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/module/canal/bin/../lib/zkclient-0.10.jar:/opt/module/canal/bin/../lib/spring-tx-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-orm-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-jdbc-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-jcl-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-expression-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-core-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-context-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-beans-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/spring-aop-5.0.5.RELEASE.jar:/opt/module/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/module/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/module/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/module/canal/bin/../lib/oro-2.0.8.jar:/opt/module/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/module/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/module/canal/bin/../lib/mysql-connector-java-5.1.48.jar:/opt/module/canal/bin/../lib/mybatis-spring-2.0.4.jar:/opt/module/canal/bin/../lib/mybatis-3.5.4.jar:/opt/module/canal/bin/../lib/logback-core-1.1.3.jar:/opt/module/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/module/canal/bin/../lib/jsr305-3.0.2.jar:/opt/module/canal/bin/../lib/joda-time-2.9.4.jar:/opt/module/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/module/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/module/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/module/canal/bin/../lib/j2objc-annotations-1.1.jar:/opt/module/canal/bin/../lib/httpcore-4.4.3.jar:/opt/module/canal/bin/../lib/httpclient-4.5.1.jar:/opt/module/canal/bin/../lib/h2-1.4.196.jar:/opt/module/canal/bin/../lib/guava-22.0.jar:/opt/module/canal/bin/../lib/fastjson-1.2.58.sec06.jar:/opt/module/canal/bin/../lib/error_prone_annotations-2.0.18.jar:/opt/module/canal/bin/../lib/druid-1.2.6.jar:/opt/module/canal/bin/../lib/disruptor-3.4.2.jar:/opt/module/canal/bin/../lib/connector.core-1.1.5.jar:/opt/module/canal/bin/../lib/commons-logging-1.2.jar:/opt/module/canal/bin/../lib/commons-lang3-3.7.jar:/opt/module/canal/bin/../lib/commons-lang-2.6.jar:/opt/module/canal/bin/../lib/commons-io-2.4.jar:/opt/module/canal/bin/../lib/commons-compress-1.9.jar:/opt/module/canal/bin/../lib/commons-codec-1.9.jar:/opt/module/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/module/canal/bin/../lib/canal.store-1.1.5.jar:/opt/module/canal/bin/../lib/canal.sink-1.1.5.jar:/opt/module/canal/bin/../lib/canal.server-1.1.5.jar:/opt/module/canal/bin/../lib/canal.protocol-1.1.5.jar:/opt/module/canal/bin/../lib/canal.prometheus-1.1.5.jar:/opt/module/canal/bin/../lib/canal.parse.driver-1.1.5.jar:/opt/module/canal/bin/../lib/canal.parse.dbsync-1.1.5.jar:/opt/module/canal/bin/../lib/canal.parse-1.1.5.jar:/opt/module/canal/bin/../lib/canal.meta-1.1.5.jar:/opt/module/canal/bin/../lib/canal.instance.spring-1.1.5.jar:/opt/module/canal/bin/../lib/canal.instance.manager-1.1.5.jar:/opt/module/canal/bin/../lib/canal.instance.core-1.1.5.jar:/opt/module/canal/bin/../lib/canal.filter-1.1.5.jar:/opt/module/canal/bin/../lib/canal.deployer-1.1.5.jar:/opt/module/canal/bin/../lib/canal.common-1.1.5.jar:/opt/module/canal/bin/../lib/aviator-2.2.1.jar:/opt/module/canal/bin/../lib/animal-sniffer-annotations-1.14.jar:
cd to /opt/module/canal for continue
[root@hadoop002 canal]# jps
19970 Jps
19815 CanalLauncher
19592 Kafka
15979 QuorumPeerMain
[root@hadoop002 canal]# tail -f logs/canal/canal.log
2022-07-16 20:40:13.193 [Thread-6] INFO com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[172.17.0.3(172.17.0.3):11111]
2022-07-16 20:40:13.200 [Thread-6] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## canal server is down.
2022-07-16 20:40:23.167 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2022-07-16 20:40:23.215 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2022-07-16 20:40:23.352 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.enable' was supplied but isn't a known config.
2022-07-16 20:40:23.354 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.krb5.file' was supplied but isn't a known config.
2022-07-16 20:40:23.354 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.jaas.file' was supplied but isn't a known config.
2022-07-16 20:40:23.356 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2022-07-16 20:40:23.406 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.3(172.17.0.3):11111]
2022-07-16 20:40:25.003 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
[root@hadoop002 canal]# tail -f logs/example/example.log
2022-07-16 20:40:12.198 [Thread-6] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-example
2022-07-16 20:40:13.192 [Thread-6] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2022-07-16 20:40:24.779 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2022-07-16 20:40:24.799 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2022-07-16 20:40:24.799 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2022-07-16 20:40:24.998 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2022-07-16 20:40:25.044 [destination = example , address = hadoop001/172.17.0.16:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2022-07-16 20:40:25.069 [destination = example , address = hadoop001/172.17.0.16:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"hadoop001","port":3306}},"postion":{"gtid":"","included":false,"journalName":"master-bin.000006","position":4747,"serverId":1,"timestamp":1657967227000}}
2022-07-16 20:40:25.528 [destination = example , address = hadoop001/172.17.0.16:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> \
find start position successfully, EntryPosition[included=false,journalName=master-bin.000006,position=4747,serverId=1,gtid=,timestamp=1657967227000] cost : 472ms , \
the next step is binlog dump
```
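The JSON line in example.log records the binlog file and offset Canal resumes from. A small, hypothetical Python helper can extract them, for example to compare with the output of SHOW MASTER STATUS on hadoop001. Note that Canal spells the field "postion" in this log line; the sketch relies on that spelling as it appears in the log above.

```python
import json


def start_position(log_lines):
    """Return (journalName, position) from Canal's position JSON line, or None."""
    for line in log_lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # ordinary log lines start with a timestamp
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        pos = record.get("postion")  # field name as logged by Canal
        if pos:
            return pos["journalName"], pos["position"]
    return None


if __name__ == "__main__":
    with open("/opt/module/canal/logs/example/example.log") as f:
        print(start_position(f))
```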
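The Canal service started successfully: the example instance found its start position (master-bin.000006, position 4747) and moved on to the binlog dump.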
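6. Start Kafka Consumer on hadoop003
Canal publishes to the topic set by canal.mq.topic in instance.properties, which defaults to example, so subscribe to that topic: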
```
[root@hadoop003 ~]# kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic example
```
On hadoop001, insert a row into the books table of the mall database:
```
MariaDB [mall]> insert into books values(7, "Web Development");
Query OK, 1 row affected (0.00 sec)
```
The change shows up in the Kafka consumer window almost immediately:
```
[root@hadoop003 ~]# kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic example
{"data":[{"id":"7","name":"Web Development"}],"database":"mall","es":1657975530000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(45)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"books","ts":1657975530937,"type":"INSERT"}
```
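In this flat JSON message every column value arrives as a string (note "id":"7"). The real column types are given by mysqlType and by sqlType, which holds java.sql.Types codes (4 is INTEGER, 12 is VARCHAR). A minimal consumer-side decoding sketch in Python, assuming only integer columns need converting; it is not part of Canal's client API, and the helper name is hypothetical.

```python
import json

# java.sql.Types codes for integer columns: TINYINT, SMALLINT, INTEGER, BIGINT.
INTEGER_TYPES = {-6, 5, 4, -5}


def decode_flat_message(raw: str):
    """Return ("db.table", event type, rows) with integer columns converted."""
    msg = json.loads(raw)
    sql_types = msg.get("sqlType") or {}
    rows = []
    for row in msg.get("data") or []:  # "data" is null for DDL events
        typed = {}
        for col, val in row.items():
            if val is not None and sql_types.get(col) in INTEGER_TYPES:
                typed[col] = int(val)
            else:
                typed[col] = val  # everything else stays a string
        rows.append(typed)
    return f"{msg['database']}.{msg['table']}", msg["type"], rows
```

For the message above this yields the table mall.books, the event type INSERT, and the row with id 7 as an integer.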
Great! Everything is working as expected.