Flink Tasks: Top Hot Products/Page View

Posted by Bourne's Blog - A Full-stack & Web3 Developer on July 9, 2022

1. Top Hot Products

Continuously compute the top hot products from a user-behavior log. The log has the columns userId,productId,categoryId,behavior,timeStamp,sessionId, and the sample data looks like:

1021615,4355712,15138396,P,1601688552,913d5742-0fd8-46db-bb26-8cf0d09d90da
2178695,12498888,2595117,P,1601688552,3d29d1e2-d514-4991-bd79-9a866a10ad97
4306505,17843562,462120,P,1601688552,2dd496f4-7615-40f5-9d9e-bd5eab113519

1.1 Program

Create the directory src/main/scala and mark it as “Sources” on the [Project Structure > Modules] page.
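If you build with sbt, a minimal build.sbt along these lines pulls in the Flink Scala API. The version number is an assumption: the code below relies on pre-1.12 APIs such as timeWindow and AssignerWithPeriodicWatermarks, so a 1.9–1.11 release fits.

// build.sbt -- a sketch, not taken from the project; adjust versions to your setup
name := "flink-tasks"
scalaVersion := "2.12.15"

val flinkVersion = "1.11.6"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion
)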

Create a case class named “UserBehavior” to define the data model for each line:

package org.yukun

case class UserBehavior(
                         userId: Long,
                         productId: Long,
                         categoryId: Long,
                         behavior: String,
                         timeStamp: Long,
                         sessionId: String
                       )

Create an object named “Utils” to parse each line into a UserBehavior:

package org.yukun

object Utils {
  def string2UserBehavior(line:String):UserBehavior ={
    val fields = line.split(",")
    UserBehavior(
      fields(0).trim.toLong,
      fields(1).trim.toLong,
      fields(2).trim.toLong,
      fields(3).trim,
      fields(4).trim.toLong,
      fields(5).trim
    )
  }
}
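For example, the first sample line from above parses into:

// A quick check of the parser against the first sample line
val ub = Utils.string2UserBehavior(
  "1021615,4355712,15138396,P,1601688552,913d5742-0fd8-46db-bb26-8cf0d09d90da")
// ub == UserBehavior(1021615, 4355712, 15138396, "P", 1601688552,
//                    "913d5742-0fd8-46db-bb26-8cf0d09d90da")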

Create a class named “EventTimeExtractor” to assign event-time timestamps and generate watermarks:

package org.yukun

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class EventTimeExtractor extends AssignerWithPeriodicWatermarks[UserBehavior] {
  var currentMaxEventTime = 0L       // highest event time seen so far, in milliseconds
  val maxOutOfOrderness = 10 * 1000L // tolerate events up to 10 seconds out of order

  // currentMaxEventTime is already in milliseconds, so subtract the
  // out-of-orderness bound directly (no extra scaling).
  override def getCurrentWatermark: Watermark = {
    new Watermark(currentMaxEventTime - maxOutOfOrderness)
  }

  override def extractTimestamp(t: UserBehavior, l: Long): Long = {
    val timeStamp = t.timeStamp * 1000 // log timestamps are in seconds; Flink expects milliseconds
    currentMaxEventTime = Math.max(timeStamp, currentMaxEventTime)
    timeStamp
  }
}
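With a periodic assigner like this, Flink calls getCurrentWatermark on a timer rather than per record; in event-time mode the interval defaults to 200 ms. If you want watermarks to advance more often, the interval can be tuned right after the environment is created in main, for example:

// Optional: emit watermarks every 100 ms instead of the 200 ms default
// that event-time mode applies.
env.getConfig.setAutoWatermarkInterval(100)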

Create an object named “HotProduct”:

package org.yukun

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

object HotProduct {

  case class ProductViewCount(productId: Long, windowEnd:Long, count: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.readTextFile("./data/data1.csv")
      .map(Utils.string2UserBehavior(_))
      .assignTimestampsAndWatermarks(new EventTimeExtractor)
      .filter(_.behavior == "P")
      .keyBy(_.productId)
      .timeWindow(Time.hours(1), Time.minutes(5))
      .aggregate(new CountProduct, new WindowResult)
      .keyBy(_.windowEnd)
      .process(new TopHotProduct(3))
      .print

    env.execute("Hot Products ...")
  }

  class TopHotProduct(topN: Int) extends KeyedProcessFunction[Long, ProductViewCount, String] {
    private var productState:ListState[ProductViewCount] = _

    override def open(parameters: Configuration): Unit = {
      productState = getRuntimeContext.getListState(
        new ListStateDescriptor[ProductViewCount](
          "product-state",
          classOf[ProductViewCount]
        )
      )
    }

    override def processElement(i: ProductViewCount,
                                context: KeyedProcessFunction[Long, ProductViewCount, String]#Context,
                                collector: Collector[String]): Unit = {
      productState.add(i)
      // Fire 1 ms after the window end, i.e. once the watermark has passed
      // windowEnd and every ProductViewCount for this window has arrived.
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, ProductViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val allProduct:ListBuffer[ProductViewCount] = new ListBuffer[ProductViewCount]
      val iterable = productState.get.iterator()
      while (iterable.hasNext) {
        allProduct += iterable.next()
      }

      val sortedProducts = allProduct.sortWith(_.count > _.count).take(topN)
      productState.clear()

      val sb = new StringBuilder
      sb.append("Time: " + (new Timestamp(timestamp - 1)) + "\n")
      sortedProducts.foreach(product => {
        sb.append("product Id: "+product.productId +", access: " + product.count + "\n")
      })

      sb.append("===================")
      out.collect(sb.toString())
    }
  }
  // Incremental pre-aggregation: counts one per UserBehavior inside each window.
  class CountProduct extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: UserBehavior, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  // Wraps the per-window count together with its key (productId) and the window end time.
  class WindowResult extends WindowFunction[Long, ProductViewCount, Long, TimeWindow] {
    override def apply(key: Long,
                       window: TimeWindow,
                       input: Iterable[Long],
                       out: Collector[ProductViewCount]): Unit = {
      out.collect(ProductViewCount(key, window.getEnd, input.iterator.next()))
    }
  }

}

1.2 Run

Run the program; a top-3 list like the following is printed for each 5-minute slide:

Time: 2020-10-03 09:30:00.0
product Id: 10601868, access: 2
product Id: 27358782, access: 2
product Id: 18916440, access: 2
===================
Time: 2020-10-03 09:35:00.0
product Id: 28427400, access: 3
product Id: 1325508, access: 3
product Id: 5191422, access: 3
===================
Time: 2020-10-03 09:40:00.0
product Id: 21123024, access: 6
product Id: 14188074, access: 5
product Id: 3104010, access: 5

2. Product Page View

Compute the page-view (PV) count every 10 seconds from the same log data.

2.1 Program

Create an object named “PageView” and put in the following code, which reuses the EventTimeExtractor and Utils classes defined earlier:

package org.yukun

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object PageView {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    env.readTextFile("./data/data1.csv")
      .map(Utils.string2UserBehavior(_))
      .assignTimestampsAndWatermarks(new EventTimeExtractor)
      .filter(_.behavior == "P")
      .map(data => ("P", 1))
      .timeWindowAll(Time.seconds(10))
      .sum(1)
      .print()

    env.execute("Page View")
  }
}

2.2 Run

Run the program; we get one page-view count per 10-second window:

(P,130)
(P,146)
(P,140)
(P,139)
(P,126)
(P,141)
(P,129)
(P,37)

3. Top N Hot Page

3.1 Requirement

Get the top 3 hot pages over the last 10 minutes, updated every 5 seconds. Sample log:

83.149.9.123 - - 17/05/2020:10:05:03 +0000 GET /presentations/logstash-kafkamonitor-2020/images/kibana-search.png
83.149.9.123 - - 17/05/2020:10:05:43 +0000 GET /presentations/logstash-kafkamonitor-2020/images/kibana-dashboard3.png
83.149.9.123 - - 17/05/2020:10:05:47 +0000 GET /presentations/logstash-kafkamonitor-2020/plugin/highlight/highlight.js

3.2 Program

Create an object named “HotPage” with the following code:

package org.yukun

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

case class ApacheLogEvent(
                   ip: String,
                   userId: String,
                   eventTime: Long,
                   method: String,
                   url: String
                   )

case class UrlViewCount(
                       url:String,
                       windowEnd: Long,
                       count: Long
                       )
object HotPage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    env.readTextFile("./data/access.log")
      .map(Utils.string2ApacheLogEvent(_))
      .assignTimestampsAndWatermarks(new HotPageEventTimeExtractor)
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new PageCountAgg, new PageWindowResult)
      .keyBy(_.windowEnd)
      .process(new TopNHotPage(3))
      .print()

    env.execute("Hot Page Count")
  }

  class HotPageEventTimeExtractor extends AssignerWithPeriodicWatermarks[ApacheLogEvent]{

    var currentMaxEventTime = 0L  // highest event time seen so far, in milliseconds
    val maxOutOfOrderness = 10000 // tolerate events up to 10 seconds out of order

    override def getCurrentWatermark: Watermark = {
      new Watermark(currentMaxEventTime - maxOutOfOrderness)
    }

    override def extractTimestamp(element: ApacheLogEvent, previousElementTimestamp: Long): Long = {
      val timestamp = element.eventTime // already in milliseconds
      currentMaxEventTime = Math.max(element.eventTime, currentMaxEventTime)
      timestamp
    }
  }

  class PageCountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def add(in: ApacheLogEvent, acc: Long): Long = acc + 1

    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  class PageWindowResult extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[Long],
                       out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, window.getEnd, input.iterator.next()))
    }
  }

  class TopNHotPage(topN:Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
    lazy val urlState: MapState[String, Long] =
      getRuntimeContext.getMapState(new MapStateDescriptor[String, Long](
        "url-state", classOf[String], classOf[Long]
      ))

    override def processElement(i: UrlViewCount,
                                context: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
                                collector: Collector[String]): Unit = {
      urlState.put(i.url, i.count)
      context.timerService().registerEventTimeTimer(i.windowEnd + 1)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      val allUrlViews:ListBuffer[(String, Long)] = new ListBuffer[(String, Long)]
      val iter = urlState.entries().iterator()
      while (iter.hasNext) {
        val entry = iter.next()
        allUrlViews += ((entry.getKey, entry.getValue))
      }

      urlState.clear()

      val sortedUrlView = allUrlViews.sortWith(_._2 > _._2).take(topN)

      val result = new StringBuilder
      result.append("Time: "+(new Timestamp(timestamp-1))+"\n")
      sortedUrlView.foreach(view => {
        result.append("URL: "+view._1 +" Access: " +view._2 +"\n")
      })
      result.append("------------")
      out.collect(result.toString())
    }
  }
}
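The program calls Utils.string2ApacheLogEvent, which section 1 did not define. Below is a minimal sketch to add to the Utils object; the field positions and date pattern are assumptions taken from the space-separated sample log (ip, -, userId, timestamp, zone, method, url):

// A sketch only: field positions and the date pattern are assumed
// from the sample log above.
def string2ApacheLogEvent(line: String): ApacheLogEvent = {
  val fields = line.split(" ")
  // "17/05/2020:10:05:03" -> epoch milliseconds, matching the
  // millisecond-based arithmetic in HotPageEventTimeExtractor
  val format = new java.text.SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  ApacheLogEvent(
    fields(0).trim,                       // ip
    fields(2).trim,                       // userId
    format.parse(fields(3).trim).getTime, // eventTime in ms
    fields(5).trim,                       // method
    fields(6).trim                        // url
  )
}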

3.3 Run

Run the program; a new top-3 list is printed for each 5-second slide:

Time: 2020-05-20 06:15:25.0
URL: /style2.css Access: 9
URL: /reset.css Access: 9
URL: /images/jordan-80.png Access: 9
------------
Time: 2020-05-20 06:15:30.0
URL: /images/jordan-80.png Access: 9
URL: /style2.css Access: 8
URL: /favicon.ico Access: 8
------------
Time: 2020-05-20 06:15:35.0
URL: /images/jordan-80.png Access: 9
URL: /favicon.ico Access: 7
URL: /style2.css Access: 6
------------

4. Download

Project code and data download:
Flink Tasks