Overview
Flink CDC lets Flink capture data changes from a variety of databases. The CDC Connectors integrate Debezium as the engine that captures those changes.
Installation
CDC is not installed with Flink by default, so you need to download the CDC Connectors first. Find the connector that is compatible with your Flink version under Supported Flink Versions, download it, and put it in the $FLINK_HOME/lib directory.
Syncing Data from MySQL to MySQL
Preparation
We have a table goods in the MySQL database mall, and we want to sync the data from this table to the database report:
MariaDB [(none)]> show create table mall.goods;
+-------+---------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+---------------------------------------------------------------------------------------------------------------+
| goods | CREATE TABLE `goods` (
`goodsId` int(11) NOT NULL DEFAULT '0',
`goodsName` varchar(64) DEFAULT NULL,
`goodsPrice` decimal(6,2) DEFAULT NULL,
`goodsStock` int(11) DEFAULT NULL,
`goodsViews` int(11) DEFAULT NULL,
PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+---------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
MariaDB [(none)]> select * from mall.goods;
+---------+----------------+------------+------------+------------+
| goodsId | goodsName | goodsPrice | goodsStock | goodsViews |
+---------+----------------+------------+------------+------------+
| 1 | Apple iPhone X | 6399.00 | 1000 | 33445 |
| 2 | Apple Watch 3 | 4299.00 | 800 | 2235 |
| 3 | Apple iPad 8 | 3999.00 | 620 | 4883 |
| 4 | iPad 5 | 4688.00 | 100 | 4330 |
+---------+----------------+------------+------------+------------+
4 rows in set (0.00 sec)
Create a table goods with the same DDL in report:
MariaDB [(none)]> create database report;
MariaDB [(none)]> use report;
MariaDB [report]> CREATE TABLE `goods` (
`goodsId` int(11) NOT NULL DEFAULT '0',
`goodsName` varchar(64) DEFAULT NULL,
`goodsPrice` decimal(6,2) DEFAULT NULL,
`goodsStock` int(11) DEFAULT NULL,
`goodsViews` int(11) DEFAULT NULL,
PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
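Since the CDC connector relies on Debezium, which reads the MySQL binary log, it may be worth confirming the binlog settings on the source server before going further. The statements below are a minimal check, assuming you can run them with an account that is allowed to read server variables; the comments describe what Debezium's MySQL connector documentation asks for, not what this particular server returns.

```sql
-- Run on the server that hosts mall.goods.
SHOW VARIABLES LIKE 'log_bin';          -- the binary log should be enabled (ON)
SHOW VARIABLES LIKE 'binlog_format';    -- Debezium expects ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- Debezium expects FULL
```

If any of these differ, the usual fix is to change the server configuration and restart the database before starting the Flink job.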
Start Flink Cluster
Start the Flink cluster first:
[root@hadoop001 ~]# start-cluster.sh
[root@hadoop001 ~]# jps
7825 QuorumPeerMain
26312 StandaloneSessionClusterEntrypoint
27145 Jps
26605 TaskManagerRunner
Define source and sink
Log in to the Flink SQL client:
[root@hadoop001 ~]# sql-client.sh embedded
No default environment specified.
Searching for '/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
Define the source table and the sink table. Make sure the MySQL user flink has already been granted the proper permissions:
Flink SQL> create table `goods_input`(
goodsId INT,
goodsName VARCHAR(64),
goodsPrice DECIMAL(6, 2),
goodsStock INT,
goodsViews INT,
PRIMARY KEY(goodsId) NOT ENFORCED
)WITH(
'connector' = 'mysql-cdc',
'hostname' = 'hadoop001',
'port' = '3306',
'username' = 'flink',
'password' = '*****',
'database-name' ='mall',
'table-name' = 'goods'
);
Flink SQL> create table `goods_report`(
goodsId INT,
goodsName VARCHAR(64),
goodsPrice DECIMAL(6, 2),
goodsStock INT,
goodsViews INT,
PRIMARY KEY(goodsId) NOT ENFORCED
)WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop001:3306/report?useSSL=false',
'username' = 'flink',
'password' = '*****',
'table-name' = 'goods'
);
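The permissions mentioned above can be sketched as follows. The source-side privilege list is the one Debezium documents for its MySQL connector; the host pattern '%' and the choice to grant the sink privileges only on report.goods are assumptions made for this example, and the user flink is assumed to exist already.

```sql
-- Source side: privileges Debezium lists for reading a snapshot and the binlog.
-- The host pattern '%' is an assumption; narrow it to the Flink hosts if you can.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flink'@'%';

-- Sink side: the JDBC connector inserts, updates and deletes rows in report.goods.
GRANT SELECT, INSERT, UPDATE, DELETE ON report.goods TO 'flink'@'%';

-- Review what the user ended up with.
SHOW GRANTS FOR 'flink'@'%';
```

Run these in the MariaDB/MySQL client as an administrative user, not in the Flink SQL client.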
Execute the CDC task
Flink SQL> insert into `goods_report` select * from `goods_input`;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 88d9950004962513b4942ae4606cee86
Open the Flink web UI to see that the job is running.
Check the result
MariaDB [(none)]> select * from report.goods;
+---------+-----------------+------------+------------+------------+
| goodsId | goodsName | goodsPrice | goodsStock | goodsViews |
+---------+-----------------+------------+------------+------------+
| 1 | Apple iPhone X | 6399.00 | 1000 | 33445 |
| 2 | Apple Watch 3 | 4299.00 | 800 | 2235 |
| 3 | Apple iPad 8 | 3999.00 | 620 | 4883 |
| 4 | iPad 5 | 4688.00 | 100 | 4330 |
+---------+-----------------+------------+------------+------------+
4 rows in set (0.00 sec)
Let's insert a record into the source table and check the destination table:
MariaDB [(none)]> insert into mall.goods values(5, "Apple Charger", 128.00, 2000, 262);
Query OK, 1 row affected (0.00 sec)
MariaDB [(none)]> select * from report.goods;
+---------+-----------------+------------+------------+------------+
| goodsId | goodsName | goodsPrice | goodsStock | goodsViews |
+---------+-----------------+------------+------------+------------+
| 1 | Apple iPhone X | 6399.00 | 1000 | 33445 |
| 2 | Apple Watch 3 | 4299.00 | 800 | 2235 |
| 3 | Apple iPad 8 | 3999.00 | 620 | 4883 |
| 4 | iPad 5 | 4688.00 | 100 | 4330 |
| 5 | Apple Charger | 128.00 | 2000 | 262 |
+---------+-----------------+------------+------------+------------+
5 rows in set (0.00 sec)
Everything works well.
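The check above only covers inserts. Because goods_report declares a primary key, Flink's JDBC connector writes in upsert mode, so updates and deletes on the source should reach the destination as well. A minimal way to try this, reusing the row with goodsId 5 inserted above (the new stock value 1999 is arbitrary):

```sql
-- Change the row on the source, then look at the destination.
UPDATE mall.goods SET goodsStock = 1999 WHERE goodsId = 5;
SELECT goodsId, goodsStock FROM report.goods WHERE goodsId = 5;

-- Remove the row on the source, then confirm it is gone from the destination.
DELETE FROM mall.goods WHERE goodsId = 5;
SELECT COUNT(*) FROM report.goods WHERE goodsId = 5;
```

Changes are applied asynchronously by the running job, so the destination may lag the source for a moment; rerun the SELECT if the first result still shows the old state.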