Build Realtime Data Flow With Canal

Build a pipeline from MySQL to Kafka

Posted by Bourne's Blog - A Full-stack & Web3 Developer on July 16, 2022

Overview

Canal is a realtime database synchronization tool developed and maintained by Alibaba.
Canal works as a MySQL slave: it communicates with the master using the MySQL replication protocol.

graph LR;
MySQL -->|push binary log| Canal[Canal: pretends to be a MySQL slave];
Canal -->|dump| MySQL;
Canal -->|Push| MQ[Kafka/RocketMQ/RabbitMQ/...] ;

Canal workflow

Plan:

Node       Service
hadoop001  MySQL
hadoop002  Canal / Kafka server
hadoop003  Kafka consumer

1. Install Canal

Download Canal 1.1.5 from its official website.

Extract it to /opt/module/canal:

[root@hadoop002 ~]# mkdir /opt/module/canal/
[root@hadoop002 ~]# tar xvf canal.deployer-1.1.5.tar.gz -C /opt/module/canal/
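
After extraction the layout should look roughly like this (directory names as in the 1.1.5 deployer package; the logs directory appears on first start):

[root@hadoop002 ~]# ls /opt/module/canal/
bin  conf  lib  plugin
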
Notice: the latest version is 1.1.6, which kept throwing the following error even though there is no table named "BASE TABLE" in my database:
com.alibaba.otter.canal.parse.exception.CanalParseException:  \
java.io.IOException: ErrorPacket [errorNumber=1146, fieldCount=-1, \ 
message=Table 'mall.BASE TABLE' doesn't exist, sqlState=42S02, sqlStateMarker=#]

2. Start ZooKeeper on all cluster nodes

[root@hadoop001 ~]# zkServer.sh start
[root@hadoop001 ~]# jps
26711 Jps
24999 QuorumPeerMain
[root@hadoop002 ~]# zkServer.sh start
[root@hadoop002 ~]# jps
17846 Jps
15979 QuorumPeerMain
[root@hadoop003 ~]# zkServer.sh start
[root@hadoop003 ~]# jps
29364 QuorumPeerMain
29178 Jps
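
Optionally verify that the quorum actually formed: exactly one node should report leader and the others follower (output abbreviated):

[root@hadoop002 ~]# zkServer.sh status
ZooKeeper JMX enabled by default
Mode: leader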

3. Start MySQL as Master on hadoop001

Install MySQL or MariaDB 5.x on node hadoop001. On CentOS 7.x the default is MariaDB, so install it directly with yum:

[root@hadoop001 ~]# yum install -y mariadb-server

Add the following options to my.cnf. Canal requires row-based binlogs, so binlog-format=ROW is mandatory:

[root@hadoop001 ~]# cat /etc/my.cnf
[mysqld]
log-bin=/var/log/mysql/master-bin
binlog-format=ROW
server_id=1
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
sync_binlog=1

Start the service:

[root@hadoop001 ~]# systemctl start mariadb
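
Before going further it is worth confirming that binary logging is really on, since Canal can only parse row-based binlogs:

[root@hadoop001 ~]# mysql -e "show variables where variable_name in ('log_bin', 'binlog_format')"
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
| binlog_format | ROW   |
+---------------+-------+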

Create a user canal for replication; the password is also "canal", which is the default value in Canal's config.

[root@hadoop001 ~]# mysql
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 563
Server version: 5.5.68-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MariaDB [(none)]> grant replication client,replication slave on *.* to 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected (0.00 sec)
MariaDB [(none)]> flush privileges;
Query OK, 0 rows affected (0.00 sec)
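
A quick sanity check that the master is writing binlogs (the file name and position will differ on your machine):

MariaDB [(none)]> show master status;
+-------------------+----------+--------------+------------------+
| File              | Position | Binlog_Do_DB | Binlog_Ignore_DB |
+-------------------+----------+--------------+------------------+
| master-bin.000001 |      245 |              |                  |
+-------------------+----------+--------------+------------------+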

4. Start Kafka Server on hadoop002

Kafka is already installed in the cluster, so start the service directly:

[root@hadoop002 kafka_2.11-2.4.1]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop002 kafka_2.11-2.4.1]# jps
17462 Jps
19592 Kafka
15979 QuorumPeerMain
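
Canal will publish to the topic named in its instance config (example by default, see step 5). If the broker is not set to auto-create topics, create it up front; one partition and one replica are enough for this test:

[root@hadoop002 kafka_2.11-2.4.1]# ./bin/kafka-topics.sh --create --bootstrap-server hadoop002:9092 \
    --topic example --partitions 1 --replication-factor 1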

5. Configure Canal

Edit the instance config file:

[root@hadoop002 ~]# cd /opt/module/canal/conf/
[root@hadoop002 conf]# vim example/instance.properties

Change the following options and keep the others at their defaults:

canal.instance.mysql.slaveId=2
# position info
canal.instance.master.address=hadoop001:3306
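
For reference, the other options this setup relies on are already at their defaults in example/instance.properties: the replication user from step 3 and the destination topic.

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# mq config
canal.mq.topic=example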

Edit canal.properties:

  • Uncomment the option canal.instance.parser.parallelThreadSize = 16;
  • Change canal.serverMode to kafka;
  • Set the Kafka bootstrap servers:
canal.instance.parser.parallelThreadSize = 16
...
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
...
kafka.bootstrap.servers = hadoop002:9092

Start the Canal service:

[root@hadoop002 canal]# ./bin/startup.sh
cd to /opt/module/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/module/canal/bin/../conf/logback.xml
canal conf : /opt/module/canal/bin/../conf/canal.properties
CLASSPATH :/opt/module/canal/bin/../conf:/opt/module/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/module/canal/bin/../lib/zkclient-0.10.jar: ... :/opt/module/canal/bin/../lib/canal.common-1.1.5.jar:/opt/module/canal/bin/../lib/aviator-2.2.1.jar:/opt/module/canal/bin/../lib/animal-sniffer-annotations-1.14.jar:
cd to /opt/module/canal for continue
[root@hadoop002 canal]# jps
19970 Jps
19815 CanalLauncher
19592 Kafka
15979 QuorumPeerMain
[root@hadoop002 canal]# tail -f logs/canal/canal.log
2022-07-16 20:40:13.193 [Thread-6] INFO  com.alibaba.otter.canal.deployer.CanalController - ## stop the canal server[172.17.0.3(172.17.0.3):11111]
2022-07-16 20:40:13.200 [Thread-6] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## canal server is down.
2022-07-16 20:40:23.167 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2022-07-16 20:40:23.215 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2022-07-16 20:40:23.352 [main] WARN  org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.enable' was supplied but isn't a known config.
2022-07-16 20:40:23.354 [main] WARN  org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.krb5.file' was supplied but isn't a known config.
2022-07-16 20:40:23.354 [main] WARN  org.apache.kafka.clients.producer.ProducerConfig - The configuration 'kerberos.jaas.file' was supplied but isn't a known config.
2022-07-16 20:40:23.356 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2022-07-16 20:40:23.406 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.0.3(172.17.0.3):11111]
2022-07-16 20:40:25.003 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

[root@hadoop002 canal]# tail -f logs/example/example.log
2022-07-16 20:40:12.198 [Thread-6] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - stop CannalInstance for null-example
2022-07-16 20:40:13.192 [Thread-6] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - stop successful....
2022-07-16 20:40:24.779 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2022-07-16 20:40:24.799 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2022-07-16 20:40:24.799 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2022-07-16 20:40:24.998 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2022-07-16 20:40:25.044 [destination = example , address = hadoop001/172.17.0.16:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2022-07-16 20:40:25.069 [destination = example , address = hadoop001/172.17.0.16:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"hadoop001","port":3306}},"postion":{"gtid":"","included":false,"journalName":"master-bin.000006","position":4747,"serverId":1,"timestamp":1657967227000}}
2022-07-16 20:40:25.528 [destination = example , address = hadoop001/172.17.0.16:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> \
find start position successfully, EntryPosition[included=false,journalName=master-bin.000006,position=4747,serverId=1,gtid=,timestamp=1657967227000] cost : 472ms , \
the next step is binlog dump

The Canal service started successfully.

6. Start Kafka Consumer on hadoop003

[root@hadoop003 ~]# kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic example
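
The demo writes to a mall.books table on hadoop001. If it does not exist yet, a minimal schema consistent with the change message captured below would be:

MariaDB [(none)]> create database if not exists mall;
MariaDB [(none)]> use mall;
MariaDB [mall]> create table books (id int(11) primary key, name varchar(45));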

On hadoop001, we insert a row into the books table:

MariaDB [mall]> insert into books values(7, "Web Development");
Query OK, 1 row affected (0.00 sec)

Immediately the change event shows up in the Kafka consumer window:

[root@hadoop003 ~]# kafka-console-consumer.sh --bootstrap-server hadoop002:9092 --topic example
{"data":[{"id":"7","name":"Web Development"}],"database":"mall","es":1657975530000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(45)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"books","ts":1657975530937,"type":"INSERT"}

Great! Everything works as expected.