Task 1
A bookstore sold the following numbers of books per day over the last N days. Calculate the average daily sales of each book.
Day 1: ("spark",2), ("hadoop",6),
Day 2: ("hadoop",4), ("spark",6),
…
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:23
scala> rdd.mapValues(x => (x, 1)).                      // pair each count with a 1 to count days
     | reduceByKey((x, y) => (x._1+y._1, x._2+y._2)).   // (total sold, number of days) per book
     | mapValues(x => x._1 / x._2).                     // integer division: truncates the average
     | foreach(println)
(spark,4)
(hadoop,5)
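Note that x._1 / x._2 is integer division, so each average is truncated to a whole number. If a fractional average is wanted, a minimal variant on the same rdd casts the sum to Double first (foreach print order can vary across partitions):

scala> rdd.mapValues(x => (x, 1)).
     | reduceByKey((x, y) => (x._1+y._1, x._2+y._2)).
     | mapValues(x => x._1.toDouble / x._2).            // Double division keeps the fraction
     | foreach(println)
(spark,4.0)
(hadoop,5.0)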
Task 2
A list of accounts and balances is stored in a file like:
hadoop@apache 200
hive@apache 550
yarn@apache 580
hive@apache 159
hadoop@apache 300
hive@apache 258
hadoop@apache 150
yarn@apache 560
yarn@apache 260
Aggregate the balances by account, then sort by account and, within each account, by balance.
scala> val rdd = sc.textFile("/data/balance.txt")
rdd: org.apache.spark.rdd.RDD[String] = /data/balance.txt MapPartitionsRDD[1] at textFile at <console>:23
scala> rdd.
     | map(x => x.replaceAll("\\s+", " ").split(" ")).  // normalize whitespace, then split into fields
     | map(x => (x(0), x(1).toInt)).                    // parse the balance as Int so it sorts numerically
     | repartition(1).                                  // one partition so foreach prints in key order
     | groupByKey().
     | mapValues(x => x.toList.sorted).                 // sort each account's balances
     | sortByKey().
     | foreach(println)
(hadoop@apache,List(150, 200, 300))
(hive@apache,List(159, 258, 550))
(yarn@apache,List(260, 560, 580))
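groupByKey ships every balance for a key across the shuffle and materializes the whole list, and the repartition(1) is only there so foreach prints in key order. A sketch of the same result that builds each account's list with aggregateByKey and prints via collect() on the driver instead:

scala> rdd.
     | map(x => x.replaceAll("\\s+", " ").split(" ")).
     | map(x => (x(0), x(1).toInt)).
     | aggregateByKey(List.empty[Int])((acc, v) => v :: acc, _ ::: _). // collect balances per account
     | mapValues(_.sorted).
     | sortByKey().
     | collect().                                       // small result: print in order on the driver
     | foreach(println)
(hadoop@apache,List(150, 200, 300))
(hive@apache,List(159, 258, 550))
(yarn@apache,List(260, 560, 580))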
Task 3
The columns in the log file are order_id, user_id, payment amount, and product_id. The content of the log file looks like:
100,4287,226,233
101,6562,489,124
102,1124,33,17
103,3267,159,179
104,4569,57,125
105,1438,37,116
Calculate the top N payment amounts.
scala> val rdd = sc.textFile("/data/order.log")
rdd: org.apache.spark.rdd.RDD[String] = /data/order.log MapPartitionsRDD[1] at textFile at <console>:23
scala> rdd.
     | map(x => x.split(',')(2).toInt).                 // extract the payment amount column
     | top(3).                                          // N largest, no full sort required
     | foreach(println)
489
226
159
scala> rdd.
     | map(x => (x.split(',')(2).toInt, 1)).            // dummy value 1: sortByKey needs a pair RDD
     | repartition(1).                                  // one partition before sorting
     | sortByKey(false).                                // descending by amount
     | take(3).
     | foreach(x => println(x._1))
489
226
159
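Both versions give the top 3, but the dummy 1 and the repartition exist only to satisfy sortByKey. takeOrdered with a reversed Ordering returns the N largest directly, which is a simpler sketch of the same result:

scala> rdd.
     | map(x => x.split(',')(2).toInt).
     | takeOrdered(3)(Ordering[Int].reverse).           // N largest without a full sort of the RDD
     | foreach(println)
489
226
159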