Overview
In a data warehouse we periodically, say once a day, need to pull the full contents of some tables from MySQL into HBase. In this article, we build a Flink Java project to do this full-pull work.
In this experiment we pull a table named goods, which has the following columns:
MariaDB [mall]> describe goods;
+------------+--------------+------+-----+---------+-------+
| Field      | Type         | Null | Key | Default | Extra |
+------------+--------------+------+-----+---------+-------+
| goodsId    | int(11)      | YES  |     | NULL    |       |
| goodsName  | varchar(64)  | YES  |     | NULL    |       |
| goodsPrice | decimal(6,2) | YES  |     | NULL    |       |
| goodsStock | int(11)      | YES  |     | NULL    |       |
| goodsViews | int(11)      | YES  |     | NULL    |       |
+------------+--------------+------+-----+---------+-------+
Create a Maven project
Add the following dependencies to pom.xml (the full file can be found at the end of this article); a sample fragment showing how they are declared follows the list.
- flink-java
- flink-streaming-java_2.11
- flink-scala_2.11
- flink-runtime-web_2.11
- flink-cep-scala_2.11
- flink-streaming-scala_2.11
- flink-connector-kafka_2.11
- flink-connector-redis_2.11
- flink-hadoop-compatibility_2.11
- flink-hbase_2.11
- protobuf-java
- flink-json
- flink-jdbc_2.11
- flink-table_2.11
- flink-shaded-hadoop2
- mysql-connector-java
- fastjson
- lombok
- canal.client
- slf4j-log4j12
- log4j
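For illustration, here is how two of these map into pom.xml. This is a sketch only: the Flink version (1.8.1 below) and the _2.11 Scala suffix are assumptions, so match them to your cluster.

<properties>
  <!-- assumption: use the Flink version that matches your cluster -->
  <flink.version>1.8.1</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>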
Program
Define the Config class
import java.io.Serializable;

public class Config implements Serializable {
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String DB_URL = "jdbc:mysql://your-mysql-host:3306/mall?useUnicode=true&characterEncoding=utf8";
    public static final String USERNAME = "*****"; // use your username
    public static final String PASSWORD = "*****"; // use your password
    public static final int BATCH_SIZE = 2;        // not used in the snippets shown here
}
Replace the DB_URL/USERNAME/PASSWORD values with yours.
Define connector and source
Define a JDBC input format and create a data source from it. ROW_TYPE_INFO describes the field types of the rows returned by the query:
// Field types of the goods table (see `describe goods` above)
RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,     // goodsId
        BasicTypeInfo.STRING_TYPE_INFO,  // goodsName
        BasicTypeInfo.BIG_DEC_TYPE_INFO, // goodsPrice
        BasicTypeInfo.INT_TYPE_INFO,     // goodsStock
        BasicTypeInfo.INT_TYPE_INFO);    // goodsViews
JDBCInputFormat.JDBCInputFormatBuilder jdbc = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(Config.DRIVER_CLASS)
        .setDBUrl(Config.DB_URL)
        .setUsername(Config.USERNAME)
        .setPassword(Config.PASSWORD)
        .setQuery("select * from mall.goods")
        .setRowTypeInfo(ROW_TYPE_INFO);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Row> source = env.createInput(jdbc.finish());
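To confirm the source actually reads rows before wiring up the sink, a quick check can help. A minimal sketch; count() on a DataSet triggers a job execution of its own, so remove it before the real run:

// Optional sanity check: count() executes a separate job, remove before the real run
System.out.println("rows read from mall.goods: " + source.count());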
Create an HBase job from the configuration
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop001");          // your ZooKeeper quorum
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase");
conf.set(TableOutputFormat.OUTPUT_TABLE, "mall:goods");   // target HBase table
conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp");
Job job = Job.getInstance(conf);
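Optionally, you can fail fast when the target table is missing instead of failing deep inside the job. A sketch using the standard HBase client API (Connection, ConnectionFactory, Admin, and TableName from the org.apache.hadoop.hbase packages, which the HBase client dependencies pull in):

// Optional: verify that mall:goods exists before submitting the Flink job
try (Connection hbaseConn = ConnectionFactory.createConnection(conf);
     Admin admin = hbaseConn.getAdmin()) {
    if (!admin.tableExists(TableName.valueOf("mall:goods"))) {
        throw new IllegalStateException("HBase table mall:goods does not exist");
    }
}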
Put a record into HBase for every row
Define a function that converts each MySQL row into an HBase mutation:
public static DataSet<Tuple2<Text, Mutation>> convertMysqlToHBase(DataSet<Row> ds) {
    return ds.map(new RichMapFunction<Row, Tuple2<Text, Mutation>>() {
        private transient Tuple2<Text, Mutation> resultTp;
        // target column family; created together with the table (see the Run section)
        private byte[] cf = "info".getBytes(ConfigConstants.DEFAULT_CHARSET);

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            resultTp = new Tuple2<>();
        }

        @Override
        public Tuple2<Text, Mutation> map(Row row) throws Exception {
            // goodsId serves as both the HBase row key and the tuple key
            resultTp.f0 = new Text(row.getField(0).toString());
            Put put = new Put(row.getField(0).toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
            // every column is nullable in MySQL, so guard each field before writing it
            if (row.getField(1) != null) {
                put.addColumn(cf, Bytes.toBytes("goodsName"), Bytes.toBytes(row.getField(1).toString()));
            }
            if (row.getField(2) != null) {
                put.addColumn(cf, Bytes.toBytes("goodsPrice"), Bytes.toBytes(row.getField(2).toString()));
            }
            if (row.getField(3) != null) {
                put.addColumn(cf, Bytes.toBytes("goodsStock"), Bytes.toBytes(row.getField(3).toString()));
            }
            if (row.getField(4) != null) {
                put.addColumn(cf, Bytes.toBytes("goodsViews"), Bytes.toBytes(row.getField(4).toString()));
            }
            resultTp.f1 = put;
            return resultTp;
        }
    });
}
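Two details are worth noting. Only the Tuple2 wrapper is reused across map() calls; a fresh Text key and Put are created for every row, so the sink never sees a mutated object. And all values are written as UTF-8 strings rather than binary-encoded numbers, which is why they appear as readable text in the scan output below.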
Start the job
DataSet<Tuple2<Text, Mutation>> hbaseResult = convertMysqlToHBase(source);
hbaseResult.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
env.execute("Full Pull Mysql to HBase");
Run
Before running the application, make sure the namespace "mall" and the table "goods" have been created in HBase:
hbase(main):008:0* create_namespace 'mall'
0 row(s) in 0.8850 seconds
hbase(main):009:0> create 'mall:goods', 'info'
0 row(s) in 1.3090 seconds
Then run the application.
After the job finishes successfully, check the HBase table:
hbase(main):010:0> scan 'mall:goods'
ROW   COLUMN+CELL
 1    column=info:goodsName, timestamp=1658233279152, value=Apple iPhone X
 1    column=info:goodsPrice, timestamp=1658233279152, value=6399.00
 1    column=info:goodsStock, timestamp=1658233279152, value=10000
 1    column=info:goodsViews, timestamp=1658233279152, value=34567
 2    column=info:goodsName, timestamp=1658233279152, value=Vivo iQ100
 2    column=info:goodsPrice, timestamp=1658233279152, value=3599.00
 2    column=info:goodsStock, timestamp=1658233279152, value=20000
 2    column=info:goodsViews, timestamp=1658233279152, value=3433
 3    column=info:goodsName, timestamp=1658233279152, value=Apple Watch 3
 3    column=info:goodsPrice, timestamp=1658233279152, value=2598.00
 3    column=info:goodsStock, timestamp=1658233279152, value=2000
 3    column=info:goodsViews, timestamp=1658233279152, value=34221
 4    column=info:goodsName, timestamp=1658233279152, value=iPad 5
 4    column=info:goodsPrice, timestamp=1658233279152, value=3988.00
 4    column=info:goodsStock, timestamp=1658233279152, value=10000
 4    column=info:goodsViews, timestamp=1658233279152, value=12293
4 row(s) in 0.0410 seconds
Download
Project code download:
Pull Data From MySQL to HBase