第 3 章 RDD 编程

第 3 章 RDD 编程

本章介绍 Spark 对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)。RDD 其实就是分布式的元素集合。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。而在这一切背后,Spark 会自动将 RDD 中的数据分发到集群上,并将操作并行化执行。

由于 RDD 是 Spark 的核心概念,因此数据科学家和工程师都应该读一读本章。我们强烈建议读者在交互式 shell(参见 2.2 节)中亲身尝试一些示例。此外,本章中的示例代码都可以在本书的 GitHub 仓库(https://github.com/databricks/learning-spark)中找到。

3.1 RDD基础

Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。

用户可以使用两种方法创建 RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set)。我们在本书前面的章节中已经见过使用 SparkContext.textFile() 来读取文本文件作为一个字符串 RDD 的示例,如例 3-1 所示。

例 3-1:在 Python 中使用 textFile() 创建一个字符串的 RDD

>>> lines = sc.textFile("README.md")

创建出来后,RDD 支持两种类型的操作:转化操作(transformation) 和行动操作(action)。转化操作会由一个 RDD 生成一个新的 RDD。例如,根据谓词匹配情况筛选数据就是一个常见的转化操作。在我们的文本文件示例中,我们可以用筛选来生成一个只存储包含单词 Python 的字符串的新的 RDD,如例 3-2 所示。

例 3-2:调用转化操作 filter()

>>> pythonLines = lines.filter(lambda line: "Python" in line)

另一方面,行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。first() 就是我们之前调用的一个行动操作,它会返回 RDD 的第一个元素,如例 3-3 所示。

例 3-3:调用 first() 行动操作

>>> pythonLines.first()
u'## Interactive Python Shell'

转化操作和行动操作的区别在于 Spark 计算 RDD 的方式不同。虽然你可以在任何时候定义新的 RDD,但 Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。这种策略刚开始看起来可能会显得有些奇怪,不过在大数据领域是很有道理的。比如,看看例 3-2 和例 3-3,我们以一个文本文件定义了数据,然后把其中包含 Python 的行筛选出来。如果 Spark 在我们运行 lines = sc.textFile(...) 时就把文件中所有的行都读取并存储起来,就会消耗很多存储空间,而我们马上就要筛选掉其中的很多数据。相反,一旦 Spark 了解了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。事实上,在行动操作 first() 中,Spark 只需要扫描文件直到找到第一个匹配的行为止,而不需要读取整个文件。

最后,默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。我们可以让 Spark 把数据持久化到许多不同的地方,可用的选项会在表 3-6 中列出。在第一次对持久化的 RDD 计算之后,Spark 会把 RDD 的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。我们也可以把 RDD 缓存到磁盘上而不是内存中。默认不进行持久化可能也显得有些奇怪,不过这对于大规模数据集是很有意义的:如果不会重用该 RDD,我们就没有必要浪费存储空间,Spark 可以直接遍历一遍数据然后计算出结果。1

1在任何时候都能进行重算是我们为什么把 RDD 描述为“弹性”的原因。当保存 RDD 数据的一台机器失败时,Spark 还可以使用这种特性来重算出丢掉的分区,这一过程对用户是完全透明的。

在实际操作中,你会经常用 persist() 来把数据的一部分读取到内存中,并反复查询这部分数据。例如,如果我们想多次对 README 文件中包含 Python 的行进行计算,就可以写出如例 3-4 所示的脚本。

例 3-4:把 RDD 持久化到内存中

>>> pythonLines.persist

>>> pythonLines.count()
2

>>> pythonLines.first()
u'## Interactive Python Shell'

总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。

(1) 从外部数据创建出输入 RDD。

(2) 使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。

(3) 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。

(4) 使用行动操作(例如 count()first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。

 cache() 与使用默认存储级别调用 persist() 是一样的。

接下来我们会对这几个步骤逐一详解,并介绍 Spark 中常见的一些 RDD 操作。

3.2 创建RDD

Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。

创建 RDD 最简单的方式就是把程序中一个已有的集合传给 SparkContext 的 parallelize() 方法,如例 3-5 至例 3-7 所示。这种方式在学习 Spark 时非常有用,它让你可以在 shell 中快速创建出自己的 RDD,然后对这些 RDD 进行操作。不过,需要注意的是,除了开发原型和测试时,这种方式用得并不多,毕竟这种方式需要把你的整个数据集先放在一台机器的内存中。

例 3-5:Python 中的 parallelize() 方法

lines = sc.parallelize(["pandas", "i like pandas"])

例 3-6:Scala 中的 parallelize() 方法

val lines = sc.parallelize(List("pandas", "i like pandas"))

例 3-7:Java 中的 parallelize() 方法

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

更常用的方式是从外部存储中读取数据来创建 RDD。外部数据集的读取会在第 5 章详细介绍。不过,我们已经接触了用来将文本文件读入为一个存储字符串的 RDD 的方法 SparkContext.textFile(),用法如例 3-8 至例 3-10 所示。

例 3-8:Python 中的 textFile() 方法

lines = sc.textFile("/path/to/README.md")

例 3-9:Scala 中的 textFile() 方法

val lines = sc.textFile("/path/to/README.md")

例 3-10:Java 中的 textFile() 方法

JavaRDD<String> lines = sc.textFile("/path/to/README.md");

3.3 RDD操作

我们已经讨论过,RDD 支持两种操作:转化操作行动操作。RDD 的转化操作是返回一个新的 RDD 的操作,比如 map()filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count()first()。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

3.3.1 转化操作

RDD 的转化操作是返回新 RDD 的操作。我们会在 3.3.3 节讲到,转化出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作 RDD 中的一个元素。不过并不是所有的转化操作都是这样的。

举个例子,假定我们有一个日志文件 log.txt,内含有若干消息,希望选出其中的错误消息。我们可以使用前面说过的转化操作 filter()。不过这一次,我们会展示如何用 Spark 支持的三种语言的 API 分别实现(见例 3-11 至例 3-13)。

例 3-11:用 Python 实现 filter() 转化操作

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

例 3-12:用 Scala 实现 filter() 转化操作

val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

例 3-13:用 Java 实现 filter() 转化操作

JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
  new Function<String, Boolean>() {
    public Boolean call(String x) { return x.contains("error"); }
  }
});

注意,filter() 操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的 RDD。inputRDD 在后面的程序中还可以继续使用,比如我们还可以从中搜索别的单词。事实上,要再从 inputRDD 中找出所有包含单词 warning 的行。接下来,我们使用另一个转化操作 union() 来打印出包含 error 或 warning 的行数。下例中用 Python 作了示例,不过 union() 函数的用法在所有三种语言中是一样的。

例 3-14:用 Python 进行 union() 转化操作

errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

union()filter() 的不同点在于它操作两个 RDD 而不是一个。转化操作可以操作任意数量的输入 RDD。

 要获得与例 3-14 中等价的结果,更好的方法是直接筛选出要么包含 error 要么包含 warning 的行,这样只对 inputRDD 进行一次筛选即可。

最后要说的是,通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据。图 3-1 展示了例 3-14 中的谱系图。

图 3-1:日志分析过程中创建出的 RDD 谱系图

3.3.2 行动操作

我们已经看到了如何通过转化操作从已有的 RDD 创建出新的 RDD,不过有时,我们希望对数据集进行实际的计算。行动操作是第二种类型的 RDD 操作,它们会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。

继续我们在前几章中用到的日志的例子,我们可能想输出关于 badLinesRDD 的一些信息。为此,需要使用两个行动操作来实现:用 count() 来返回计数结果,用 take() 来收集 RDD 中的一些元素,如例 3-15 至例 3-17 所示。

例 3-15:在 Python 中使用行动操作对错误进行计数

print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line

例 3-16:在 Scala 中使用行动操作对错误进行计数

println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)

例 3-17:在 Java 中使用行动操作对错误进行计数

System.out.println("Input had " + badLinesRDD.count() + " concerning lines")
System.out.println("Here are 10 examples:")
for (String line: badLinesRDD.take(10)) {
  System.out.println(line);
}

在这个例子中,我们在驱动器程序中使用 take() 获取了 RDD 中的少量元素。然后在本地遍历这些元素,并在驱动器端打印出来。RDD 还有一个 collect() 函数,可以用来获取整个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect(),因此,collect() 不能用在大规模数据集上。

在大多数情况下,RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。此时,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可以使用 saveAsTextFile()saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存起来。我们会在第 5 章讲解导出数据的各种选项。

需要注意的是,每当我们调用一个新的行动操作时,整个 RDD 都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化,这会在 3.6 节中介绍。

3.3.3 惰性求值

前面提过,RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。这对新用户来说可能与直觉有些相违背之处,但是对于那些使用过诸如 Haskell 等函数式语言或者类似 LINQ 这样的数据处理框架的人来说,会有些似曾相识。

惰性求值意味着当我们对 RDD 调用转化操作(例如调用 map())时,操作不会立即执行。相反,Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读取到 RDD 的操作也同样是惰性的。因此,当我们调用 sc.textFile() 时,数据并没有读取进来,而是在必要时才会读取。和转化操作一样的是,读取数据的操作也有可能会多次执行。

 虽然转化操作是惰性求值的,但还是可以随时通过运行一个行动操作来强制 Spark 执行 RDD 的转化操作,比如使用 count()。这是一种对你所写的程序进行部分测试的简单方法。

Spark 使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。在类似 Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少 MapReduce 的周期数。而在 Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序, 这样也使这些操作更容易管理。

3.4 向Spark传递函数

Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。在我们支持的三种主要语言中,向 Spark 传递函数的方式略有区别。

3.4.1 Python

在 Python 中,我们有三种方式来把函数传递给 Spark。传递比较短的函数时,可以使用 lambda 表达式来传递,如例 3-2 和例 3-18 所示。除了 lambda 表达式,我们也可以传递顶层函数或是定义的局部函数。

例 3-18:在 Python 中传递函数

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

传递函数时需要小心的一点是,Python 会在你不经意间把函数所在的对象也序列化传出去。当你传递的对象是某个对象的成员,或者包含了对某个对象中一个字段的引用时(例如 self.field),Spark 就会把整个对象发到工作节点上,这可能比你想传递的东西大得多(见例 3-19)。有时,如果传递的类里面包含 Python 不知道如何序列化传输的对象,也会导致你的程序失败。

例 3-19:传递一个带字段引用的函数(别这么做!)

class SearchFunctions(object):
  def __init__(self, query):
      self.query = query
  def isMatch(self, s):
      return self.query in s
  def getMatchesFunctionReference(self, rdd):
      # 问题:在"self.isMatch"中引用了整个self
      return rdd.filter(self.isMatch)
  def getMatchesMemberReference(self, rdd):
      # 问题:在"self.query"中引用了整个self
      return rdd.filter(lambda x: self.query in x)

替代的方案是,只把你所需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量,如例 3-20 所示。

例 3-20:传递不带字段引用的 Python 函数

class WordFunctions(object):
  ...
  def getMatchesNoReference(self, rdd):
      # 安全:只把需要的字段提取到局部变量中
      query = self.query
      return rdd.filter(lambda x: query in x)

3.4.2 Scala

在 Scala 中,我们可以把定义的内联函数、方法的引用或静态方法传递给 Spark,就像 Scala 的其他函数式 API 一样。我们还要考虑其他一些细节,比如所传递的函数及其引用的数据需要是可序列化的(实现了 Java 的 Serializable 接口)。除此以外,与 Python 类似,传递一个对象的方法或者字段时,会包含对整个对象的引用。这在 Scala 中不是那么明显,毕竟我们不会像 Python 那样必须用 self 写出那些引用。类似在例 3-20 中对 Python 执行的操作,我们可以把需要的字段放到一个局部变量中,来避免传递包含该字段的整个对象,如例 3-21 所示。

例 3-21:Scala 中的函数传递

class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
    // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this"
    rdd.map(isMatch)
  }
  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
    // 问题:"query"表示"this.query",因此我们要传递整个"this"
    rdd.map(x => x.split(query))
  }
  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    // 安全:只把我们需要的字段拿出来放入局部变量中
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}

如果在 Scala 中出现了 NotSerializableException,通常问题就在于我们传递了一个不可序列化的类中的函数或字段。记住,传递局部可序列化变量或顶级对象中的函数始终是安全的。

3.4.3 Java

在 Java 中,函数需要作为实现了 Spark 的 org.apache.spark.api.java.function 包中的任一函数接口的对象来传递。根据不同的返回类型,我们定义了一些不同的接口。我们把最基本的一些函数接口列在表 3-1 中,同时介绍了一些其他的函数接口,在需要返回特殊类型(比如键值对)的数据时使用,参见 3.5.2 节中的“Java”一节。

表3-1:标准Java函数接口

函数名

实现的方法

用途

Function<T, R>

R call(T)

接收一个输入值并返回一个输出值,用于类似 map()filter() 等操作中

Function2<T1, T2, R>

R call(T1, T2)

接收两个输入值并返回一个输出值,用于类似 aggregate()fold() 等操作中

FlatMapFunction<T, R>

Iterable<R> call(T)

接收一个输入值并返回任意个输出,用于类似 flatMap() 这样的操作中

可以把我们的函数类内联定义为使用匿名内部类(例 3-22),也可以创建一个具名类(例 3-23)。

例 3-22:在 Java 中使用匿名内部类进行函数传递

RDD<String> errors = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String x) { return x.contains("error"); }
});

例 3-23:在 Java 中使用具名类进行函数传递

class ContainsError implements Function<String, Boolean>() {
  public Boolean call(String x) { return x.contains("error"); }
}

RDD<String> errors = lines.filter(new ContainsError());

具体风格的选择取决于个人偏好。不过我们发现顶级具名类通常在组织大型程序时显得比较清晰。使用顶级函数的另一个好处在于你可以给它们的构造函数添加参数,如例 3-24 所示。

例 3-24:带参数的 Java 函数类

class Contains implements Function<String, Boolean>() {
  private String query;
  public Contains(String query) { this.query = query; }
  public Boolean call(String x) { return x.contains(query); }
}

RDD<String> errors = lines.filter(new Contains("error"));

在 Java 8 中,你也可以使用 lambda 表达式来简洁地实现函数接口。由于在本书写作时,Java 8 仍然相对比较新,我们的示例就使用了之前版本的 Java,以更啰嗦的语法来定义函数类。不过,如果使用 lambda 表达式,我们的搜索示例就会变得如例 3-25 所示那样。

例 3-25:在 Java 中使用 Java 8 地 lambda 表达式进行函数传递

RDD<String> errors = lines.filter(s -> s.contains("error"));

如果你对使用 Java 8 的 lambda 表达式感兴趣,请参考 Oracle 的相关文档(http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)以及 Databricks 关于如何在 Spark 中 使用 lambda 表达式的博客(http://databricks.com/blog/2014/04/14/spark-with-java-8.html)。

 匿名内部类和 lambda 表达式都可以引用方法中封装的任意 final 变量,因此你可以像在 Python 和 Scala 中一样把这些变量传递给 Spark。

3.5 常见的转化操作和行动操作

本节我们会接触 Spark 中大部分常见的转化操作和行动操作。包含特定数据类型的 RDD 还支持一些附加操作,例如,数字类型的 RDD 支持统计型函数操作,而键值对形式的 RDD 则支持诸如根据键聚合数据的键值对操作。我们也会在后面几节中讲到如何转换 RDD 类型,以及各类型对应的特殊操作。

3.5.1 基本RDD

首先来讲讲哪些转化操作和行动操作受任意数据类型的 RDD 支持。

1. 针对各个元素的转化操作

你很可能会用到的两个最常用的转化操作是 map()filter()(见图 3-2)。转化操作 map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果 RDD 中对应元素的值。而转化操作 filter() 则接收一个函数,并将 RDD 中满足该函数的元素放入新的 RDD 中返回。

图 3-2:从输入 RDD 映射与筛选得到的 RDD

我们可以使用 map() 来做各种各样的事情:可以把我们的 URL 集合中的每个 URL 对应的主机名提取出来,也可以简单到只对各个数字求平方值。map() 的返回值类型不需要和输入类型一样。这样如果有一个字符串 RDD,并且我们的 map() 函数是用来把字符串解析并返回一个 Double 值的,那么此时我们的输入 RDD 类型就是 RDD[String],而输出类型是 RDD[Double]

让我们看一个简单的例子,用 map() 对 RDD 中的所有数求平方(如例 3-26 至例 3-28 所示)。

例 3-26:Python 版计算 RDD 中各值的平方

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)

例 3-27:Scala 版计算 RDD 中各值的平方

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

例 3-28:Java 版计算 RDD 中各值的平方

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
  public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));

有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()。和 map() 类似,我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的 RDD 倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可访问的所有元素的 RDD。flatMap() 的一个简单用途是把输入的字符串切分为单词,如例 3-29 至例 3-31 所示。

例 3-29:Python 中的 flatMap() 将行数据切分为单词

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # 返回"hello"

例 3-30:Scala 中的 flatMap() 将行数据切分为单词

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // 返回"hello"

例 3-31:Java 中的 flatMap() 将行数据切分为单词

JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) {
    return Arrays.asList(line.split(" "));
  }
});
words.first(); // 返回"hello"

我们在图 3-3 中阐释了 flatMap()map() 的区别。你可以把 flatMap() 看作将返回的迭代器“拍扁”,这样就得到了一个由各列表中的元素组成的 RDD,而不是一个由列表组成的 RDD。

图 3-3:RDD 的 flatMap()map() 的区别

2. 伪集合操作

尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作。图 3-4 展示了四种操作。注意,这些操作都要求操作的 RDD 是相同数据类型的。

我们的 RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只要唯一的元素,我们可以使用 RDD.distinct() 转化操作来生成一个只包含不同元素的新 RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。第 4 章会详细介绍数据混洗,以及如何避免数据混洗。

图 3-4:一些简单的集合操作

最简单的集合操作是 union(other),它会返回一个包含两个 RDD 中所有元素的 RDD。这在很多用例下都很有用,比如处理来自多个数据源的日志文件。与数学中的 union() 操作不同的是,如果输入的 RDD 中有重复数据,Spark 的 union() 操作也会包含这些重复数据(如有必要,我们可以通过 distinct() 实现相同的效果)。

Spark 还提供了 intersection(other) 方法,只返回两个 RDD 中都有的元素。intersection() 在运行时也会去掉所有重复的元素(单个 RDD 内的重复元素也会一起移除)。尽管 intersection()union() 的概念相似,intersection() 的性能却要差很多,因为它需要通过网络混洗数据来发现共有的元素。

有时我们需要移除一些数据。subtract(other) 函数接收另一个RDD作为参数,返回一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。和 intersection() 一样,它也需要数据混洗。

我们也可以计算两个 RDD 的笛卡儿积,如图 3-5 所示。cartesian(other) 转化操作会返回所有可能的 (a, b) 对,其中 a 是源 RDD 中的元素,而 b 则来自另一个 RDD。笛卡儿积在我们希望考虑所有可能的组合的相似度时比较有用,比如计算各用户对各种产品的预期兴趣程度。我们也可以求一个 RDD 与其自身的笛卡儿积,这可以用于求用户相似度的应用中。不过要特别注意的是,求大规模 RDD 的笛卡儿积开销巨大。

图 3-5:两个 RDD 的笛卡儿积

表 3-2 和表 3-3 总结了这些常见的 RDD 转化操作。

表3-2:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作

函数名

目的

示例

结果

map()

将函数应用于 RDD 中的每个元素,将返回值构成新的 RDD

rdd.map(x => x + 1)

{2, 3, 4, 4}

flatMap()

将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的 RDD。通常用来切分单词

rdd.flatMap(x => x.to(3))

{1, 2, 3, 2, 3, 3, 3}

filter()

返回一个由通过传给 filter() 的函数的元素组成的 RDD

rdd.filter(x => x != 1)

{2, 3, 3}

distinct()

去重

rdd.distinct()

{1, 2, 3}

sample(withReplacement, fraction, [seed])

对 RDD 采样,以及是否替换

rdd.sample(false, 0.5)

非确定的

表3-3:对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作

函数名

目的

示例

结果

union()

生成一个包含两个 RDD 中所有元素的 RDD

rdd.union(other)

{1, 2, 3, 3, 4, 5}

intersection()

求两个 RDD 共同的元素的 RDD

rdd.intersection(other)

{3}

subtract()

移除一个 RDD 中的内容(例如移除训练数据)

rdd.subtract(other)

{1, 2}

cartesian()

与另一个 RDD 的笛卡儿积

rdd.cartesian(other)

{(1, 3), (1, 4), ...(3, 5)}

3. 行动操作

你很有可能会用到基本 RDD 上最常见的行动操作 reduce()。它接收一个函数作为参数,这个函数要操作两个相同元素类型的 RDD 数据并返回一个同样类型的新元素。一个简单的例子就是函数 +,可以用它来对我们的 RDD 进行累加。使用 reduce(),可以很方便地计算出 RDD 中所有元素的总和、元素的个数,以及其他类型的聚合操作(如例 3-32 至例 3-34 所示)。

例 3-32:Python 中的 reduce()

sum = rdd.reduce(lambda x, y: x + y)

例 3-33:Scala 中的 reduce()

val sum = rdd.reduce((x, y) => x + y)

例 3-34:Java 中的 reduce()

Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer x, Integer y) { return x + y; }
});

fold()reduce() 类似,接收一个与 reduce() 接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素;也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如 + 对应的 0,* 对应的 1,或拼接操作对应的空列表)。

 我们可以通过原地修改并返回两个参数中的前一个的值来节约在 fold() 中创建对象的开销。但是你没有办法修改第二个参数。

fold()reduce() 都要求函数的返回值类型需要和我们所操作的 RDD 中的元素类型相同。这很符合像 sum 这种操作的情况。但有时我们确实需要返回一个不同类型的值。例如,在计算平均值时,需要记录遍历过程中的计数以及元素的数量,这就需要我们返回一个二元组。可以先对数据使用 map() 操作,来把元素转为该元素和 1 的二元组,也就是我们所希望的返回类型。这样 reduce() 就可以以二元组的形式进行归约了。

aggregate() 函数则把我们从返回值类型必须与所操作的 RDD 类型相同的限制中解放出来。与 fold() 类似,使用 aggregate() 时,需要提供我们期待返回的类型的初始值。然后通过一个函数把 RDD 中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

我们可以用 aggregate() 来计算 RDD 的平均值,来代替 map() 后面接 fold() 的方式,如例 3-35 至例 3-37 所示。

例 3-35:Python 中的 aggregate()

sumCount = nums.aggregate((0, 0),
               (lambda acc, value: (acc[0] + value, acc[1] + 1),
               (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])

例 3-36:Scala 中的 aggregate()

val result = input.aggregate((0, 0))(
               (acc, value) => (acc._1 + value, acc._2 + 1),
               (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble

例 3-37:Java 中的 aggregate()

class AvgCount implements Serializable {
  public AvgCount(int total, int num) {
    this.total = total;
    this.num = num;
  }
  public int total;
  public int num;
  public double avg() {
    return total / (double) num;
  }
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
  new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
      a.total += x;
      a.num += 1;
      return a;
  }
};
Function2<AvgCount, AvgCount, AvgCount> combine =
  new Function2<AvgCount, AvgCount, AvgCount>() {
  public AvgCount call(AvgCount a, AvgCount b) {
    a.total += b.total;
    a.num += b.num;
    return a;
  }
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());

RDD 的一些行动操作会以普通集合或者值的形式将 RDD 的部分或全部数据返回驱动器程序中。

把数据返回驱动器程序中最简单、最常见的操作是 collect(),它会将整个 RDD 的内容返回。collect() 通常在单元测试中使用,因为此时 RDD 的整个内容不会很大,可以放在内存中。使用 collect() 使得 RDD 的值与预期结果之间的对比变得很容易。由于需要将数据复制到驱动器进程中,collect() 要求所有数据都必须能一同放入单台机器的内存中。

take(n) 返回 RDD 中的 n 个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。需要注意的是,这些操作返回元素的顺序与你预期的可能不一样。

这些操作对于单元测试和快速调试都很有用,但是在处理大规模数据时会遇到瓶颈。

如果为数据定义了顺序,就可以使用 top() 从 RDD 中获取前几个元素。top() 会使用数据的默认顺序,但我们也可以提供自己的比较函数,来提取前几个元素。

有时需要在驱动器程序中对我们的数据进行采样。takeSample(withReplacement, num, seed) 函数可以让我们从数据中获取一个采样,并指定是否替换。

有时我们会对 RDD 中的所有元素应用一个行动操作,但是不把任何结果返回到驱动器程序中,这也是有用的。比如可以用 JSON 格式把数据发送到一个网络服务器上,或者把数据存到数据库中。不论哪种情况,都可以使用 foreach() 行动操作来对 RDD 中的每个元素进行操作,而不需要把 RDD 发回本地。

关于基本 RDD 上的更多标准操作,我们都可以从其名称推测出它们的行为。count() 用来返回元素的个数,而 countByValue() 则返回一个从各值到值对应的计数的映射表。表 3-4 总结了这些行动操作。

表3-4:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作

函数名

目的

示例

结果

collect()

返回 RDD 中的所有元素

rdd.collect()

{1, 2, 3, 3}

count()

RDD 中的元素个数

rdd.count()

4

countByValue()

各元素在 RDD 中出现的次数

rdd.countByValue()

{(1, 1),(2, 1),(3, 2)}

take(num)

从 RDD 中返回 num 个元素

rdd.take(2)

{1, 2}

top(num)

从 RDD 中 返回最前面的 num 个元素

rdd.top(2)

{3, 3}

takeOrdered(num)(ordering)

从 RDD 中按照提供的顺序返回最前面的 num 个元素

rdd.takeOrdered(2)(myOrdering)

{3, 3}

takeSample(withReplacement, num, [seed])

从 RDD 中返回任意一些元素

rdd.takeSample(false, 1)

非确定的

reduce(func)

并行整合 RDD 中 所有数据(例如 sum

rdd.reduce((x, y) => x + y)

9

fold(zero)(func)

reduce() 一样,但是需要提供初始值

rdd.fold(0)((x, y) => x + y)

9

aggregate(zeroValue)(seqOp, combOp)

reduce() 相似,但是通常返回不同类型的函数

rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))

(9,4)

foreach(func)

对 RDD 中的每个元素使用给定的函数

rdd.foreach(func)

3.5.2 在不同RDD类型间转换

有些函数只能用于特定类型的 RDD,比如 mean()variance() 只能用在数值 RDD 上,而 join() 只能用在键值对 RDD 上。我们会在第 6 章讨论数值 RDD 的专门函数,在第 4 章讨论键值对 RDD 的专有操作。在 Scala 和 Java 中,这些函数都没有定义在标准的 RDD 类中,所以要访问这些附加功能,必须要确保获得了正确的专用 RDD 类。

1. Scala

在 Scala 中,将 RDD 转为有特定函数的 RDD(比如在 RDD[Double] 上进行数值操作)是由隐式转换来自动处理的。2.4.1 节中提到过,我们需要加上 import org.apache.spark.SparkContext._ 来使用这些隐式转换。你可以在 SparkContext 对象的 Scala 文档(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext$)中查看所列出的隐式转换。这些隐式转换可以隐式地将一个 RDD 转为各种封装类,比如 DoubleRDDFunctions(数值数据的 RDD)和 PairRDDFunctions(键值对 RDD),这样我们就有了诸如 mean()variance() 之类的额外的函数。

隐式转换虽然强大,但是会让阅读代码的人感到困惑。如果你对 RDD 调用了像 mean() 这样的函数,可能会发现 RDD 类的 Scala 文档(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD)中根本没有 mean() 函数。调用之所以能够成功,是因为隐式转换可以把 RDD[Double] 转为 DoubleRDDFunctions。当我们在 Scala 文档中查找函数时,不要忘了那些封装的专用类中的函数。

2. Java

在 Java 中,各种 RDD 的特殊类型间的转换更为明确。Java 中有两个专门的类 JavaDoubleRDDJavaPairRDD,来处理特殊类型的 RDD,这两个类还针对这些类型提供了额外的函数。这让你可以更加了解所发生的一切,但是也显得有些累赘。

要构建出这些特殊类型的 RDD,需要使用特殊版本的类来替代一般使用的 Function 类。如果要从 T 类型的 RDD 创建出一个 JavaDoubleRDD,我们就应当在映射操作中使用 DoubleFunction<T> 来替代 Function<T, Double>。表 3-5 展示了一些特殊版本的函数类及其用法。

此外,我们也需要调用 RDD 上的一些别的函数(因此不能只是创建出一个 DoubleFunction 然后把它传给 map())。当需要一个 JavaDoubleRDD 时,我们应当调用 mapToDouble() 来替代 map(),跟其他所有函数所遵循的模式一样。

表3-5:Java中针对专门类型的函数接口

函数名

等价函数

用途

DoubleFlatMapFunction<T>

Function<T, Iterable<Double>>

用于 flatMapToDouble,以生成 DoubleRDD

DoubleFunction<T>

Function<T, Double>

用于 mapToDouble,以生成 DoubleRDD

PairFlatMapFunction<T, K, V>

Function<T, Iterable<Tuple2<K, V>>>

用于 flatMapToPair,以生成 PairRDD<K, V>

PairFunction<T, K, V>

Function<T, Tuple2<K, V>>

用 于 mapToPair, 以生成 PairRDD<K, V>

我们可以把例 3-28 修改为生成一个 JavaDoubleRDD、计算 RDD 中每个元素的平方值的示例,如例 3-38 所示。这样就可以调用 DoubleRDD 独有的函数了,比如 mean()variance()

例 3-38:用 Java 创建 DoubleRDD

JavaDoubleRDD result = rdd.mapToDouble(
  new DoubleFunction<Integer>() {
    public double call(Integer x) {
      return (double) x * x;
    }
});
System.out.println(result.mean());

3. Python

Python 的 API 结构与 Java 和 Scala 有所不同。在 Python 中,所有的函数都实现在基本的 RDD 类中,但如果操作对应的 RDD 数据类型不正确,就会导致运行时错误。

3.6 持久化(缓存)

如前所述,Spark RDD 是惰性求值的,而有时我们希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。例 3-39 就是先对 RDD 作一次计数、再把该 RDD 输出的一个小例子。

例 3-39:Scala 中的两次执行

val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))

为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。当我们让 Spark 持久化存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

出于不同的目的,我们可以为 RDD 选择不同的持久化级别(如表 3-6 所示)。在 Scala(见例 3-40)和 Java 中,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。在 Python 中,我们会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列化后的对象存储在 JVM 堆空间中。当我们把数据写到磁盘或者堆外存储上时,也总是使用序列化后的数据。

表3-6:org.apache.spark.storage.StorageLevelpyspark.StorageLevel中的持久化级别;如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份

级别

使用的空间

CPU时间

是否在内存中

是否在磁盘上

备注

MEMORY_ONLY

 

MEMORY_ONLY_SER

 

MEMORY_AND_DISK

中等

部分

部分

如果数据在内存中放不下,则溢写到磁盘上

MEMORY_AND_DISK_SER

部分

部分

如果数据在内存中放不下,则溢写到磁盘上。在内存中存放序列化后的数据

DISK_ONLY

 

 堆外缓存是试验性功能,我们使用 Tachyon(http://tachyon-project.org/)作为外部系统。如果你对 Spark 的堆外缓存有兴趣,可以看看关于如何在 Tachyon 上运行 Spark 的介绍(http://tachyon-project.org/Running-Spark-on-Tachyon.html)。

例 3-40:在 Scala 中使用 persist()

import org.apache.spark.storage.StorageLevel
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

注意,我们在第一次对这个 RDD 调用行动操作前就调用了 persist() 方法。persist() 调用本身不会触发强制求值。

如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算。但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘。不论哪一种情况,都不必担心你的作业因为缓存了太多数据而被打断。不过,缓存不必要的数据会导致有用的数据被移出内存,带来更多重算的时间开销。

最后,RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除。

3.7 总结

在本章中,我们介绍了 RDD 运行模型以及 RDD 的许多常见操作。如果你读到了这里,恭喜——你已经学完了 Spark 的所有核心概念。我们在进行并行聚合、分组等操作时,常常需要利用键值对形式的 RDD。下一章会讲解键值对形式的 RDD 上一些相关的特殊操作。然后,我们会讨论各种数据源的输入输出,以及一些关于使用 SparkContext 的进阶话题。

目录