Spark SQL Practice
1. Features of Spark SQL
- Integrates SQL queries with Spark programs
- Uniform data access
- Hive compatibility
- Standard connectivity (JDBC/ODBC)
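The "integrated" point is easiest to see outside the shell. Below is a minimal, hypothetical sketch of a standalone program (the object name is made up, and it reuses the people.json file from section 2.2) that mixes the DataFrame API with plain SQL in one job:

import org.apache.spark.sql.SparkSession

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    // spark-shell creates this session for you as `spark`; a program builds its own
    val spark = SparkSession.builder()
      .appName("spark-sql-demo")
      .master("local[*]")
      .getOrCreate()

    // DataFrame API and SQL side by side on the same data
    val people = spark.read.json("/data/people.json")
    people.createOrReplaceTempView("people")
    spark.sql("select name, age from people where age > 20").show()

    spark.stop()
  }
}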
2. Create DataFrames
2.1 From a case class
scala> case class Emp(
| empno:Int,
| ename:String,
| job:String,
| mgr:String,
| hiredate:String,
| sal:Int,
| comm:String,
| deptno:Int
| )
scala> val emps = sc.textFile("/data/emp.csv")
emps: org.apache.spark.rdd.RDD[String] = /data/emp.csv MapPartitionsRDD[1] at textFile at <console>:24
scala> val emps1 = emps.
map(_.split(",")).
map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))
emps1: org.apache.spark.rdd.RDD[Emp] = MapPartitionsRDD[5] at map at <console>:27
scala> val df = emps1.toDF
df: org.apache.spark.sql.DataFrame = [empno: int, ename: string ... 6 more fields]
scala> df.printSchema
root
|-- empno: integer (nullable = false)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: string (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = false)
|-- comm: string (nullable = true)
|-- deptno: integer (nullable = false)
scala> df.show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename| job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7333|SMITH| CLERK|7902|20220304|2000| 200| 1|
| 7334|Green| CLERK|7902|20220405|2200| 300| 1|
| 7335|Marry|MANAGER|7903|20220505|2500| 300| 1|
+-----+-----+-------+----+--------+----+----+------+
scala> df.select("ename", "job").show
+-----+-------+
|ename| job|
+-----+-------+
|SMITH| CLERK|
|Green| CLERK|
|Marry|MANAGER|
+-----+-------+
scala> df.select("ename", "job", "hiredate").where("hiredate > '20220305'").show
+-----+-------+--------+
|ename| job|hiredate|
+-----+-------+--------+
|Green| CLERK|20220405|
|Marry|MANAGER|20220505|
+-----+-------+--------+
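Going through a case class means the schema is inferred by reflection. As a sketch of an alternative (not from the original session; column names and types copied from Emp above), the schema can also be declared explicitly and handed straight to the CSV reader:

import org.apache.spark.sql.types._

// Explicit schema matching the Emp case class; nullable defaults to true
val empSchema = StructType(Seq(
  StructField("empno",    IntegerType),
  StructField("ename",    StringType),
  StructField("job",      StringType),
  StructField("mgr",      StringType),
  StructField("hiredate", StringType),
  StructField("sal",      IntegerType),
  StructField("comm",     StringType),
  StructField("deptno",   IntegerType)
))

// Let the built-in CSV source do the splitting and type conversion
val empDF = spark.read.schema(empSchema).csv("/data/emp.csv")
empDF.printSchema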
2.2 From JSON
Upload people.json to /data in HDFS:
[root@hadoop001 ~]# hdfs dfs -cat /data/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> val df1 = spark.read.json("/data/people.json")
df1: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df1.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df1.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
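spark.read.json is shorthand for the generic reader with format("json"). The age column is inferred as bigint because inference assumes the widest integer type; a sketch of supplying the schema up front (same file, types chosen by hand) avoids the inference pass:

import org.apache.spark.sql.types._

// Equivalent generic-reader form
val df2 = spark.read.format("json").load("/data/people.json")

// Supply the schema explicitly instead of inferring it
val peopleSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("age",  IntegerType)
))
val df3 = spark.read.schema(peopleSchema).json("/data/people.json")
df3.printSchema   // age: integer instead of long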
2.3 From Parquet
Load a Parquet file from HDFS:
scala> val userDF = spark.read.load("/spark/resources/users.parquet")
userDF: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]
scala> userDF.show
+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+
scala> userDF.printSchema
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)
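load() works here without naming a format because Parquet is the default data source (spark.sql.sources.default), and the schema comes straight from the Parquet file rather than being inferred. The explicit forms below are equivalent (a sketch, same path as above):

// Format-specific reader
val userDF2 = spark.read.parquet("/spark/resources/users.parquet")

// Generic reader with the format spelled out
val userDF3 = spark.read.format("parquet").load("/spark/resources/users.parquet")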
3. Manipulate DataFrame
3.1 DataFrame API (DSL)
scala> df.select($"ename", $"sal"+100).show
+-----+-----------+
|ename|(sal + 100)|
+-----+-----------+
|SMITH| 2100|
|Green| 2300|
|Marry| 2600|
+-----+-----------+
scala> df.select($"ename", $"sal"+100).where("sal > 2000").show
+-----+-----------+
|ename|(sal + 100)|
+-----+-----------+
|Green| 2300|
|Marry| 2600|
+-----+-----------+
scala> df.groupBy($"deptno").count.show
+------+-----+
|deptno|count|
+------+-----+
| 1| 3|
+------+-----+
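A few more DSL operations in the same style (a sketch on the same df; in a compiled program you would also need import spark.implicits._ for the $ syntax and org.apache.spark.sql.functions._ for avg/max):

import org.apache.spark.sql.functions._

// Aggregate per department
df.groupBy($"deptno")
  .agg(avg($"sal").alias("avg_sal"), max($"sal").alias("max_sal"))
  .show

// Sort by salary, highest first
df.orderBy($"sal".desc).show

// filter is a synonym for where
df.filter($"job" === "CLERK").show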
3.2 Register the DataFrame as a temporary view
scala> df.createTempView("emps")
scala> spark.sql("select * from emps").show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename| job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7333|SMITH| CLERK|7902|20220304|2000| 200| 1|
| 7334|Green| CLERK|7902|20220405|2200| 300| 1|
| 7335|Marry|MANAGER|7903|20220505|2500| 300| 1|
+-----+-----+-------+----+--------+----+----+------+
// select employees salary greater than 2000
scala> spark.sql("select * from emps where sal > 2000").show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename| job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7334|Green| CLERK|7902|20220405|2200| 300| 1|
| 7335|Marry|MANAGER|7903|20220505|2500| 300| 1|
+-----+-----+-------+----+--------+----+----+------+
// get employees of manager 7903
scala> spark.sql("select * from emps where mgr=7903").show
+-----+-----+-------+----+--------+----+----+------+
|empno|ename| job| mgr|hiredate| sal|comm|deptno|
+-----+-----+-------+----+--------+----+----+------+
| 7335|Marry|MANAGER|7903|20220505|2500| 300| 1|
+-----+-----+-------+----+--------+----+----+------+
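createTempView throws an AnalysisException if the name is already taken; createOrReplaceTempView is the idempotent variant. A global temporary view survives across SparkSessions in the same application and is addressed through the global_temp database (a sketch, view names made up):

df.createOrReplaceTempView("emps")

df.createGlobalTempView("emps_g")
spark.sql("select ename, sal from global_temp.emps_g where deptno = 1").show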
4. Write to HDFS
Write data to HDFS in Parquet (the default), CSV, or JSON format:
scala> userDF.select("*").write.save("/data/users")
scala> userDF.select("name", "favorite_color").write.format("csv").save("/data/users-csv")
scala> userDF.select("name", "favorite_color").write.format("json").save("/data/users-json")
Check HDFS:
[root@hadoop001 resources]# hdfs dfs -ls /data
drwxr-xr-x - root supergroup 0 2022-06-23 21:10 /data/users
drwxr-xr-x - root supergroup 0 2022-06-23 21:14 /data/users-csv
drwxr-xr-x - root supergroup 0 2022-06-23 21:14 /data/users-json
[root@hadoop001 resources]# hdfs dfs -ls /data/users
Found 2 items
-rw-r--r-- 3 root supergroup 0 2022-06-23 21:10 /data/users/_SUCCESS
-rw-r--r-- 3 root supergroup 966 2022-06-23 21:10 /data/users/part-00000-17a50263-ced8-471e-8d8d-46090079f908-c000.snappy.parquet
[root@hadoop001 resources]# hdfs dfs -ls /data/users-csv/
Found 2 items
-rw-r--r-- 3 root supergroup 0 2022-06-23 21:14 /data/users-csv/_SUCCESS
-rw-r--r-- 3 root supergroup 18 2022-06-23 21:14 /data/users-csv/part-00000-68076725-5daf-455b-bd65-3d045a782b2f-c000.csv
[root@hadoop001 resources]# hdfs dfs -cat /data/users-csv/part-00000-68076725-5daf-455b-bd65-3d045a782b2f-c000.csv
Alyssa,""
Ben,red
[root@hadoop001 resources]# hdfs dfs -ls /data/users-json
Found 2 items
-rw-r--r-- 3 root supergroup 0 2022-06-23 21:14 /data/users-json/_SUCCESS
-rw-r--r-- 3 root supergroup 56 2022-06-23 21:14 /data/users-json/part-00000-e6963025-47d8-4b32-92ea-e9b61a286cd3-c000.json
[root@hadoop001 resources]# hdfs dfs -cat /data/users-json/part-00000-e6963025-47d8-4b32-92ea-e9b61a286cd3-c000.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
5. Other sources
5.1 JDBC
[root@hadoop003 ~]# ln -sf /opt/module/apache-hive-3.1.2-bin/lib/mysql-connector-java-8.0.11.jar .
[root@hadoop003 ~]# spark-shell \
--master local \
--jars mysql-connector-java-8.0.11.jar \
--driver-class-path mysql-connector-java-8.0.11.jar
Spark context Web UI available at http://hadoop003:4040
Spark context available as 'sc' (master = local, app id = local-1656058949228).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ `_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val agents = spark.
| read.
| format("jdbc").
| option("url", "jdbc:mysql://mysql-host.com:3306/agent").
| option("dbtable", "agent").
| option("user", "*****").
| option("password", "******").
| load
Fri Jun 24 16:25:10 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
agents: org.apache.spark.sql.DataFrame = [id: bigint, create_time: timestamp ... 13 more fields]
scala> agents.printSchema
root
|-- id: long (nullable = true)
|-- create_time: timestamp (nullable = true)
|-- update_time: timestamp (nullable = true)
|-- account: string (nullable = true)
|-- email: string (nullable = true)
|-- sex: string (nullable = true)
|-- birth_date: date (nullable = true)
|-- nation: string (nullable = true)
|-- marriage: string (nullable = true)
|-- cert_type: string (nullable = true)
|-- shop_code: string (nullable = true)
|-- cert_no: string (nullable = true)
|-- nationality: string (nullable = true)
|-- name: string (nullable = true)
|-- status: integer (nullable = true)
scala> agents.select("id", "account", "email","birth_date").show
Fri Jun 24 16:26:39 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
+----+------------+-------------------+----------+
| id| account| email|birth_date|
+----+------------+-------------------+----------+
|5335| shiboya| shiboya@xx.com|1990-09-26|
|5336|baihongchang|baihongchang@xx.com|1991-04-04|
|5337| weifeng| weifeng@xx.com|1995-08-11|
|5338| huiyatong| huiyatong@xx.com|1993-09-20|
|5339| zhongerhuai| zhongerhuai@xx.com|1998-08-10|
|5340| shulesong| shulesong@xx.com|1998-07-07|
|5341|cuikangsheng|cuikangsheng@xx.com|1992-09-28|
|5342| feiqingli| feiqingli@xx.com|1993-11-02|
|5343| shanghongbo| shanghongbo@xx.com|1996-08-24|
|5344| renghuaihan| renghuaihan@xx.com|1990-03-16|
|5345| wusunfang| wusunfang@xx.com|1992-05-18|
|5346| ranghanzhi| ranghanzhi@xx.com|1991-10-06|
|5347| kuiyouyou| kuiyouyou@xx.com|1990-06-17|
|5348| yinman| yinman@xx.com|1991-03-24|
|5349| qikunhui| qikunhui@xx.com|1997-07-27|
|5350| yuhaosi| yuhaosi@xx.com|1991-04-09|
|5351| eryubei| eryubei@xx.com|1994-05-07|
|5352| kuangzhimei| kuangzhimei@xx.com|1999-07-30|
|5353| kuya| kuya@xx.com|1991-04-28|
|5354| suowu| suowu@xx.com|1997-04-01|
+----+------------+-------------------+----------+
only showing top 20 rows
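Pulling the whole table and filtering in Spark is wasteful for big tables. A sketch of two common refinements (host, credentials, and the bound values are placeholders): push a query down to MySQL with the query option (available since Spark 2.4), or split the read across partitions with a numeric column:

// Push the projection/filter down to MySQL
val youngAgents = spark.read.format("jdbc").
  option("url", "jdbc:mysql://mysql-host.com:3306/agent").
  option("query", "select id, account, email, birth_date from agent where birth_date >= '1995-01-01'").
  option("user", "*****").
  option("password", "******").
  load

// Parallel read driven by a numeric column (bounds only steer the partitioning)
val agentsPar = spark.read.format("jdbc").
  option("url", "jdbc:mysql://mysql-host.com:3306/agent").
  option("dbtable", "agent").
  option("partitionColumn", "id").
  option("lowerBound", "5335").
  option("upperBound", "6000").
  option("numPartitions", "4").
  option("user", "*****").
  option("password", "******").
  load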
5.2 Hive
Copy Hadoop's core-site.xml and hdfs-site.xml, and Hive's hive-site.xml, to $SPARK_HOME/conf;
mysql-connector-java-8.0.11.jar still has to be passed in both "--jars" and "--driver-class-path":
[root@hadoop003 ~]# spark-shell \
--master local \
--jars mysql-connector-java-8.0.11.jar \
--driver-class-path mysql-connector-java-8.0.11.jar
Spark context Web UI available at http://hadoop003:4040
Spark context available as 'sc' (master = local, app id = local-1656058949228).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ `_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.5
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("select * from ods.ods_agent_agent_mobile_da limit 10").show
+---+--------+-----------+--------------------+--------------------+--------+
| id|agent_id| mobile| create_time| update_time| pt|
+---+--------+-----------+--------------------+--------------------+--------+
| 1| 5335|147****2696|2021-09-13 15:41:...|2022-02-14 16:01:...|20220624|
| 2| 5336|157****6005|2021-12-07 08:42:...|2022-02-14 16:01:...|20220624|
| 3| 5337|166****7841|2021-10-18 15:17:...|2022-02-14 16:01:...|20220624|
| 4| 5338|157****5191|2021-10-15 01:47:...|2022-02-14 16:01:...|20220624|
| 5| 5339|150****2746|2021-06-14 23:12:...|2022-02-14 16:01:...|20220624|
| 6| 5340|147****9902|2022-04-21 04:14:...|2022-02-14 16:01:...|20220624|
| 7| 5341|198****3595|2021-05-28 20:52:...|2022-02-14 16:01:...|20220624|
| 8| 5342|133****9229|2022-05-23 21:35:...|2022-02-14 16:01:...|20220624|
| 9| 5343|159****7462|2021-07-08 08:12:...|2022-02-14 16:01:...|20220624|
| 10| 5344|175****6370|2021-11-18 12:06:...|2022-02-14 16:01:...|20220624|
+---+--------+-----------+--------------------+--------------------+--------+
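spark-shell wires up Hive support automatically once hive-site.xml is on the conf path. In a standalone application it has to be enabled explicitly when the session is built (a minimal sketch, app name made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-read")
  .enableHiveSupport()     // requires hive-site.xml on the classpath
  .getOrCreate()

spark.sql("show databases").show
spark.sql("select * from ods.ods_agent_agent_mobile_da limit 10").show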
6. Optimise
Cache the table so that repeated queries read from memory instead of going back to the JDBC source each time:
scala> val agents = spark.
| read.
| format("jdbc").
| option("url", "jdbc:mysql://mysql-host.com:3306/agent").
| option("dbtable", "agent").
| option("user", "*****").
| option("password", "******").
| load
Fri Jun 24 16:25:10 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
agents: org.apache.spark.sql.DataFrame = [id: bigint, create_time: timestamp ... 13 more fields]
scala> agents.registerTempTable("agents")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> spark.sqlContext.cacheTable("agents")
22/06/24 18:16:31 WARN execution.CacheManager: Asked to cache already cached data.
scala> spark.sql("select id,account,email,birth_date from agents limit 10").show
+----+------------+-------------------+----------+
| id| account| email|birth_date|
+----+------------+-------------------+----------+
|5335| shiboya| shiboya@xx.com|1990-09-26|
|5336|baihongchang|baihongchang@xx.com|1991-04-04|
|5337| weifeng| weifeng@xx.com|1995-08-11|
|5338| huiyatong| huiyatong@xx.com|1993-09-20|
|5339| zhongerhuai| zhongerhuai@xx.com|1998-08-10|
|5340| shulesong| shulesong@xx.com|1998-07-07|
|5341|cuikangsheng|cuikangsheng@xx.com|1992-09-28|
|5342| feiqingli| feiqingli@xx.com|1993-11-02|
|5343| shanghongbo| shanghongbo@xx.com|1996-08-24|
|5344| renghuaihan| renghuaihan@xx.com|1990-03-16|
+----+------------+-------------------+----------+
scala> spark.sql("select id,account,email,birth_date from agents limit 10").show
+----+------------+-------------------+----------+
| id| account| email|birth_date|
+----+------------+-------------------+----------+
|5335| shiboya| shiboya@xx.com|1990-09-26|
|5336|baihongchang|baihongchang@xx.com|1991-04-04|
|5337| weifeng| weifeng@xx.com|1995-08-11|
|5338| huiyatong| huiyatong@xx.com|1993-09-20|
|5339| zhongerhuai| zhongerhuai@xx.com|1998-08-10|
|5340| shulesong| shulesong@xx.com|1998-07-07|
|5341|cuikangsheng|cuikangsheng@xx.com|1992-09-28|
|5342| feiqingli| feiqingli@xx.com|1993-11-02|
|5343| shanghongbo| shanghongbo@xx.com|1996-08-24|
|5344| renghuaihan| renghuaihan@xx.com|1990-03-16|
+----+------------+-------------------+----------+
From the Web UI we can see that the job duration dropped noticeably after caching the table.
Job Id | Description | Submitted           | Duration | Stages: Succeeded/Total
11     | show at :24 | 2022/06/24 18:17:03 | 62 ms    | 2/2
10     | show at :24 | 2022/06/24 18:17:00 | 0.1 s    | 2/2
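The same effect can be had at the DataFrame level, and the cache should be released once it is no longer needed (a sketch; registerTempTable is deprecated in favour of createOrReplaceTempView):

agents.createOrReplaceTempView("agents")

// Caching is lazy; an action materialises it
agents.cache()
agents.count()

// Free the memory when done
spark.sqlContext.uncacheTable("agents")
// or: agents.unpersist()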