Flink CDC - Syncing Data from MySQL yo MySQL

Posted by Bourne's Blog - A Full-stack & Web3 Developer on July 29, 2022

Overview

Flink CDC is a feature of Flink to capture different databases data change. CDC Connectors integrates Debezium as the engine to capture data changes.

Installation

CDC is not installed with Flink by default, you need to download CDC Connectors first. Find you compatible connector with Supported Flink Versions, download the connector and put it under $FLINK_HOME/lib directory.

Syncing Data from MySQL to MySQL

Preparation

We have a table goods in MySQL under database mall, I want to sync data from this table to database “report”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
MariaDB [(none)]> show create table mall.goods;
+-------+---------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                  |
+-------+---------------------------------------------------------------------------------------------------------------+
| goods | CREATE TABLE `goods` (
  `goodsId` int(11) NOT NULL DEFAULT '0',
  `goodsName` varchar(64) DEFAULT NULL,
  `goodsPrice` decimal(6,2) DEFAULT NULL,
  `goodsStock` int(11) DEFAULT NULL,
  `goodsViews` int(11) DEFAULT NULL,
  PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+---------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

MariaDB [(none)]> select * from mall.goods;
+---------+----------------+------------+------------+------------+
| goodsId | goodsName      | goodsPrice | goodsStock | goodsViews |
+---------+----------------+------------+------------+------------+
|       1 | Apple iPhone X |    6399.00 |       1000 |      33445 |
|       2 | Apple Watch 3  |    4299.00 |        800 |       2235 |
|       3 | Apple iPad 8   |    3999.00 |        620 |       4883 |
|       4 | iPad 5         |    4688.00 |        100 |       4330 |
+---------+----------------+------------+------------+------------+
4 rows in set (0.00 sec)

Create table goods with the same DDL in report:

1
2
3
4
5
6
7
8
9
10
MariaDB [(none)]> create database report;
MariaDB [(none)]> use report;
MariaDB [(report)]> CREATE TABLE `goods` (
  `goodsId` int(11) NOT NULL DEFAULT '0',
  `goodsName` varchar(64) DEFAULT NULL,
  `goodsPrice` decimal(6,2) DEFAULT NULL,
  `goodsStock` int(11) DEFAULT NULL,
  `goodsViews` int(11) DEFAULT NULL,
  PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Start flink cluster first:

1
2
3
4
5
6
[root@hadoop001 ~]# start-cluster.sh
[root@hadoop001 ~]# jps
7825 QuorumPeerMain
26312 StandaloneSessionClusterEntrypoint
27145 Jps
26605 TaskManagerRunner

Define source and sink

Login to Flink SQL client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
[root@hadoop001 ~]# sql-client.sh embedded
No default environment specified.
Searching for '/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml
No session environment specified.

Command history file path: /root/.flink-sql-history
                                   ????????
                               ????????????????
                            ???????        ???????  ?
                          ????   ?????????      ?????
                          ???         ???????    ?????
                            ???            ???   ?????
                              ??       ???????????????
                            ?? ?   ???       ?????? ?????
                            ?????   ????      ????? ?????
                         ???????       ???    ??????? ???
                   ????????? ??         ??    ??????????
                  ????????  ??           ?   ?? ???????
                ????  ???            ?  ?? ???????? ?????
               ???? ? ??          ? ?? ????????    ????  ??
              ???? ????          ??????????       ??? ?? ????
           ???? ?? ???       ???????????         ????  ? ?  ???
           ???  ?? ??? ?????????              ????           ???
           ??    ? ???????              ????????          ??? ??
           ???    ???    ????????????????????            ????  ?
          ????? ???   ??????   ????????                  ????  ??
          ????????  ???????????????                            ??
          ?? ????   ???????  ???       ??????    ??          ???
          ??? ???  ???  ???????            ????   ?????????????
           ??? ?????  ????  ??                ??      ????   ???
           ??   ???   ?     ??                ??              ??
            ??   ??         ??                 ??        ????????
             ?? ?????       ??                  ???????????    ??
              ??   ????      ?                    ???????      ??
               ???   ?????                         ?? ???????????
                ????    ????                     ??????? ????????
                  ?????                          ??  ????  ?????
                      ?????????????????????????????????  ?????

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|

        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL>

Define source table and sink table, make sure you already set proper permission to user flink:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Flink SQL> create table `goods_input`(
goodsId INT,
goodsName VARCHAR,
goodsPrice DECIMAL,
goodsStock INT,
goodsViews INT,
PRIMARY KEY(goodsId) NOT ENFORCED
)WITH(
'connector' = 'mysql-cdc',
'hostname' = 'hadoop001',
'port' = '3306',
'username' = 'flink',
'password' = '*****',
'database-name' ='mall',
'table-name' = 'goods'
);

Flink SQL> create table `goods_report`(
goodsId INT,
goodsName VARCHAR,
goodsPrice DECIMAL,
goodsStock INT,
goodsViews INT,
PRIMARY KEY(goodsId) NOT ENFORCED
)WITH(
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop001:3306/report?useSSL=false',
'username' = 'flink',
'password' = '*****',
'table-name' = 'goods'
);

Execute the CDC task

1
2
3
4
5
Flink SQL> insert into `goods_report` select * from `goods_input`;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 88d9950004962513b4942ae4606cee86

Open the flink web ui, we can see the job is running: Flink CDC Job

Check the result

1
2
3
4
5
6
7
8
9
10
MariaDB [(none)]> select * from report.goods;
+---------+-----------------+------------+------------+------------+
| goodsId | goodsName       | goodsPrice | goodsStock | goodsViews |
+---------+-----------------+------------+------------+------------+
|       1 | Apple iPhone X  |    6399.00 |       1000 |      33445 |
|       2 | Apple Watch 3   |    4299.00 |        800 |       2235 |
|       3 | Apple iPad 8    |    3999.00 |        620 |       4883 |
|       4 | iPad 5          |    4688.00 |        100 |       4330 |
+---------+-----------------+------------+------------+------------+
4 rows in set (0.00 sec)

Let’s insert a record to the source table, and check the dest table:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
MariaDB [(none)]> insert into mall.goods values(5, "Apple Charger", 128.00, 2000, 262);
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> select * from report.goods;
+---------+-----------------+------------+------------+------------+
| goodsId | goodsName       | goodsPrice | goodsStock | goodsViews |
+---------+-----------------+------------+------------+------------+
|       1 | Apple iPhone X  |    6399.00 |       1000 |      33445 |
|       2 | Apple Watch 3   |    4299.00 |        800 |       2235 |
|       3 | Apple iPad 8    |    3999.00 |        620 |       4883 |
|       4 | iPad 5          |    4688.00 |        100 |       4330 |
|       5 | Apple Charger   |     128.00 |       2000 |        262 |
+---------+-----------------+------------+------------+------------+
5 rows in set (0.00 sec)

Everything works well.