进阶编程

共享变量

通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。Spark 的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

累加器

在scala中累加空行

val sc = new SparkContext(...)
val file = sc.textFile("file.txt")

val blankLines = sc.accumulator(0) // 创建Accumulator[Int]并初始化为0

val callSigns = file.flatMap(line => {
if (line == "") {
    blankLines += 1 // 累加器加1
}
line.split(" ")
})

callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
  • 总结起来,累加器的用法如下所示:

    • 通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。

    • Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add)增加累加器的值。

    • 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue())来访问累加器的值。

注意:工作节点上的任务不能访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。在这种模式下,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。

  • 累加器容错性

    • Spark 会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。

      例如:
      1.如果对某分区执行map()操作的节点失败了,Spark会在另一个节点上重新运行该任务。
      2.即使该节点没有崩溃,而只是处理速度比别的节点慢很多,Spark也可以抢占式地在另一个节点上启动一个任务副本,如果该任务更早结束就可以直接获取结果。
      3.即使没有节点失败,Spark有时也需要重新运行任务来获取缓存中被移除出内存的数据。
      因此最终结果就是同一个函数可能对同一份数据运行了多次,这取决于集群的动态。

    • 对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。

    • 对于在 RDD 转化操作中使用的累加器,就不能保证有这种情况了。转化操作中累加器可能会发生不止一次更新。在转化操作中,累加器通常只用于调试目的。

  • 广播的优化

    • 当广播一个比较大的值时,选择既快又好的序列化格式是很重要的,因为如果序列化对象的时间很长或者传送花费的时间太久,这段时间很容易就成为性能瓶颈,可以使用spark.serializer

广播变量

它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。

scala:

// 查询RDD contactCounts中的呼号的对应位置。将呼号前缀
// 读取为国家代码来进行查询
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign, count) =>
val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

基于分区进行操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。
Spark 提供基于分区的 map 和 foreach,让你的部分代码只对 RDD 的每个分区运行一次,这样可以帮助降低这些操作的代价。

enter image description here

与外部程序间的管道

有三种可用的语言供你选择,这可能已经满足了你用来编写 Spark 应用的几乎所有需求。Spark 在 RDD 上提供 pipe() 方法。

例如Spark与R语言的结合

R语言:

#!/usr/bin/env Rscript
library("Imap")
f <- file("stdin")
open(f)
while(length(line <- readLines(f,n=1)) > 0) {
# 处理行
contents <- Map(as.numeric, strsplit(line, ","))
mydist <- gdist(contents[[1]][2], contents[[1]][2],
                 contents[[1]][3], contents[[1]][4],
                 units="m", a=6378137.0, b=6356752.3142, verbose = FALSE)
write(mydist, stdout())
}

在scala中使用pipe()调用R文件:

// 使用一个R语言外部程序计算每次呼叫的距离
// 将脚本添加到各个节点需要在本次作业中下载的文件的列表中
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => 
x.map(y =>s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong"))
.pipe(Seq(SparkFiles.get(distScriptName)))
println(distances.collect().toList)

SparkContext.addFile(path),可以构建一个文件列表,让每个工作节点在 Spark 作业中下载列表中的文件。
SparkFiles.get(Filename)来定位单个文件

  • RDD的pipe()方法让RDD容易通过脚本管道
    • rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
    • rdd.pipe(SparkFiles.get("finddistance.R") + " ,")

数值RDD的操作

Spark 的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用 stats() 时通过一次遍历数据计算出来,并以 StatsCounter 对象返回。

enter image description here

Scala移除异常值:

// 现在要移除一些异常值,因为有些地点可能是误报的
// 首先要获取字符串RDD并将它转换为双精度浮点型
val distanceDouble = distance.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev)
println(reasonableDistance.collect().toList)