Spark Machine Learning

Posted by Bourne's Blog - A Full-stack & Web3 Developer on June 25, 2022

1 Core concepts

  • Transformer: implements a transform() method, which converts one DataFrame into another, usually by appending columns;
  • Estimator: implements a fit() method, which learns from a DataFrame and produces a Transformer (a model);
  • Pipeline: chains multiple Transformers and Estimators into a single workflow (see the sketch below).
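In code, the contract between the three looks like the following minimal sketch. It assumes a spark-shell session where trainDF is a DataFrame with "text" and "label" columns (trainDF is an illustrative name; the full worked session follows in section 2):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Transformer: transform() maps a DataFrame to a new DataFrame with an extra "words" column.
val tok = new Tokenizer().setInputCol("text").setOutputCol("words")
val withWords = tok.transform(trainDF)

// Estimator: fit() consumes a DataFrame and returns a fitted Model, which is itself a Transformer.
val tf = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression()
val lrModel = lr.fit(tf.transform(withWords))

// Pipeline: wires the stages into one workflow; fit() returns a reusable PipelineModel.
val model = new Pipeline().setStages(Array(tok, tf, lr)).fit(trainDF)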

2 Tasks

2.1 Predict whether a line contains the word “spark”

[root@hadoop001 ~]# spark-shell
Spark context Web UI available at http://hadoop001:4040
Spark context available as 'sc' (master = local[*], app id = local-1656150455596).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.ml.feature._
import org.apache.spark.ml.feature._

scala> import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.LogisticRegression

scala> import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.{Pipeline, PipelineModel}

scala> import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.linalg.Vector

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val training = spark.createDataFrame(Seq(
     | (0L, "a b c d e spark", 1.0),
     | (1L, "b d", 0.0),
     | (2L, "spark f g h", 1.0),
     | (3L, "hadoop mapreduce", 0.0)
     | )).toDF("id", "text", "label")
training: org.apache.spark.sql.DataFrame = [id: bigint, text: string ... 1 more field]


scala> training.printSchema
root
 |-- id: long (nullable = false)
 |-- text: string (nullable = true)
 |-- label: double (nullable = false)


scala> training.show
+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+


scala> val tokenizer = new Tokenizer().
     | setInputCol("text").
     | setOutputCol("words")
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_c274e39d1269

scala> val hashingTF = new HashingTF().
     | setNumFeatures(1000).
     | setInputCol(tokenizer.getOutputCol).
     | setOutputCol("features")
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_4b69551b45e1

scala> val lr = new LogisticRegression().
     | setMaxIter(10).
     | setRegParam(0.01)
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_fe387b0c61e0

scala> val pipeline = new Pipeline().
     | setStages(Array(tokenizer, hashingTF, lr))
pipeline: org.apache.spark.ml.Pipeline = pipeline_079a563f3f95

scala> val model = pipeline.fit(training)
model: org.apache.spark.ml.PipelineModel = pipeline_079a563f3f95

scala> val test = spark.createDataFrame(Seq(
     | (4L, "spark i j k"),
     | (5L, "l m n"),
     | (6L, "spark a"),
     | (7L, "apache hadoop")
     | )).toDF("id", "text")
test: org.apache.spark.sql.DataFrame = [id: bigint, text: string]

scala> model.transform(test).
     | select("id", "text", "probability", "prediction").
     | collect().
     | foreach{case Row(id:Long, text:String, prob:Vector, prediction:Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction")}
(4, spark i j k) --> prob=[0.540643354485232,0.45935664551476796], prediction=0.0
(5, l m n) --> prob=[0.9334382627383527,0.06656173726164716], prediction=0.0
(6, spark a) --> prob=[0.1504143004807332,0.8495856995192668], prediction=1.0
(7, apache hadoop) --> prob=[0.9768636139518375,0.02313638604816238], prediction=0.0

We can see that the model failed on line 4L, “spark i j k”: it contains “spark”, yet the prediction is 0.0 (probability ≈ 0.54 for label 0).
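One way to see why (a hypothetical follow-up, not part of the original session) is to pull the fitted LogisticRegressionModel out of the PipelineModel and inspect it. Words like “i”, “j”, “k” never appeared in training, so their hashed features carry zero weight, and the decision rests only on the weight for “spark” plus an intercept learned from just four lines:

import org.apache.spark.ml.classification.LogisticRegressionModel

// The LogisticRegression stage is the third stage (index 2) of the pipeline.
val lrModel = model.stages(2).asInstanceOf[LogisticRegressionModel]
println(s"non-zero coefficients: ${lrModel.coefficients.numNonzeros} of ${lrModel.coefficients.size}")
println(s"intercept: ${lrModel.intercept}")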

2.2 Training with a larger dataset

Add more lines to the training data set, labeling the lines that contain “spark” with 1.0.

scala> val training1 = spark.createDataFrame(Seq(
     | (0L, "a b c d e spark", 1.0),
     | (1L, "b d", 0.0),
     | (2L, "spark f g h", 1.0),
     | (3L, "hadoop mapreduce", 0.0),
     | (4L, "hello spark", 1.0),
     | (5L, "spark is a good framework", 1.0),
     | (6L, "many developers in the world are using spark, which is a good tool!", 1.0),
     | (7L, "spark contains 4 components", 1.0),
     | (8L, "big data platform hadoop", 0.0),
     | (9L, "someone like flink", 0.0),
     | (10L, "Hdfs is a great invention", 0.0)
     | )).toDF("id", "text", "label")
training1: org.apache.spark.sql.DataFrame = [id: bigint, text: string ... 1 more field]

scala> training1.show
+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  0|     a b c d e spark|  1.0|
|  1|                 b d|  0.0|
|  2|         spark f g h|  1.0|
|  3|    hadoop mapreduce|  0.0|
|  4|         hello spark|  1.0|
|  5|spark is a good f...|  1.0|
|  6|many developers i...|  1.0|
|  7|spark contains 4 ...|  1.0|
|  8|big data platform...|  0.0|
|  9|  someone like flink|  0.0|
| 10|Hdfs is a great i...|  0.0|
+---+--------------------+-----+

scala> val model1 = pipeline.fit(training1)
model1: org.apache.spark.ml.PipelineModel = pipeline_079a563f3f95

scala> model1.transform(test).
     |       select("id", "text", "probability", "prediction").
     |       collect().
     |       foreach{case Row(id:Long, text:String, prob:Vector, prediction:Double) => println(s"($id, $text) --> prob=$prob, prediction=$prediction")}
(4, spark i j k) --> prob=[0.18786259772300723,0.8121374022769927], prediction=1.0
(5, l m n) --> prob=[0.7648742169322029,0.23512578306779716], prediction=0.0
(6, spark a) --> prob=[0.09369154387609983,0.9063084561239001], prediction=1.0
(7, apache hadoop) --> prob=[0.9276280847186241,0.07237191528137597], prediction=0.0
       

This time, model1 predicts all four test lines correctly.
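A natural next step (not shown in the session above) is to persist the fitted pipeline so it can be reused later without retraining; the path here is just an example:

import org.apache.spark.ml.PipelineModel

// Save the fitted PipelineModel to disk (path is illustrative).
model1.write.overwrite().save("/tmp/spark-contains-word-model")

// Load it back in a later session and apply it directly.
val reloaded = PipelineModel.load("/tmp/spark-contains-word-model")
reloaded.transform(test).select("id", "prediction").show()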