Overview
Flink CEP is the Complex Event Processing library implemented on top of Flink.
It lets you detect event patterns in an event stream.
Pattern Types
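Quantifiers control how many times a pattern must occur:
```java
start.times(3);               // exactly 3 occurrences
start.times(3).optional();    // 0 or 3 occurrences
start.times(2, 4).greedy();   // 2, 3 or 4 occurrences, repeating as many times as possible
start.timesOrMore(2);         // 2 or more occurrences
```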
Contiguity conditions control how consecutive patterns are combined:
```java
start.next("next").where(...);             // strict contiguity
start.followedBy("next").where(...);       // relaxed contiguity
start.followedByAny("middle").where(...);  // non-deterministic relaxed contiguity
start.notNext("not").where(...);           // NOT pattern with strict contiguity
start.notFollowedBy("not").where(...);     // NOT pattern with relaxed contiguity
```
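To make the difference between the three contiguity modes concrete, here is a minimal plain-Scala sketch that does not use Flink at all. It models the pattern "a then b" over a list of event names. The names `ContiguitySketch`, `strict`, `relaxed` and `relaxedAny` are made up for this illustration, and the logic is a simplification, not the way the CEP library is implemented.

```scala
object ContiguitySketch {
  private def isA(e: String): Boolean = e.startsWith("a")
  private def isB(e: String): Boolean = e.startsWith("b")

  // Like next(): "b" must be the event immediately after "a".
  def strict(events: Seq[String]): Seq[(String, String)] =
    events.zip(events.drop(1)).filter { case (x, y) => isA(x) && isB(y) }

  // Like followedBy(): skip non-matching events and take the first "b" after "a".
  def relaxed(events: Seq[String]): Seq[(String, String)] =
    events.zipWithIndex
      .collect { case (a, i) if isA(a) => (a, i) }
      .flatMap { case (a, i) => events.drop(i + 1).find(isB).map(b => (a, b)) }

  // Like followedByAny(): pair "a" with every later "b".
  def relaxedAny(events: Seq[String]): Seq[(String, String)] =
    events.zipWithIndex
      .collect { case (a, i) if isA(a) => (a, i) }
      .flatMap { case (a, i) => events.drop(i + 1).filter(isB).map(b => (a, b)) }
}
```

For the sequence a, c, b1, b2, `strict` finds nothing, `relaxed` finds (a, b1), and `relaxedAny` finds both (a, b1) and (a, b2).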
Patterns can also be grouped and used as the argument of begin, next, followedBy and followedByAny:
```java
Pattern<Event, ?> start = Pattern.begin(
    Pattern.<Event>begin("start").where(...).followedBy("start_middle").where(...)
);

// strict contiguity
Pattern<Event, ?> strict = start.next(
    Pattern.<Event>begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);

// relaxed contiguity
Pattern<Event, ?> relaxed = start.followedBy(
    Pattern.<Event>begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();

// non-deterministic relaxed contiguity
Pattern<Event, ?> nonDetermin = start.followedByAny(
    Pattern.<Event>begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();
```
Practice
Set up a Maven project
Add flink-cep and the other Flink libraries to pom.xml (the full project code can be downloaded at the end of this article):
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep</artifactId>
    <version>1.15.0</version>
</dependency>
```
Requirement
We want to find users whose failed logins match a specified pattern, such as 2 failures within 3 seconds, or 3 or more failures within 10 seconds.
The sample login event data looks as follows. The columns are user id, IP address, login status, and event timestamp (Unix time in seconds):
```
7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
29607,66.249.73.135,success,1558430841
1035,83.149.9.216,fail,1558430842
1035,83.149.9.216,fail,1558430846
```
Task 1 – find users whose login failed 2 times in 3 seconds
1. Define a case class to map the event:
```scala
case class LoginEvent(userId: Long, ip: String, event: String, ts: Long)
```
2. Define the source
For this experiment we read the log data from a file; in a production environment it would more likely come from a real-time pipeline such as Kafka.
Then parse each line of the stream into a LoginEvent:
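```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Read the raw log lines and map each one to a LoginEvent
val eventStream = env.readTextFile("./data/login.log")
  .map { line =>
    val fields = line.split(",")
    LoginEvent(
      fields(0).trim.toLong, // user id
      fields(1).trim,        // IP address
      fields(2).trim,        // login status
      fields(3).trim.toLong) // event timestamp in seconds
  }
```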
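The mapping above assumes that every line has four well-formed fields and throws on anything else. If the input may contain malformed lines, a more defensive parser is one option. The following is a minimal sketch in plain Scala. `LoginEventParser` and `parse` are hypothetical names introduced here, not part of the original project.

```scala
import scala.util.Try

case class LoginEvent(userId: Long, ip: String, event: String, ts: Long)

object LoginEventParser {
  // Returns None instead of throwing when a line has the wrong number
  // of fields or a non-numeric user id / timestamp.
  def parse(line: String): Option[LoginEvent] =
    line.split(",").map(_.trim) match {
      case Array(userId, ip, event, ts) =>
        Try(LoginEvent(userId.toLong, ip, event, ts.toLong)).toOption
      case _ => None
    }
}
```

In a Flink job, such a function could be used with `flatMap` so that unparseable lines are dropped rather than failing the job. Whether dropping them silently is acceptable depends on the use case.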
3. Define event timestamp and watermark
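Assign the event timestamp and watermark, then group the stream by userId. The result is kept in keyedStream, because this timestamped, keyed stream is the one the pattern has to be applied to.
```scala
// Allow events to arrive up to 5 seconds out of order; ts is in seconds, Flink expects milliseconds.
val keyedStream = eventStream
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
    override def extractTimestamp(t: LoginEvent): Long = t.ts * 1000L
  })
  .keyBy(_.userId)
```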
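To get a feel for what a 5-second out-of-orderness bound means, the following plain-Scala sketch models the bookkeeping. The watermark trails the largest timestamp seen so far by the configured bound. `BoundedWatermarkSketch` is a made-up name and this is a simplified model for illustration, not Flink's actual class.

```scala
// Simplified model: the watermark is the highest event timestamp seen so far
// minus the allowed out-of-orderness, so it never moves backwards.
final class BoundedWatermarkSketch(maxOutOfOrdernessMs: Long) {
  private var maxSeenMs: Long = Long.MinValue + maxOutOfOrdernessMs

  def observe(eventTimestampMs: Long): Unit =
    maxSeenMs = math.max(maxSeenMs, eventTimestampMs)

  def current: Long = maxSeenMs - maxOutOfOrdernessMs
}
```

With a bound of 5000 ms, an event stamped 1558430842000 moves the watermark to 1558430837000. An event that arrives afterwards with an older timestamp leaves the watermark unchanged.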
4. Define the login fail pattern
```scala
val failPattern = Pattern
  .begin[LoginEvent]("begin").where(_.event == "fail")
  .next("next").where(_.event == "fail")
  .within(Time.seconds(3))
```
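As a sanity check on what this pattern should match, here is a plain-Scala approximation that works on one user's events in timestamp order. It is only a sketch. `ConsecutiveFailSketch` is a hypothetical name, and it assumes the time window is exclusive (the gap must be strictly less than 3 seconds), which is consistent with the results shown later in this article.

```scala
case class LoginEvent(userId: Long, ip: String, event: String, ts: Long)

object ConsecutiveFailSketch {
  // For one user's events in timestamp order, return every pair of adjacent
  // events that are both failures and less than `withinSeconds` apart.
  def matches(events: Seq[LoginEvent], withinSeconds: Long): Seq[(LoginEvent, LoginEvent)] =
    events.zip(events.drop(1)).filter { case (first, second) =>
      first.event == "fail" && second.event == "fail" &&
        second.ts - first.ts < withinSeconds
    }
}
```

The two sample failures of user 1035 at 1558430842 and 1558430846 are 4 seconds apart, so on their own they would not produce a match.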
5. Apply pattern to data stream
Apply the pattern to the data stream and define a handler function for each match. Here we just print the userId and the times of the two failed logins.
```scala
import java.text.SimpleDateFormat
import java.util
import java.util.Date
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP

// keyedStream is the timestamped stream keyed by userId from step 3
CEP.pattern(keyedStream, failPattern)
  .select(new PatternSelectFunction[LoginEvent, String] {
    override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
      val firstFail = map.get("begin").iterator().next()
      val lastFail = map.get("next").iterator().next()
      "user " + firstFail.userId.toString +
        " login failed at " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(firstFail.ts * 1000)) +
        " and " + new SimpleDateFormat("HH:mm:ss").format(new Date(lastFail.ts * 1000))
    }
  }).print()
```
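One thing to keep in mind is that SimpleDateFormat formats in the JVM's default time zone, so the printed times depend on the machine that runs the job. A possible alternative is to make the zone explicit with java.time, as in this small sketch. `LoginTimeFormat` is a hypothetical helper, and the choice of zone is up to you. The sample timestamps and the output shown in this article are consistent with a UTC+8 zone, which is an inference from the data rather than something the article states.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object LoginTimeFormat {
  private val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Format a Unix timestamp (in seconds) in an explicitly chosen time zone.
  def format(epochSeconds: Long, zone: ZoneId): String =
    pattern.format(Instant.ofEpochSecond(epochSeconds).atZone(zone))
}
```

For example, `LoginTimeFormat.format(1558430842L, ZoneId.of("Asia/Shanghai"))` gives `2019-05-21 17:27:22`, while the same timestamp in UTC is `2019-05-21 09:27:22`.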
6. Run the code
```scala
env.execute("Login Fail in 3 seconds CEP")
```
Running the application gives the following result:
```
user 8455 login failed at 2019-05-21 17:27:45 and 17:27:47
user 8455 login failed at 2019-05-21 17:27:47 and 17:27:49
user 1035 login failed at 2019-05-21 17:27:22 and 17:27:23
user 1035 login failed at 2019-05-21 17:27:23 and 17:27:24
user 1035 login failed at 2019-05-21 17:27:24 and 17:27:26
```
Task 2 – find users whose login failed 3 or more times in 10 seconds
2.1 Change the pattern
Copy the code into another object and change the pattern to:
```scala
// pattern
val failPattern = Pattern
  .begin[LoginEvent]("begin").where(_.event == "fail")
  .timesOrMore(3)
  .within(Time.seconds(10))
```
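To see which groups of failures this pattern is expected to report, the following plain-Scala sketch enumerates them for one user. It is an approximation for illustration only, not how the library computes matches. `RepeatedFailSketch` is a made-up name. It assumes that non-failure events between failures are skipped (looping patterns use relaxed contiguity by default) and that the 10-second window is exclusive.

```scala
case class LoginEvent(userId: Long, ip: String, event: String, ts: Long)

object RepeatedFailSketch {
  // For one user's events in timestamp order, return every run of at least
  // `minTimes` successive failures whose first and last event are less than
  // `withinSeconds` apart.
  def matches(events: Seq[LoginEvent], minTimes: Int, withinSeconds: Long): Seq[Seq[LoginEvent]] = {
    val fails = events.filter(_.event == "fail").toIndexedSeq
    for {
      start <- fails.indices
      end <- (start + minTimes - 1) until fails.length
      if fails(end).ts - fails(start).ts < withinSeconds
    } yield fails.slice(start, end + 1)
  }
}
```

Four failures at seconds 45, 47, 49 and 52 of the same minute yield three groups: the first three, all four, and the last three.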
2.2 Change the PatternSelectFunction
```scala
// patternStream is assumed to be the result of CEP.pattern(...) with the new failPattern
patternStream.select(new PatternSelectFunction[LoginEvent, String] {
  override def select(map: util.Map[String, util.List[LoginEvent]]): String = {
    // All failures matched by the looping "begin" pattern
    val iterator = map.get("begin").iterator()
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    var res = ""
    while (iterator.hasNext) {
      val fail = iterator.next()
      res += fail.userId.toString + " login fails at: " + formatter.format(new Date(fail.ts * 1000)) + "\n"
    }
    res + "=============="
  }
}).print()
```
2.3 Run
```
8455 login fails at: 2019-05-21 17:27:45
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
==============
8455 login fails at: 2019-05-21 17:27:45
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
8455 login fails at: 2019-05-21 17:27:52
==============
8455 login fails at: 2019-05-21 17:27:47
8455 login fails at: 2019-05-21 17:27:49
8455 login fails at: 2019-05-21 17:27:52
==============
1035 login fails at: 2019-05-21 17:27:22
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
==============
1035 login fails at: 2019-05-21 17:27:22
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
1035 login fails at: 2019-05-21 17:27:26
==============
1035 login fails at: 2019-05-21 17:27:23
1035 login fails at: 2019-05-21 17:27:24
1035 login fails at: 2019-05-21 17:27:26
==============
```
Download
Project code download: Flink CEP
Reference: Flink CEP https://www.jianshu.com/p/4a6bfdf8fe63