玩转RDD

  • RDD操作包含两种算子,transformation(转化)和action(行动)
  • 只有在行动算子下才会进行真正的数据运算,因为spark的运算机制是将数据存入内存,大数据处理考虑效率,如此操作是合理的
  • rdd.persist() 可以将rdd持久化在内存中

1. 创建RDD

Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化( parallelize() 方法)。

val lines = sc.textFile("/path/to/README.md")

2. RDD操作

转化操作

RDD的转化操作是惰性求值的

针对各个元素的转化

  • map() (接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果 RDD 中对应元素的值)

scala:

val input = sc.parallelize(List(1, 2, 3, 4))  
val result = input.map(x => x * x)  
println(result.collect().mkString(","))

-flatmap()(我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器)

scala:

val lines = sc.parallelize(List("hello world", "hi"))  
val words = lines.flatMap(line => line.split(" "))  
words.first() // 返回"hello"
  • filter() (接收一个函数,并将 RDD 中满足该函数的元素放入新的 RDD 中返回)

scala:

val inputRDD = sc.textFile("log.txt")  
val errorsRDD = inputRDD.filter(line => line.contains("error"))

filter() 操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的 RDD

  • distinct()(生成一个只包含不同元素的新 RDD)

    distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份

  • sample(withReplacement, fraction, [seed])

    对 RDD 采样,以及是否替换

伪集合操作

  • union()

scala:

badLinesRDD = errorsRDD.union(warningsRDD)

union() 与 filter() 的不同点在于它操作两个 RDD 而不是一个。转化操作可以操作任意数量的输入 RDD。

  • intersection()

    只返回两个 RDD 中都有的元素,运行时也会去掉所有重复的元素(单个 RDD 内的重复元素也会一起移除)

  • subtract()

    函数接收另一个RDD作为参数,返回一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD

  • cartesian()

    返回所有可能的对,即进行笛卡尔积操作

最后要说的是,通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据

行动操作

  • reduce()

    接收一个函数作为参数,这个函数要操作两个相同元素类型的 RDD 数据并返回一个同样类型的新元素

scala:

val sum = rdd.reduce((x, y) => x + y)
  • fold()

    接收一个与 reduce() 接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果

  • aggregate()

    aggregate() 函数则把我们从返回值类型必须与所操作的 RDD 类型相同的限制中解放出来;
    需要提供我们期待返回的类型的初始值。然后通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

scala:

val result = input.aggregate((0, 0))(
           (acc, value) => (acc._1 + value, acc._2 + 1),
           (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
  • collect()

    collect() 不能用在大规模数据集上,因为其可以用来获取整个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。谨记:只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect()
    数据集通常很大,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可以使用 saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来

  • count()

  • countByValue()

    各元素在 RDD 中出现的次数

  • take()

  • top()

  • takeOrdered(num)(ordering)

    从 RDD 中按照提供的顺序返回最前面的 num 个元素

  • takeSample(withReplacement, num, [seed])

    从 RDD 中返回任意一些元素

  • foreach(func)

    对 RDD 中的每个元素使用给定的函数

enter image description here

内存持久化

Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化

scala:

import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

-unpersist()

手动把持久化的 RDD 从缓存中移除