Spark RDD Usage - Part 1

Posted by Bourne's Blog - A Full-stack & Web3 Developer on May 31, 2022

Spark RDD Usage - Part 1

Overall

RDD 是Spark用于对分布式数据进行抽象的模型,用于封装所有内存和磁盘中的分布式数据实体。
为了解决开发人员在大规模的集群中以一种容错的方式进行内存计算,提出的概念。
RDD即Resilient Distributed Dataset, 弹性分布式数据集合,是Spark中最基本的数据抽象,代表一个不可变、只读的、被分区的数据集。
操作RDD就像操作本地集合一样,无需关心底层的细节。

How to Create

  • 把已经存在的集合传给parallelize方法
  • 利用外部数据生成RDD,从文件系统加载
  • 从现有的RDD转换成新的RDD
  • 常用的转换map/filter/flatMap/mapPartitions/distinct/union/groupByKey/reduceByKey/sortByKey/join

Action

之前的操作rdd并不会实际计算,到action时才会触发计算(强制执行那些求职必须用到的转换操作)
常用的action:reduce/collect/collectByKey/foreach/first/take/takeOrdered/saveAsTextFile/saveAsSequenceFile

DAG: Directd Acyclic Graph

目的:解决MapReduce的局限性

  • 每个MapReduce任务都是相互独立的。
  • 每一步的数据结果,都会持久存储到磁盘货HDFS上。

Job由Action触发生成,一个Application产生多个Job。
每个Job分成几个Stage,分Shuffle Map Stage和Result Stage。
Stage的划分依据是 根据RDD之间依赖的关系找出开销最小的调度方法。

Transformations & Actions

Name Type Prototype Description
glom Transform rdd.glom() 将RDD中每一个分区中类型为T的元素转换成Array[T],每一个分区就只有一个数组元素。
coalesce Transform rdd.coalesce(N,[isShuffle=False]) 将RDD进行重新分为N个区
combineByKey Transform rdd.combineByKey(F, M, C)  
distinct Transform rdd.distinct() 去除重复的元素
filter Transform rdd.filter(func) 过滤元素
flatMap Transform rdd.flatMap(func) 对每个元素执行函数并将结果Flat处理|
flatMapValues Transform rdd.flatMapValues(func) 对RDD元素格式为KV对中的Value进行func定义的逻辑处理
foldByKey Transform rdd.foldByKey(value, func) 对元素为KV格式的RDD的每个元素按照Key进行func定义的逻辑进行处理
countByValue Action rdd.countByValue() 统计RDD中各个Value出现的次数,并返回一个字典
first Action rdd.first() get first element in rdd
top Action rdd.top(N) get top N number elements in rdd after sort
take Action rdd.take(N) get N elements in rdd
sum Action rdd.sum() get sum of all elements in rdd
count Action rdd.count() get number of elements in rdd
collect Action rdd.collect() 将RDD类型的数据转化为数组,同时会从集群中拉取数据到driver端,这对于少量RDD数据的观察非常有用
collectAsMap Action rdd.collectAsMap() 将键值RDD转换为Map映射以保留其键值结构
countByKey Action rdd.countByKey() 统计rdd中每个key的数量
countByValue Action rdd.countByValue() 统计rdd中每个唯一数值的数量 |

Usage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("spark://hadoop001:7077") \
    .appName("RDD Usage") \
    .getOrCreate();
sc = spark.sparkContext

rdd = sc.parallelize([1,2,3,4,5], 3).glom()

rdd.collect()
// [[1], [2, 3], [4, 5]]

rdd.coalesce(1).glom().collect()
// [[[1], [2, 3], [4, 5]]]

rdd2 = sc.parallelize([1,2,3,4,5,6], 3).coalesce(1, True)

rdd2.collect()
// [5, 6, 1, 2, 3, 4]

rdd3 = sc.parallelize([1,2,3,4,5,6], 3)

rdd3.collect()
//[1, 2, 3, 4, 5, 6]

rdd4 = sc.parallelize([1,2,3,4,5,6], 3).glom()

rdd4.collect()
// [[3, 4], [5, 6], [1, 2]]

rdd4.coalesce(1, True).collect()
// [[3, 4], [5, 6], [1, 2]]


rdd5 = sc.parallelize(["a", "b", "a", "c"])

rdd5.countByValue()
// defaultdict(int, {'a': 2, 'b': 1, 'c': 1})

rdd5.countByValue().items()
// dict_items([('a', 2), ('b', 1), ('c', 1)])

rdd6 =sc.parallelize(["a","a","c"])
rdd6.distinct().collect()
// ['c', 'a']

rdd7 = sc.parallelize([1,2,3,4,5,6])
rdd7 = rdd7.filter(lambda x: x > 2)
rdd7.collect()
//[3, 4, 5, 6]

rdd7.first()
// 3

rdd8 = sc.parallelize([3,4,5])
fm = rdd8.flatMap(lambda x: range(1,x))
fm.collect()
// [1, 2, 1, 2, 3, 1, 2, 3, 4]

rdd9 = sc.parallelize([3,4,5])
rdd9 = rdd.flatMap(lambda x: [(x,x)])
rdd9.collect()
// [(3, 3), (4, 4), (5, 5)]

rdd10 =sc.parallelize(range(1,100))
rdd10.top(5)
// [99, 98, 97, 96, 95]

rdd10.take(3)
// [1, 2, 3]

rdd11 = sc.parallelize(range(1,5))
rdd11.collect()
// [1, 2, 3, 4]

rdd11.sum()
// 10

rdd11.max()
// 4

rdd11.min()
// 1

sc.stop()