Overview
Flink CEP is the Complex Event Processing library implemented on top of Flink, 
which allows you to detect event patterns in event stream.
Pattern Types
1
2
3
4
  |   start.times(3);
  start.times(3).optional();
  start.times(2,4).greedy();
  start.timesOrMore(2);
  | 
 
1
2
3
4
5
6
  |   start.next("next").where(...); // strict contiguity
  start.followed("next").where(...); // relaxed contiguity
  start.followedByAny("middle").where(...); // non-deterministic relaxed contiguity
  start.notNext("not").where(...);// NOT pattern with strict contiguity
  start.notFollowedBy("not").where(...); // NOT pattern with relaxed continguity
 | 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  | Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);
// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);
// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();
// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional(); 
 | 
 
Practice
Set up a maven project
Add flink-cep and other flink libraries in pom.xml, the full project code can be downloaded at the end of this article:
1
2
3
4
5
  | <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep</artifactId>
    <version>1.15.0</version>
</dependency>
 | 
 
Requirement
We want to find users whose login failed match specified patterns, such as failed 2 times in 3 seconds, failed 3 or more than 3 times in 10 seconds.
the login event sample data as follows, the columns are user id, ip address, login status, and timestamp of the event:
1
2
3
4
5
  | 7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,fail,1558430846
  | 
 
Task 1 – find users whose login failed 2 times in 3 seconds
1. Define a case class to map the event:
1
  | case class LoginEvent(userId: Long, ip: String, event: String, ts: Long)
  | 
 
2. Define source
Fead the log data from a file as experiment, in production environment, it probably came from a real-time pipeline like Kafka.
And then parse each line of the stream into LoginEvent:
1
2
3
4
5
6
7
8
9
  |     val eventStream = env.readTextFile("./data/login.log")
      .map(line => {
        val fields=line.split(",");
        LoginEvent(
          fields(0).trim.toLong,
          fields(1).trim,
          fields(2).trim,
          fields(3).trim.toLong)
      })
 | 
 
3. Define event timestamp and watermark
Define the event timestamp and watermark, and then group the stream by userId.
1
2
3
  |     eventStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
        override def extractTimestamp(t: LoginEvent): Long = t.ts * 1000L
      }).keyBy(_.userId);
 | 
 
4. Define the login fail pattern
1
2
3
4
  |     val failPattern = Pattern
      .begin[LoginEvent]("begin").where(_.event == "fail")
      .next("next").where(_.event == "fail")
      .within(Time.seconds(3))
 | 
 
5. Apply pattern to data stream
Apply pattern to data stream, and define handler function when find matched failed pattern, we just print the userId and time when failed.
1
2
3
4
5
6
7
8
9
10
  |     CEP.pattern(eventStream, failPattern)
      .select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        val firstFail = map.get("begin").iterator().next()
        val lastFail = map.get("next").iterator().next()
        "user " + firstFail.userId.toString +
          " login failed at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(firstFail.ts * 1000)) +
          " and " + (new SimpleDateFormat("HH:mm:ss")).format(new Date(lastFail.ts * 1000))
      }
    }).print()
 | 
 
6. Run the code
1
  |     env.execute("Login Fail in 3 seconds CEP")
 | 
 
Run the application, we get the result:
1
2
3
4
5
  | user 8455 login failed at 2019-05-21 17:27:45 and 17:27:47
user 8455 login failed at 2019-05-21 17:27:47 and 17:27:49
user 1035 login failed at 2019-05-21 17:27:22 and 17:27:23
user 1035 login failed at 2019-05-21 17:27:23 and 17:27:24
user 1035 login failed at 2019-05-21 17:27:24 and 17:27:26
  | 
 
Task 2 – find users whose login failed 3 or more than 3 times in 10 seconds
2.1 Change the pattern
Copy the code to another Object file, change the pattern to:
1
2
3
4
5
  |     // pattern
    val failPattern = Pattern
      .begin[LoginEvent]("begin").where(_.event == "fail")
      .timesOrMore(3)
      .within(Time.seconds(10))
 | 
 
2.2 Change the PatternSelectFunction
1
2
3
4
5
6
7
8
9
10
11
  |     patternStream.select(new PatternSelectFunction[LoginEvent, String] {
      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
        var iterator = map.get("begin").iterator()
        var res = ""
        while (iterator.hasNext) {
          var fail = iterator.next()
          res += fail.userId.toString + " login fails at: " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(fail.ts * 1000)) +"\n"
        }
        res + "=============="
      }
    }).print
 | 
 
2.3 Run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  | 8455 login fails at: 2019-05-21 17:27:45
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
==============
8455 login fails at: 2019-05-21 17:27:45
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
8455 login fails at: 2019-05-21 17:27:52
==============
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
8455 login fails at: 2019-05-21 17:27:52
==============
1035 login fails at: 2019-05-21 17:27:22
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
==============
1035 login fails at: 2019-05-21 17:27:22
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
1035 login fails at: 2019-05-21 17:27:26
==============
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
1035 login fails at: 2019-05-21 17:27:26
==============
  | 
 
Download
Project code download:
Flink CEP
Reference: Flink CEP https://www.jianshu.com/p/4a6bfdf8fe63