Spark SQL Practice

Posted by Bourne's Blog - A Full-stack & Web3 Developer on June 23, 2022

1. Features of Spark SQL

  • Integrates SQL queries with Spark programs (see the sketch below)
  • Uniform data access across different sources and formats
  • Hive compatibility
  • Standard connectivity (JDBC/ODBC)
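
A quick illustration of the first point: SQL and DataFrame code interoperate freely on the same data. A minimal sketch, assuming spark-shell (so spark and the implicits are already in scope) and a hypothetical /data/people.json like the one used later in this post:

import org.apache.spark.sql.functions.lit

val people = spark.read.json("/data/people.json")
people.createOrReplaceTempView("people")

spark.sql("select name, age from people where age > 20").  // start in SQL...
  withColumn("source", lit("sql")).                        // ...continue with DataFrame ops
  show()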

2. Create Table

2.1 Case class

scala> case class Emp(
     | empno:Int,
     | ename:String,
     | job:String,
     | mgr:String,
     | hiredate:String,
     | sal:Int,
     | comm:String,
     | deptno:Int
     | )
scala> val emps = sc.textFile("/data/emp.csv")
emps: org.apache.spark.rdd.RDD[String] = /data/emp.csv MapPartitionsRDD[1] at textFile at <console>:24

scala> val emps1 = emps.
  map(_.split(",")).
  map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
emps1: org.apache.spark.rdd.RDD[Emp] = MapPartitionsRDD[5] at map at <console>:27

scala> val df = emps1.toDF
df: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]

scala> df.printSchema
root
 |-- empno: integer (nullable = false)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: integer (nullable = false)
 |-- comm: string (nullable = true)
 |-- deptno: integer (nullable = false)

scala> df.show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename|    job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7333|SMITH|  CLERK|7902|20220304|2000| 200|     1|
| 7334|Green|  CLERK|7902|20220405|2200| 300|     1|
| 7335|Marry|MANAGER|7903|20220505|2500| 300|     1|
+-----+-----+-------+----+--------+----+----+------+

scala> df.select("ename", "job").show
+-----+-------+
|ename|    job|
+-----+-------+
|SMITH|  CLERK|
|Green|  CLERK|
|Marry|MANAGER|
+-----+-------+

scala> df.select("ename", "job", "hiredate").where("hiredate > '20220305'").show
+-----+-------+--------+
|ename|    job|hiredate|
+-----+-------+--------+
|Green|  CLERK|20220405|
|Marry|MANAGER|20220505|
+-----+-------+--------+
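
In spark-shell, toDF works out of the box because the shell imports spark.implicits._ for you. In a standalone application you have to create the SparkSession and bring in the implicits yourself; a minimal sketch (object and app names are illustrative):

import org.apache.spark.sql.SparkSession

case class Emp(empno: Int, ename: String, job: String, mgr: String,
               hiredate: String, sal: Int, comm: String, deptno: Int)

object EmpApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("EmpApp").getOrCreate()
    import spark.implicits._   // brings toDF into scope

    val df = spark.sparkContext.textFile("/data/emp.csv").
      map(_.split(",")).
      map(x => Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)).
      toDF()

    df.printSchema()
    spark.stop()
  }
}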

2.2 From JSON

Upload people.json to /data in HDFS:

[root@hadoop001 ~]# hdfs dfs -cat /data/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

scala> val df1 = spark.read.json("/data/people.json")
df1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]               

scala> df1.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


scala> df1.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
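
Note that age was inferred as bigint. If you would rather pin the schema than rely on inference, the reader accepts an explicit one; a small hedged sketch:

import org.apache.spark.sql.types._

// int instead of the inferred bigint for age
val peopleSchema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  IntegerType, nullable = true)
))

val peopleDF = spark.read.schema(peopleSchema).json("/data/people.json")
peopleDF.printSchema   // age now shows up as integer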

2.3 From Parquet

Load a Parquet file from HDFS:

scala> val userDF = spark.read.load("/spark/resources/users.parquet")
userDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> userDF.show
+------+--------------+----------------+                                        
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

scala> userDF.printSchema
root
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)
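
load with no explicit format assumes Parquet, Spark's default data source. These forms should all be equivalent for the file above:

// default format is parquet
val u1 = spark.read.load("/spark/resources/users.parquet")
// explicit format
val u2 = spark.read.format("parquet").load("/spark/resources/users.parquet")
// format-specific shortcut
val u3 = spark.read.parquet("/spark/resources/users.parquet")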


3. Manipulate DataFrame

3.1 Query with the DataFrame DSL

scala> df.select($"ename", $"sal"+100).show
+-----+-----------+
|ename|(sal + 100)|
+-----+-----------+
|SMITH|       2100|
|Green|       2300|
|Marry|       2600|
+-----+-----------+

scala> df.select($"ename", $"sal"+100).where("sal > 2000").show
+-----+-----------+
|ename|(sal + 100)|
+-----+-----------+
|Green|       2300|
|Marry|       2600|
+-----+-----------+

scala> df.groupBy($"deptno").count.show
+------+-----+                                                                  
|deptno|count|
+------+-----+
|     1|    3|
+------+-----+
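
Beyond count, groupBy takes the usual aggregates from org.apache.spark.sql.functions; a short hedged sketch over the same df:

import org.apache.spark.sql.functions.{avg, max}

// average and maximum salary per department
df.groupBy($"deptno").
  agg(avg($"sal").alias("avg_sal"), max($"sal").alias("max_sal")).
  show()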

3.2 Register a DataFrame as a view

scala> df.createTempView("emps")

scala> spark.sql("select * from emps").show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename|    job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7333|SMITH|  CLERK|7902|20220304|2000| 200|     1|
| 7334|Green|  CLERK|7902|20220405|2200| 300|     1|
| 7335|Marry|MANAGER|7903|20220505|2500| 300|     1|
+-----+-----+-------+----+--------+----+----+------+

// select employees with salary greater than 2000
scala> spark.sql("select * from emps where sal > 2000").show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename|    job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7334|Green|  CLERK|7902|20220405|2200| 300|     1|
| 7335|Marry|MANAGER|7903|20220505|2500| 300|     1|
+-----+-----+-------+----+--------+----+----+------+

// get employees reporting to manager 7903
scala> spark.sql("select * from emps where mgr=7903").show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename|    job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7335|Marry|MANAGER|7903|20220505|2500| 300|     1|
+-----+-----+-------+----+--------+----+----+------+
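
createTempView throws if the name is already taken, and the view disappears when the session ends. createOrReplaceTempView and global temp views are the usual alternatives; a hedged sketch:

// replaces the view if it already exists (createTempView would throw)
df.createOrReplaceTempView("emps")

// a global temp view survives across sessions, under the global_temp database
df.createGlobalTempView("emps_global")
spark.sql("select count(*) from global_temp.emps_global").show()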



4. Write to HDFS

Write data to HDFS in Parquet (the default), CSV, or JSON format:

scala> userDF.select("*").write.save("/data/users")

scala> userDF.select("name", "favorite_color").write.format("csv").save("/data/users-csv")
                                                                                
scala> userDF.select("name", "favorite_color").write.format("json").save("/data/users-json")
                 

Check HDFS:

[root@hadoop001 resources]# hdfs dfs -ls /data
drwxr-xr-x   - root supergroup          0 2022-06-23 21:10 /data/users
drwxr-xr-x   - root supergroup          0 2022-06-23 21:14 /data/users-csv
drwxr-xr-x   - root supergroup          0 2022-06-23 21:14 /data/users-json

[root@hadoop001 resources]# hdfs dfs -ls /data/users
Found 2 items
-rw-r--r--   3 root supergroup          0 2022-06-23 21:10 /data/users/_SUCCESS
-rw-r--r--   3 root supergroup        966 2022-06-23 21:10 /data/users/part-00000-17a50263-ced8-471e-8d8d-46090079f908-c000.snappy.parquet

[root@hadoop001 resources]# hdfs dfs -ls /data/users-csv/
Found 2 items
-rw-r--r--   3 root supergroup          0 2022-06-23 21:14 /data/users-csv/_SUCCESS
-rw-r--r--   3 root supergroup         18 2022-06-23 21:14 /data/users-csv/part-00000-68076725-5daf-455b-bd65-3d045a782b2f-c000.csv
[root@hadoop001 resources]# hdfs dfs -cat /data/users-csv/part-00000-68076725-5daf-455b-bd65-3d045a782b2f-c000.csv
Alyssa,""
Ben,red
[root@hadoop001 resources]# hdfs dfs -ls /data/users-json
Found 2 items
-rw-r--r--   3 root supergroup          0 2022-06-23 21:14 /data/users-json/_SUCCESS
-rw-r--r--   3 root supergroup         56 2022-06-23 21:14 /data/users-json/part-00000-e6963025-47d8-4b32-92ea-e9b61a286cd3-c000.json
[root@hadoop001 resources]# hdfs dfs -cat /data/users-json/part-00000-e6963025-47d8-4b32-92ea-e9b61a286cd3-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}

5. Other sources

5.1 JDBC

[root@hadoop003 ~]# ln -sf /opt/module/apache-hive-3.1.2-bin/lib/mysql-connector-java-8.0.11.jar .
[root@hadoop003 ~]# spark-shell \
 --master local \
 --jars mysql-connector-java-8.0.11.jar \
 --driver-class-path mysql-connector-java-8.0.11.jar
Spark context Web UI available at http://hadoop003:4040
Spark context available as 'sc' (master = local, app id = local-1656058949228).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  `_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val agents = spark.
     | read.
     | format("jdbc").
     | option("url", "jdbc:mysql://mysql-host.com:3306/agent").
     | option("dbtable", "agent").
     | option("user", "*****").
     | option("password", "******").
     | load
Fri Jun 24 16:25:10 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
agents: org.apache.spark.sql.DataFrame = [id: bigint, create_time: timestamp ... 13 more fields]

scala> agents.printSchema
root
 |-- id: long (nullable = true)
 |-- create_time: timestamp (nullable = true)
 |-- update_time: timestamp (nullable = true)
 |-- account: string (nullable = true)
 |-- email: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- nation: string (nullable = true)
 |-- marriage: string (nullable = true)
 |-- cert_type: string (nullable = true)
 |-- shop_code: string (nullable = true)
 |-- cert_no: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- name: string (nullable = true)
 |-- status: integer (nullable = true)

scala> agents.select("id", "account", "email","birth_date").show
Fri Jun 24 16:26:39 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
+----+------------+-------------------+----------+
|  id|     account|              email|birth_date|
+----+------------+-------------------+----------+
|5335|     shiboya|     shiboya@xx.com|1990-09-26|
|5336|baihongchang|baihongchang@xx.com|1991-04-04|
|5337|     weifeng|     weifeng@xx.com|1995-08-11|
|5338|   huiyatong|   huiyatong@xx.com|1993-09-20|
|5339| zhongerhuai| zhongerhuai@xx.com|1998-08-10|
|5340|   shulesong|   shulesong@xx.com|1998-07-07|
|5341|cuikangsheng|cuikangsheng@xx.com|1992-09-28|
|5342|   feiqingli|   feiqingli@xx.com|1993-11-02|
|5343| shanghongbo| shanghongbo@xx.com|1996-08-24|
|5344| renghuaihan| renghuaihan@xx.com|1990-03-16|
|5345|   wusunfang|   wusunfang@xx.com|1992-05-18|
|5346|  ranghanzhi|  ranghanzhi@xx.com|1991-10-06|
|5347|   kuiyouyou|   kuiyouyou@xx.com|1990-06-17|
|5348|      yinman|      yinman@xx.com|1991-03-24|
|5349|    qikunhui|    qikunhui@xx.com|1997-07-27|
|5350|     yuhaosi|     yuhaosi@xx.com|1991-04-09|
|5351|     eryubei|     eryubei@xx.com|1994-05-07|
|5352| kuangzhimei| kuangzhimei@xx.com|1999-07-30|
|5353|        kuya|        kuya@xx.com|1991-04-28|
|5354|       suowu|       suowu@xx.com|1997-04-01|
+----+------------+-------------------+----------+
only showing top 20 rows
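
By default the JDBC source reads the whole table through a single connection. For bigger tables you can split the read on a numeric column; a hedged sketch, with the bounds purely illustrative:

val agentsPar = spark.
  read.
  format("jdbc").
  option("url", "jdbc:mysql://mysql-host.com:3306/agent").
  option("dbtable", "agent").
  option("user", "*****").
  option("password", "******").
  option("partitionColumn", "id").   // numeric column to split on
  option("lowerBound", "5335").      // illustrative bounds
  option("upperBound", "10000").
  option("numPartitions", "4").      // four parallel JDBC reads
  load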

5.2 Hive

Copy Hadoop's core-site.xml and hdfs-site.xml and Hive's hive-site.xml to $SPARK_HOME/conf. You still need to pass mysql-connector-java-8.0.11.jar in both "--jars" and "--driver-class-path":

[root@hadoop003 ~]# spark-shell \
 --master local \
 --jars mysql-connector-java-8.0.11.jar \
 --driver-class-path mysql-connector-java-8.0.11.jar
Spark context Web UI available at http://hadoop003:4040
Spark context available as 'sc' (master = local, app id = local-1656058949228).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  `_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("select * from ods.ods_agent_agent_mobile_da limit 10").show
+---+--------+-----------+--------------------+--------------------+--------+
| id|agent_id|     mobile|         create_time|         update_time|      pt|
+---+--------+-----------+--------------------+--------------------+--------+
|  1|    5335|147****2696|2021-09-13 15:41:...|2022-02-14 16:01:...|20220624|
|  2|    5336|157****6005|2021-12-07 08:42:...|2022-02-14 16:01:...|20220624|
|  3|    5337|166****7841|2021-10-18 15:17:...|2022-02-14 16:01:...|20220624|
|  4|    5338|157****5191|2021-10-15 01:47:...|2022-02-14 16:01:...|20220624|
|  5|    5339|150****2746|2021-06-14 23:12:...|2022-02-14 16:01:...|20220624|
|  6|    5340|147****9902|2022-04-21 04:14:...|2022-02-14 16:01:...|20220624|
|  7|    5341|198****3595|2021-05-28 20:52:...|2022-02-14 16:01:...|20220624|
|  8|    5342|133****9229|2022-05-23 21:35:...|2022-02-14 16:01:...|20220624|
|  9|    5343|159****7462|2021-07-08 08:12:...|2022-02-14 16:01:...|20220624|
| 10|    5344|175****6370|2021-11-18 12:06:...|2022-02-14 16:01:...|20220624|
+---+--------+-----------+--------------------+--------------------+--------+
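
In spark-shell, Hive support is enabled automatically once the config files are in place. In a standalone application you enable it on the builder; a minimal hedged sketch (app name illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.
  appName("HiveReader").
  enableHiveSupport().   // picks up the metastore from hive-site.xml
  getOrCreate()

spark.sql("select * from ods.ods_agent_agent_mobile_da limit 10").show()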

6. Optimise

Cache the table, so that repeated queries read from Spark's memory instead of going back to MySQL:

scala> val agents = spark.
     | read.
     | format("jdbc").
     | option("url", "jdbc:mysql://mysql-host.com:3306/agent").
     | option("dbtable", "agent").
     | option("user", "*****").
     | option("password", "******").
     | load
Fri Jun 24 16:25:10 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
agents: org.apache.spark.sql.DataFrame = [id: bigint, create_time: timestamp ... 13 more fields]

scala> agents.registerTempTable("agents")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sqlContext.cacheTable("agents")
22/06/24 18:16:31 WARN execution.CacheManager: Asked to cache already cached data.

scala> spark.sql("select id,account,email,birth_date from agents limit 10").show
+----+------------+-------------------+----------+
|  id|     account|              email|birth_date|
+----+------------+-------------------+----------+
|5335|     shiboya|     shiboya@xx.com|1990-09-26|
|5336|baihongchang|baihongchang@xx.com|1991-04-04|
|5337|     weifeng|     weifeng@xx.com|1995-08-11|
|5338|   huiyatong|   huiyatong@xx.com|1993-09-20|
|5339| zhongerhuai| zhongerhuai@xx.com|1998-08-10|
|5340|   shulesong|   shulesong@xx.com|1998-07-07|
|5341|cuikangsheng|cuikangsheng@xx.com|1992-09-28|
|5342|   feiqingli|   feiqingli@xx.com|1993-11-02|
|5343| shanghongbo| shanghongbo@xx.com|1996-08-24|
|5344| renghuaihan| renghuaihan@xx.com|1990-03-16|
+----+------------+-------------------+----------+


scala> spark.sql("select id,account,email,birth_date from agents limit 10").show
+----+------------+-------------------+----------+
|  id|     account|              email|birth_date|
+----+------------+-------------------+----------+
|5335|     shiboya|     shiboya@xx.com|1990-09-26|
|5336|baihongchang|baihongchang@xx.com|1991-04-04|
|5337|     weifeng|     weifeng@xx.com|1995-08-11|
|5338|   huiyatong|   huiyatong@xx.com|1993-09-20|
|5339| zhongerhuai| zhongerhuai@xx.com|1998-08-10|
|5340|   shulesong|   shulesong@xx.com|1998-07-07|
|5341|cuikangsheng|cuikangsheng@xx.com|1992-09-28|
|5342|   feiqingli|   feiqingli@xx.com|1993-11-02|
|5343| shanghongbo| shanghongbo@xx.com|1996-08-24|
|5344| renghuaihan| renghuaihan@xx.com|1990-03-16|
+----+------------+-------------------+----------+

From the Web UI we can see that the job duration dropped noticeably after caching the table.

Job Id | Description          | Submitted           | Duration | Stages: Succeeded/Total
-------|----------------------|---------------------|----------|------------------------
11     | show at <console>:24 | 2022/06/24 18:17:03 | 62 ms    | 2/2
10     | show at <console>:24 | 2022/06/24 18:17:00 | 0.1s     | 2/2
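
The same caching is available through the catalog API. Note that the cache is lazy: it is only materialized by the first action, and it should be released when no longer needed. A small sketch:

// mark the registered view for in-memory caching (lazy)
spark.catalog.cacheTable("agents")

// the first action materializes the cache; later queries read from memory
spark.sql("select count(*) from agents").show()

// free the cached blocks when done
spark.catalog.uncacheTable("agents")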