玩转RDD
- RDD操作包含两种算子,transformation(转化)和action(行动)
- 只有在行动算子下才会进行真正的数据运算,因为spark的运算机制是将数据存入内存,大数据处理考虑效率,如此操作是合理的
- rdd.persist() 可以将rdd持久化在内存中
1. 创建RDD
Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化( parallelize() 方法)。
val lines = sc.textFile("/path/to/README.md")
2. RDD操作
转化操作
RDD的转化操作是惰性求值的
针对各个元素的转化
- map() (接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果 RDD 中对应元素的值)
scala:
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
-flatmap()(我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器)
scala:
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // 返回"hello"
- filter() (接收一个函数,并将 RDD 中满足该函数的元素放入新的 RDD 中返回)
scala:
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
filter() 操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的 RDD
distinct()(生成一个只包含不同元素的新 RDD)
distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份
sample(withReplacement, fraction, [seed])
对 RDD 采样,以及是否替换
伪集合操作
- union()
scala:
badLinesRDD = errorsRDD.union(warningsRDD)
union() 与 filter() 的不同点在于它操作两个 RDD 而不是一个。转化操作可以操作任意数量的输入 RDD。
intersection()
只返回两个 RDD 中都有的元素,运行时也会去掉所有重复的元素(单个 RDD 内的重复元素也会一起移除)
subtract()
函数接收另一个RDD作为参数,返回一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD
cartesian()
返回所有可能的对,即进行笛卡尔积操作
最后要说的是,通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据
行动操作
- reduce()
接收一个函数作为参数,这个函数要操作两个相同元素类型的 RDD 数据并返回一个同样类型的新元素
scala:
val sum = rdd.reduce((x, y) => x + y)
fold()
接收一个与 reduce() 接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果
aggregate()
aggregate() 函数则把我们从返回值类型必须与所操作的 RDD 类型相同的限制中解放出来;
需要提供我们期待返回的类型的初始值。然后通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。
scala:
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
collect()
collect() 不能用在大规模数据集上,因为其可以用来获取整个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。
谨记:只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect()
数据集通常很大,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可以使用 saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来count()
countByValue()
各元素在 RDD 中出现的次数
take()
top()
takeOrdered(num)(ordering)
从 RDD 中按照提供的顺序返回最前面的 num 个元素
takeSample(withReplacement, num, [seed])
从 RDD 中返回任意一些元素
foreach(func)
对 RDD 中的每个元素使用给定的函数
内存持久化
Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。
为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化
scala:
import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
-unpersist()
手动把持久化的 RDD 从缓存中移除