Flink CEP Practice

Posted by Bourne's Blog - A Full-stack & Web3 Developer on July 25, 2022

Overview

Flink CEP is the Complex Event Processing library implemented on top of Flink. It allows you to detect event patterns in a stream of events.

Pattern Types

  • Individual Patterns
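  start.times(3); // exactly 3 occurrences
  start.times(3).optional(); // 0 or 3 occurrences
  start.times(2,4).greedy(); // 2, 3 or 4 occurrences, repeating as many times as possible
  start.timesOrMore(2); // 2 or more occurrences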
  • Combining Patterns
  start.next("next").where(...); // strict contiguity
  start.followedBy("next").where(...); // relaxed contiguity
  start.followedByAny("middle").where(...); // non-deterministic relaxed contiguity
  start.notNext("not").where(...); // NOT pattern with strict contiguity
  start.notFollowedBy("not").where(...); // NOT pattern with relaxed contiguity

  • Groups of patterns
Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional(); 
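
The three contiguity modes are easiest to tell apart on a tiny input. The following is a minimal plain-Java sketch, not Flink code: it assumes a two-step pattern "an a, then a b", treats an event as an a or a b by its first letter (a convention made up for this illustration), and mimics what next, followedBy and followedByAny would accept on the sequence a, c, b1, b2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of the contiguity modes for a two-step pattern "a then b". */
final class ContiguityDemo {

    // Illustrative convention: an event matches a step if its name starts with that letter.
    static boolean isA(String e) { return e.startsWith("a"); }
    static boolean isB(String e) { return e.startsWith("b"); }

    /** Like next(): b must be the event immediately after a. */
    static List<String> strict(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (isA(events.get(i)) && isB(events.get(i + 1))) {
                matches.add(events.get(i) + " " + events.get(i + 1));
            }
        }
        return matches;
    }

    /** Like followedBy(): non-matching events are skipped, the first later b is taken. */
    static List<String> relaxed(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                    break; // stop at the first matching b
                }
            }
        }
        return matches;
    }

    /** Like followedByAny(): every later b forms its own match with a. */
    static List<String> relaxedAny(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b1", "b2");
        System.out.println("next:          " + strict(events));
        System.out.println("followedBy:    " + relaxed(events));
        System.out.println("followedByAny: " + relaxedAny(events));
    }
}
```

On this input the strict variant finds nothing because c sits between a and b1, the relaxed variant finds only "a b1", and the non-deterministic variant finds both "a b1" and "a b2".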

Practice

Set up a maven project

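Add flink-cep and the other Flink libraries to pom.xml (the full project code can be downloaded at the end of this article). The Scala code below uses the Scala CEP API, which ships as a separate artifact, flink-cep-scala, with a Scala version suffix such as _2.12: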

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep</artifactId>
    <version>1.15.0</version>
</dependency>

Requirement

We want to find users whose failed logins match given patterns, for example 2 failures within 3 seconds, or 3 or more failures within 10 seconds.

The login event sample data looks as follows. The columns are user id, IP address, login status, and event timestamp (in epoch seconds):

7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,fail,1558430846
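
To make the record layout concrete, here is a small plain-Java sketch that parses one sample line and converts its timestamp. The LoginEvent class and its field names are hypothetical stand-ins for this illustration. The code assumes exactly four comma-separated columns and a timestamp in epoch seconds.

```java
import java.time.Instant;

/** Plain-Java sketch: parse one login log line and interpret its timestamp. */
final class LoginEventParseDemo {

    /** Hypothetical holder for one record; field names are illustrative. */
    static final class LoginEvent {
        final long userId;
        final String ip;
        final String status;
        final long epochSeconds;

        LoginEvent(long userId, String ip, String status, long epochSeconds) {
            this.userId = userId;
            this.ip = ip;
            this.status = status;
            this.epochSeconds = epochSeconds;
        }

        /** Event time in milliseconds, the unit Flink uses for event timestamps. */
        long epochMillis() {
            return epochSeconds * 1000L;
        }
    }

    /** Splits "userId,ip,status,timestamp" and rejects lines with a different column count. */
    static LoginEvent parse(String line) {
        String[] f = line.split(",");
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 columns: " + line);
        }
        return new LoginEvent(
                Long.parseLong(f[0].trim()),
                f[1].trim(),
                f[2].trim(),
                Long.parseLong(f[3].trim()));
    }

    public static void main(String[] args) {
        LoginEvent e = parse("1035,83.149.9.216,fail,1558430842");
        System.out.println(e.userId + " " + e.status + " " + e.epochMillis());
        // Instant prints in UTC; a local formatter may show a different hour.
        System.out.println(Instant.ofEpochSecond(e.epochSeconds));
    }
}
```

The timestamp 1558430842 is 2019-05-21T09:27:22Z in UTC. The results later in this article show the same event at 17:27:22, which suggests they were formatted in a UTC+8 time zone.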

Task 1 – find users whose login failed 2 times in 3 seconds

1. Define a case class to map the event:

case class LoginEvent(userId: Long, ip: String, event: String, ts: Long)

2. Define source

As an experiment, read the log data from a file; in a production environment it would more likely come from a real-time pipeline such as Kafka. Then parse each line of the stream into a LoginEvent:

    val eventStream = env.readTextFile("./data/login.log")
      .map(line => {
        val fields=line.split(",");
        LoginEvent(
          fields(0).trim.toLong,
          fields(1).trim,
          fields(2).trim,
          fields(3).trim.toLong)
      })

3. Define event timestamp and watermark

Define the event timestamp and watermark, and then group the stream by userId.

    // assignTimestampsAndWatermarks and keyBy return new streams, so keep the result.
    // ts is in epoch seconds, while Flink expects event time in milliseconds.
    val keyedStream = eventStream
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
        override def extractTimestamp(t: LoginEvent): Long = t.ts * 1000L
      })
      .keyBy(_.userId)

4. Define the login fail pattern

    // Two consecutive "fail" events (strict contiguity) within 3 seconds
    val failPattern = Pattern
      .begin[LoginEvent]("begin").where(_.event == "fail")
      .next("next").where(_.event == "fail")
      .within(Time.seconds(3))

5. Apply pattern to data stream

Apply the pattern to the keyed stream and define a handler that is called for each match. Here it just returns the userId and the times of the two failures, which are then printed.

    // Requires: import java.util, import java.util.Date, import java.text.SimpleDateFormat
    CEP.pattern(keyedStream, failPattern)
      .select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        val firstFail = map.get("begin").iterator().next()
        val lastFail = map.get("next").iterator().next()
        "user " + firstFail.userId.toString +
          " login failed at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(firstFail.ts * 1000)) +
          " and " + (new SimpleDateFormat("HH:mm:ss")).format(new Date(lastFail.ts * 1000))
      }
    }).print()

6. Run the code

    env.execute("Login Fail in 3 seconds CEP")

Running the application produces the following result:

user 8455 login failed at 2019-05-21 17:27:45 and 17:27:47
user 8455 login failed at 2019-05-21 17:27:47 and 17:27:49
user 1035 login failed at 2019-05-21 17:27:22 and 17:27:23
user 1035 login failed at 2019-05-21 17:27:23 and 17:27:24
user 1035 login failed at 2019-05-21 17:27:24 and 17:27:26
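
The three matches for user 1035 can be checked by hand. The sketch below is plain Java, not Flink's matching engine. It assumes the events of a single user are already in event-time order, and it uses epoch-second timestamps derived from the printed times (17:27:22 corresponds to 1558430842 in the sample data). The Login class is a hypothetical stand-in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java walk-through of "fail, next fail, within N seconds" for one user's events. */
final class ConsecutiveFailDemo {

    /** Minimal stand-in for one login record of a single user (hypothetical type). */
    static final class Login {
        final String status;
        final long ts; // epoch seconds

        Login(String status, long ts) {
            this.status = status;
            this.ts = ts;
        }
    }

    /** Returns "first and second" timestamp pairs for adjacent failures less than windowSeconds apart. */
    static List<String> match(List<Login> events, long windowSeconds) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            Login first = events.get(i);
            Login second = events.get(i + 1); // strict contiguity: only the very next event counts
            boolean bothFail = first.status.equals("fail") && second.status.equals("fail");
            boolean inWindow = second.ts - first.ts < windowSeconds;
            if (bothFail && inWindow) {
                matches.add(first.ts + " and " + second.ts);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Failures of user 1035 at 17:27:22, :23, :24 and :26, as epoch seconds.
        List<Login> user1035 = Arrays.asList(
                new Login("fail", 1558430842L),
                new Login("fail", 1558430843L),
                new Login("fail", 1558430844L),
                new Login("fail", 1558430846L));
        for (String m : match(user1035, 3)) {
            System.out.println("user 1035 login failed at " + m);
        }
    }
}
```

Four consecutive failures give three adjacent pairs, and each pair is at most 2 seconds apart, so all three fall inside the 3-second window.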

Task 2 – find users whose login failed 3 or more times in 10 seconds

2.1 Change the pattern

Copy the code into another object file and change the pattern to:

    // pattern
    val failPattern = Pattern
      .begin[LoginEvent]("begin").where(_.event == "fail")
      .timesOrMore(3)
      .within(Time.seconds(10))

2.2 Change the PatternSelectFunction

    // patternStream is the result of CEP.pattern(...) applied with the new failPattern
    patternStream.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        // "begin" now holds every event matched by timesOrMore(3), in order
        val iterator = map.get("begin").iterator()
        val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val res = new StringBuilder
        while (iterator.hasNext) {
          val fail = iterator.next()
          res.append(fail.userId).append(" login fails at: ").append(formatter.format(new Date(fail.ts * 1000))).append("\n")
        }
        res.append("==============").toString
      }
    }).print()

2.3 Run

8455 login fails at: 2019-05-21 17:27:45
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
==============
8455 login fails at: 2019-05-21 17:27:45
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
8455 login fails at: 2019-05-21 17:27:52
==============
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
8455 login fails at: 2019-05-21 17:27:52
==============
1035 login fails at: 2019-05-21 17:27:22
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
==============
1035 login fails at: 2019-05-21 17:27:22
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
1035 login fails at: 2019-05-21 17:27:26
==============
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
1035 login fails at: 2019-05-21 17:27:26
==============
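
Each user appears in three overlapping matches because timesOrMore(3) reports every run of at least 3 failures that fits inside the window, not only the longest one. The following plain-Java sketch reproduces that enumeration under simplifying assumptions: the input holds only the failure timestamps of one user in event-time order, other events are ignored, and a run counts when its last and first timestamps are less than the window apart. It is not how Flink's engine is implemented, and the timestamps are derived from the printed times for user 8455.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java enumeration of "at least minTimes failures within a window" for one user. */
final class TimesOrMoreDemo {

    /** Returns every contiguous run of at least minTimes timestamps spanning less than windowSeconds. */
    static List<List<Long>> match(List<Long> failTs, int minTimes, long windowSeconds) {
        List<List<Long>> matches = new ArrayList<>();
        for (int start = 0; start < failTs.size(); start++) {
            for (int end = start + minTimes - 1; end < failTs.size(); end++) {
                if (failTs.get(end) - failTs.get(start) >= windowSeconds) {
                    break; // later events are even further from the start
                }
                matches.add(new ArrayList<>(failTs.subList(start, end + 1)));
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Failures of user 8455 at 17:27:45, :47, :49 and :52, as epoch seconds.
        List<Long> user8455 = Arrays.asList(1558430865L, 1558430867L, 1558430869L, 1558430872L);
        for (List<Long> m : match(user8455, 3, 10)) {
            System.out.println(m);
        }
    }
}
```

For the four failures of user 8455 this yields the runs starting at :45 with three and with four events, plus the run starting at :47, which corresponds to the three groups printed above.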

Download

Project code download:
Flink CEP

Reference: Flink CEP https://www.jianshu.com/p/4a6bfdf8fe63