Spark RDD - Several Ways to Do Word Count in the Spark Shell
Goal: find the top 10 most frequent words in an article.
1. Upload the news file to HDFS
[root@hadoop001 ~]# hdfs dfs -put news.txt /dir1
[root@hadoop001 ~]# hdfs dfs -ls /dir1
-rw-r--r-- 3 root supergroup 13882 2022-06-01 18:28 /dir1/news.txt
2. Start Spark Shell
[root@hadoop001 ~]# spark-shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/spark-2.4.5-bin-hadoop2.7/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
22/06/01 19:35:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[*], app id = local-1654083402486).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.
3. Load the text file
scala> val rdd = sc.textFile("/dir1/news.txt")
rdd: org.apache.spark.rdd.RDD[String] = /dir1/news.txt MapPartitionsRDD[15] at textFile at <console>:24
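As a quick sanity check (optional, and not part of the original session), you can confirm the file loaded correctly before building the pipeline; count and take are standard RDD actions:

scala> rdd.count()                  // number of lines in the file
scala> rdd.take(3).foreach(println) // peek at the first three lines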
4. Transform the RDD and apply actions to get the top 10 words
scala> rdd.flatMap(line => line.split(" ")).
| filter(x => x.nonEmpty).
| map(x => (x, 1)).
| reduceByKey((a, b) => a + b).
| sortBy(x => x._2, false).
| take(10).
| foreach(println)
(the,72)
(to,63)
(a,51)
(of,50)
(in,40)
(on,33)
(and,32)
(for,26)
(at,24)
(her,23)
| method | description |
|---|---|
| flatMap(line => line.split(" ")) | split each line on spaces and flatten all the words into one RDD |
| filter(x => x.nonEmpty) | filter out empty elements |
| map(x => (x, 1)) | pair each word with an initial count of 1 |
| reduceByKey((a, b) => a + b) | sum the counts for each identical word (key) |
| sortBy(x => x._2, false) | sort by count (the 2nd field) in descending order |
| take(10) | take the top 10 elements |
| foreach(println) | print each element |
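Everything up through sortBy is a lazy transformation; take(10) is the action that triggers the job, and foreach then prints the resulting local array. For comparison, here is a minimal sketch of the same pipeline as a standalone application (the object name WordCountTop10 is hypothetical; in the shell, sc is already created for you):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone version of the shell pipeline above.
object WordCountTop10 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountTop10")
    val sc   = new SparkContext(conf)

    sc.textFile("/dir1/news.txt")
      .flatMap(_.split(" "))           // split each line into words
      .filter(_.nonEmpty)              // drop empty strings
      .map((_, 1))                     // pair each word with a count of 1
      .reduceByKey(_ + _)              // sum counts per word
      .sortBy(_._2, ascending = false) // highest counts first
      .take(10)                        // action: triggers the job
      .foreach(println)

    sc.stop()
  }
}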
5. Other ways to do word count
scala> rdd.flatMap(_.split(" ")).
| filter(_.nonEmpty).
| map((_,1)).
| foldByKey(0)(_+_).
| sortBy(x => x._2, false).
| take(10).
| foreach(println)
(the,72)
(to,63)
(a,51)
(of,50)
(in,40)
(on,33)
(and,32)
(for,26)
(at,24)
(her,23)
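foldByKey works like reduceByKey but takes an explicit zero value that seeds the aggregation; with 0 and addition the result is identical. A minimal sketch with made-up data to show the zero value in isolation:

// Hypothetical toy data; order of the collected array may vary.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
pairs.foldByKey(0)(_ + _).collect()   // Array((a,2), (b,1)), same as reduceByKey(_ + _)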
scala> rdd.flatMap(_.split(" ")).
| filter(_.nonEmpty).
| map((_,1)).
| aggregateByKey(0)(_+_, _+_).
| sortBy(x => x._2, false).
| take(10).
| foreach(println)
(the,72)
(to,63)
(a,51)
(of,50)
(in,40)
(on,33)
(and,32)
(for,26)
(at,24)
(her,23)
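aggregateByKey(0)(_+_, _+_) is the most general of the three: the zero value seeds each partition, the first function (seqOp) folds values into the accumulator within a partition, and the second (combOp) merges accumulators across partitions. Since the accumulator may have a different type than the values, it can express aggregations that reduceByKey cannot; a hypothetical sketch computing (sum, max) per key:

// Hypothetical toy data; the accumulator type (Int, Int) differs
// from the value type Int, which reduceByKey could not express.
val nums = sc.parallelize(Seq(("a", 3), ("a", 5), ("b", 2)))
nums.aggregateByKey((0, Int.MinValue))(
  (acc, v) => (acc._1 + v, math.max(acc._2, v)),   // seqOp: within a partition
  (x, y)   => (x._1 + y._1, math.max(x._2, y._2))  // combOp: across partitions
).collect()   // e.g. Array((a,(8,5)), (b,(2,2)))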
scala> rdd.flatMap(_.split(" ")).
| filter(_.nonEmpty).
| map((_,1)).
| groupByKey.
| map(x => (x._1, x._2.sum)).
| sortBy(x => x._2, false).
| take(10).
| foreach(println)
(the,72)
(to,63)
(a,51)
(of,50)
(in,40)
(on,33)
(and,32)
(for,26)
(at,24)
(her,23)
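Of these variants, groupByKey is the least efficient for word count: it shuffles every individual (word, 1) pair across the network before summing, whereas reduceByKey, foldByKey, and aggregateByKey all pre-aggregate within each partition first. One more variant worth knowing (not shown in the original session) is the countByValue action, which pairs and counts in a single step and returns a local Scala Map, so the sorting happens in plain Scala on the driver:

// countByValue returns Map[String, Long] on the driver;
// toSeq/sortBy/take are then ordinary Scala collection methods.
rdd.flatMap(_.split(" ")).
  filter(_.nonEmpty).
  countByValue().
  toSeq.sortBy(-_._2).
  take(10).
  foreach(println)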