Flink Realtime Word Counting

Posted by Bourne's Blog - A Full-stack & Web3 Developer on July 2, 2022

Word Count

Create a Maven project and add the org.apache.flink dependencies to pom.xml.
The full file can be downloaded at the end of this article.

Task 1

Create a package named “com.example”, then create a class named “WordCount” in that package.

package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from a socket on localhost:1234
        DataStreamSource<String> stream = env.socketTextStream("localhost", 1234);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result =
                stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split on spaces so that "I love flink" yields three words
                        String[] fields = line.split(" ");
                        for (String word: fields) {
                            // Emit (word, 1) for every word on the line
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                }).keyBy(0).sum(1); // group by field 0 (the word), keep a running sum of field 1 (the count)

        result.print();
        try {
            env.execute("wordcount ...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

This example opens a socket connection to localhost on port 1234 and updates the word counts as soon as it receives a message over that connection.
Open a terminal and run “nc -l 1234” to listen for an incoming connection, run the program, and then type some words in the “nc” window:

(base) ➜  nc -l 1234
I love flink

Once I hit the [Enter] key to send the message, I got the word counts in the IntelliJ IDEA run window (the number before “>” is the index of the parallel subtask that printed the record):

6> (love,1)
7> (flink,1)
3> (I,1)

Type another message in the nc window:

(base) ➜  nc -l 1234
I love flink
I love spark too

We get the following result in the run window; the counts for “I” and “love” are now 2:

5> (too,1)
6> (love,2)
3> (I,2)
1> (spark,1)
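
To see why the second message prints (love,2) and (I,2) rather than a fresh count of 1, it can help to model what keyBy followed by sum does: one running total is kept per key, and every incoming record emits the updated total for its key. The following is a minimal plain-Java sketch of that behaviour, not Flink code. The class RunningWordCount and its process method are hypothetical names used only for illustration, and the sketch is single-threaded, so its output follows the input order instead of the subtask order seen above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of flatMap -> keyBy(word) -> sum(count); no Flink involved.
class RunningWordCount {
    // One running total per key, similar in spirit to Flink's keyed state.
    private final Map<String, Integer> totals = new HashMap<>();

    // Splits a line into words and returns the updated (word,count) pair for each word.
    List<String> process(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            // Add 1 to the total stored for this word (starting from 1 if it is new).
            int updated = totals.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount counter = new RunningWordCount();
        System.out.println(counter.process("I love flink"));
        System.out.println(counter.process("I love spark too"));
    }
}
```

The second call reports 2 for “I” and “love” because their totals survive between messages, which is the same reason the Flink job prints (I,2) and (love,2).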

Task 2

Create a class named WordCount2 in the same package.
This time, the FlatMapFunction emits a WordAndOne class, whose fields can be referenced by name in the keyBy and sum operators.

public class WordCount2 {
    public static void main(String[] args) {
        //...
        SingleOutputStreamOperator<WordAndOne> result = stream.flatMap(new FlatMapFunction<String, WordAndOne>() {
            @Override
            public void flatMap(String s, Collector<WordAndOne> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word: words) {
                    collector.collect(new WordAndOne(word, 1));
                }
            }
        }).keyBy("word").sum("count");
        // ...
    }

    public static class WordAndOne{
        private String word;
        private Integer count;

        public WordAndOne(){}
        public WordAndOne(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public void setCount(Integer count) {
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public Integer getCount() {
            return count;
        }

        @Override
        public String toString() {
            return "WordAndOne{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

}

Similarly, when we type the following messages in the nc window after starting the program:

(base) ➜  ~ nc -l 1234
hello flink
hello spark
I love flink

We get the expected result:

7> WordAndOne{word='flink', count=1}
3> WordAndOne{word='hello', count=1}

1> WordAndOne{word='spark', count=1}
3> WordAndOne{word='hello', count=2}

3> WordAndOne{word='I', count=1}
7> WordAndOne{word='flink', count=2}
6> WordAndOne{word='love', count=1}
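
Referring to fields by name, as in keyBy("word") and sum("count"), depends on Flink treating WordAndOne as a POJO, which is why the class has a public no-argument constructor and a getter and setter for each private field. As a rough plain-Java approximation of those rules, a reflection check could look like the sketch below. This is not Flink's actual type-extraction code; PojoShapeCheck, looksLikePojo, Counted and NotQuite are hypothetical names, and the check only inspects fields declared directly on the class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Approximate check of the POJO shape: public class, public no-arg constructor,
// and every instance field either public or reachable through a getter and setter.
class PojoShapeCheck {
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) return false;
        // A nested class must be static, otherwise it cannot be created on its own.
        if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers())) return false;
        try {
            type.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) continue;
            if (Modifier.isPublic(mods)) continue;
            if (!hasAccessors(type, f)) return false;
        }
        return true;
    }

    // Looks for public getX() and setX(fieldType) methods for the given field.
    private static boolean hasAccessors(Class<?> type, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, f.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Same shape as WordAndOne in this article.
    public static class Counted {
        private String word;
        private Integer count;
        public Counted() {}
        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public Integer getCount() { return count; }
        public void setCount(Integer count) { this.count = count; }
    }

    // No no-argument constructor and no setter.
    public static class NotQuite {
        private String word;
        public NotQuite(String word) { this.word = word; }
        public String getWord() { return word; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Counted.class));
        System.out.println(looksLikePojo(NotQuite.class));
    }
}
```

A class shaped like NotQuite fails the check, which hints at why removing the empty constructor or the setters from WordAndOne is likely to break the name-based keyBy and sum calls.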

Task 3

Create a new class WordCount3. This time, we move the main task logic into a class named StringSplitTask:

    public static class StringSplitTask implements FlatMapFunction<String, WordAndOne>{
        @Override
        public void flatMap(String s, Collector<WordAndOne> collector) throws Exception {
            String[] words = s.split(" ");
            for (String word: words) {
                collector.collect(new WordAndOne(word, 1));
            }
        }
    }

and the main program is simplified to a single statement:

SingleOutputStreamOperator<WordAndOne> result =
        stream.flatMap(new StringSplitTask()).keyBy("word").sum("count");
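
With the splitting logic in its own class, it becomes easier to refine it in one place. One refinement worth considering: split(" ") returns empty strings when a line contains consecutive or leading spaces, and each of those would be counted as a word. A minimal plain-Java sketch of a stricter tokenizer follows. Tokenizer and words are hypothetical names, and treating any run of whitespace as a separator is an assumption about the desired behaviour, not something the original program does.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a line on runs of whitespace and drops empty tokens.
class Tokenizer {
    static List<String> words(String line) {
        List<String> result = new ArrayList<>();
        // trim() removes leading and trailing whitespace; "\\s+" matches one or more whitespace characters
        for (String token : line.trim().split("\\s+")) {
            // An empty or blank line still yields one empty token, so filter it out
            if (!token.isEmpty()) {
                result.add(token);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two spaces between the words: the plain split keeps an empty string in the middle
        System.out.println("hello  flink".split(" ").length);
        System.out.println(words("  hello   flink "));
    }
}
```

The same loop body could replace the s.split(" ") call inside StringSplitTask, so that stray spaces typed in the nc window are not counted.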

Task 4

Change the stream source to a host and port passed as command-line parameters to make the program more flexible:

import org.apache.flink.api.java.utils.ParameterTool;

ParameterTool paras = ParameterTool.fromArgs(args);
DataStreamSource<String> stream = env.socketTextStream(paras.get("host"), paras.getInt("port"));
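
ParameterTool.fromArgs reads key/value pairs such as --host localhost --port 1234 from the program arguments. The sketch below is a simplified plain-Java stand-in for that parsing, written only to make the expected argument format concrete. ArgsSketch and parse are hypothetical names, and the sketch does not cover everything ParameterTool accepts.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for "--key value" argument parsing; not Flink's implementation.
class ArgsSketch {
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        // Walk the arguments in pairs; a trailing key without a value is ignored here
        for (int i = 0; i + 1 < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("-")) {
                throw new IllegalArgumentException("Expected a key starting with '-' but found: " + key);
            }
            // Strip one or two leading dashes so "--host" and "-host" both map to "host"
            params.put(key.replaceFirst("^--?", ""), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        String[] demo = {"--host", "localhost", "--port", "1234"};
        Map<String, String> params = parse(demo);
        String host = params.get("host");
        int port = Integer.parseInt(params.get("port"));
        System.out.println(host + ":" + port);
    }
}
```

In the real program, ParameterTool also offers get(key, defaultValue) and getInt(key, defaultValue), which can supply fallbacks such as localhost and 1234 when an argument is omitted.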

The code can be downloaded here:
pom.xml
WordCount
WordCount2
WordCount3
WordCount4