Spark User Analyses

Posted by Bourne's Blog - A Full-stack & Web3 Developer on July 5, 2022

1. Requirement

Analysis of the distribution of users by province, age group and sex, based on user registration data from a website.

A sample of the desensitised data looks like this:

{"AGE":48,"BIRTHDAY":"1973","BPLACE":"吉林省吉林市磐石县","IDTYPE":"01","RNAME":"崔**","SEX":"女"}
{"AGE":45,"BIRTHDAY":"1976","BPLACE":"四川省重庆市綦江县","IDTYPE":"01","RNAME":"朱**","SEX":"男"}
{"AGE":26,"BIRTHDAY":"1995","BPLACE":"浙江省杭州市富阳市","IDTYPE":"01","RNAME":"骆**","SEX":"男"}

The full file and project code are provided at the end of this article.

2. Development

2.1 pom.xml

Create a Maven project, and declare the Scala, Spark, Hadoop and maven-compiler-plugin dependencies in pom.xml:

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.6.5</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>    

2.2 Program

Create the directory src/main/scala, and mark it as “Sources” on the [Project Structure > Modules] page.

Create a package and an object UserReport under this package.

2.2.1 Get province

We need to extract the province from the address, which we do by registering a UDF. During development, it is better to debug the UDF logic first (for example in the Scala REPL, as below) to make sure it is bug-free.

scala> def getProvince(addr: String): String = {
     |       try {
     |         val p = "^(.*省|.*自治区|.*市).*".r
     |         val p(province) = addr
     |         province
     |       } catch {
     |         case _: Throwable => ""
     |       }
     |     }
getProvince: (addr: String)String

scala> getProvince("广西壮族自治区南宁地区横县")
res63: String = 广西壮族自治区

scala> getProvince("广东省汕头市潮州市")
res64: String = 广东省

scala> getProvince("浙江省温州市永嘉县")
res65: String = 浙江省

scala> getProvince("上海市黄浦区打浦路")
res66: String = 上海市
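
Note that the pattern match val p(province) = addr throws a MatchError for any address that does not match, which the catch clause then swallows. A non-throwing variant (a sketch, not part of the original project) could use findFirstMatchIn instead:

def getProvinceSafe(addr: String): String = {
  val p = "^(.*省|.*自治区|.*市).*".r
  // findFirstMatchIn returns an Option, so no exception handling is needed
  p.findFirstMatchIn(addr).map(_.group(1)).getOrElse("")
}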

2.2.2 Get age group

Based on a user’s age, we want to get an age group such as 30s/40s/50s; this also needs a UDF:

scala> def getAgeGroup(age: Int): String = {
     |       try {
     |         val g = (age/10) * 10
     |         if (g == 0){
     |           "1-9"
     |         } else {
     |           g.toString + "s"
     |         }
     |       } catch {
     |         case _: Throwable => ""
     |       }
     |     }
getAgeGroup: (age: Int)String

scala> getAgeGroup(5)
res72: String = 1-9

scala> getAgeGroup(9)
res73: String = 1-9

scala> getAgeGroup(33)
res67: String = 30s

scala> getAgeGroup(49)
res69: String = 40s

scala> getAgeGroup(56)
res70: String = 50s

scala> getAgeGroup(102)
res71: String = 100s
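
Since integer arithmetic on an Int cannot throw, the try/catch above is strictly unnecessary; a more compact equivalent (a sketch, not from the original project) would be:

def getAgeGroup2(age: Int): String = (age / 10) * 10 match {
  case 0 => "1-9"          // ages below 10 round down to 0
  case g => s"${g}s"       // e.g. 33 -> "30s", 102 -> "100s"
}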

2.2.3 SQL analyses

Put the following code in UserReport.scala:

package org.yukun

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UserReport {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println(
        """
          |Usage: org.yukun.UserReport <data-path>
          |""".stripMargin)
      System.exit(1)
    }

    val Array(dataPath) = args
    val conf = new SparkConf()
    conf.setAppName("User Report")
    conf.setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Load the JSON records and expose them to SQL as the "persons" view
    val df = spark.read.json(dataPath)
    df.createOrReplaceTempView("persons")

    // Extract the province ("...省" / "...自治区" / "...市") from the birthplace;
    // non-matching addresses yield an empty string
    val getProvince = udf((addr: String) => {
      try {
        val p = "^(.*省|.*自治区|.*市).*".r
        val p(province) = addr
        province
      } catch {
        case _: Throwable => ""
      }
    })

    // Bucket ages into decades: 1-9, 10s, 20s, ...
    val getAgeGroup = udf((age: Int) => {
      try {
        val g = (age/10) * 10
        if (g == 0){
          "1-9"
        } else {
          g.toString + "s"
        }
      } catch {
        case _: Throwable => ""
      }
    })

    spark.udf.register("getProvince", getProvince)
    spark.udf.register("getAgeGroup", getAgeGroup)

    // Each result is repartitioned to 1 so a single JSON file is written
    spark.sql("select getProvince(BPLACE) as province, count(*) as total from persons group by province order by total desc")
      .repartition(1).write.json("./report/province")

    spark.sql("select SEX, count(*) as total from persons group by SEX order by total desc")
      .repartition(1).write.json("./report/sex")

    spark.sql("select getAgeGroup(AGE) as age_group, count(*) as total from persons group by age_group order by age_group")
      .repartition(1).write.json("./report/age-group")

    spark.stop()
  }
}
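
For reference, the first SQL query can also be written with the DataFrame API instead of SQL; a minimal sketch reusing the df and getProvince values defined above:

import org.apache.spark.sql.functions.{col, desc}

// DataFrame-API equivalent of the province query (same result)
df.select(getProvince(col("BPLACE")).as("province"))
  .groupBy("province")
  .count()                               // adds a "count" column
  .withColumnRenamed("count", "total")
  .orderBy(desc("total"))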

3. Run

Click [Run -> Edit Configurations] and, in the Program arguments field, enter the data path “./data/person_info_1000.json”. Run the program, and the results are written to the report directory:

(base) ➜  SparkUserReport find report 
report
report/sex
report/sex/._SUCCESS.crc
report/sex/part-00000-9e393740-8e74-4ad8-be54-17cc795bcf5d-c000.json
report/sex/_SUCCESS
report/sex/.part-00000-9e393740-8e74-4ad8-be54-17cc795bcf5d-c000.json.crc
report/province
report/province/part-00000-6decd3c2-1a58-476f-870a-08e136875bb3-c000.json
report/province/._SUCCESS.crc
report/province/.part-00000-6decd3c2-1a58-476f-870a-08e136875bb3-c000.json.crc
report/province/_SUCCESS
report/age-group
report/age-group/._SUCCESS.crc
report/age-group/_SUCCESS
report/age-group/.part-00000-b91f9ff5-f13c-4b19-b808-ec5bf10e2bac-c000.json.crc
report/age-group/part-00000-b91f9ff5-f13c-4b19-b808-ec5bf10e2bac-c000.json

(base) ➜  SparkUserReport cat report/sex/part-00000-9e393740-8e74-4ad8-be54-17cc795bcf5d-c000.json 
{"SEX":"男","total":523}
{"SEX":"女","total":470}
{"total":7}
(base) ➜  SparkUserReport cat report/province/part-00000-6decd3c2-1a58-476f-870a-08e136875bb3-c000.json 
{"province":"江苏省","total":104}
{"province":"安徽省","total":100}
{"province":"四川省","total":97}
{"province":"湖南省","total":71}
{"province":"浙江省","total":69}
{"province":"河南省","total":62}
{"province":"辽宁省","total":49}
{"province":"湖北省","total":48}
{"province":"山东省","total":43}
{"province":"广东省","total":36}
{"province":"江西省","total":36}
{"province":"上海市","total":33}
{"province":"甘肃省","total":32}
{"province":"福建省","total":26}
{"province":"黑龙江省","total":26}
{"province":"河北省","total":21}
{"province":"广西壮族自治区","total":20}
{"province":"山西省","total":19}
{"province":"吉林省","total":19}
{"province":"陕西省","total":12}
{"province":"重庆市","total":10}
{"province":"内蒙古自治区","total":10}
{"province":"贵州省","total":10}
{"province":"","total":9}
{"province":"云南省","total":9}
{"province":"湖北省省","total":6}
{"province":"内蒙古巴彦淖尔盟临河市","total":6}
{"province":"青海省","total":5}
{"province":"天津市","total":4}
{"province":"新疆维吾尔自治区","total":3}
{"province":"北京市","total":2}
{"province":"宁夏回族自治区","total":2}
{"province":"广西桂林市","total":1}
(base) ➜  SparkUserReport cat report/age-group/part-00000-b91f9ff5-f13c-4b19-b808-ec5bf10e2bac-c000.json
{"total":7}
{"age_group":"1-9","total":32}
{"age_group":"100s","total":7}
{"age_group":"10s","total":81}
{"age_group":"110s","total":17}
{"age_group":"120s","total":15}
{"age_group":"20s","total":152}
{"age_group":"30s","total":184}
{"age_group":"40s","total":167}
{"age_group":"50s","total":173}
{"age_group":"60s","total":93}
{"age_group":"70s","total":43}
{"age_group":"80s","total":18}
{"age_group":"90s","total":11}

Because some addresses do not strictly follow the province + city format, some of the parsed provinces look strange (e.g. “湖北省省” and “内蒙古巴彦淖尔盟临河市”). Likewise, seven records carry no SEX or AGE value, which is why a bare {"total":7} row appears in both the sex and age-group reports. In a production environment, we would need to clean the data first.
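
A minimal cleaning step could normalise the anomalies seen above; the mappings below are assumptions based only on this output, not part of the original project:

def cleanProvince(province: String): String = province match {
  case p if p.endsWith("省省")     => p.dropRight(1)    // "湖北省省" -> "湖北省"
  case p if p.startsWith("内蒙古") => "内蒙古自治区"      // e.g. "内蒙古巴彦淖尔盟临河市"
  case p if p.startsWith("广西")   => "广西壮族自治区"    // e.g. "广西桂林市"
  case p                           => p
}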

4. Download

Project code and data download:
Spark User Profile