Spark调优与调试

使用SparkConf配置Spark

在 Scala 中使用 SparkConf 创建一个应用

// 创建一个conf对象
val conf = new SparkConf()
conf.set("spark.app.name", "My Spark App")
conf.set("spark.master", "local[4]")
conf.set("spark.ui.port", "36000") // 重载默认端口配置

// 使用这个配置对象创建一个SparkContext
val sc = new SparkContext(conf)

在运行时使用标记设置配置项的值

$ bin/spark-submit \
--class com.example.MyApp \
--master local[4] \
--name "My Spark App" \
--conf spark.ui.port=36000 \
myApp.jar

运行时使用默认文件设置配置项的值

$ bin/spark-submit \
--class com.example.MyApp \
--properties-file my-config.conf \
myApp.jar

#Contents of my-config.conf
spark.master    local[4]
spark.app.name  "My Spark App"
spark.ui.port   36000

Spark的执行过程

  • 用户代码定义RDD的有向无环图
    • RDD上的操作会创建出新的RDD,并引用它们的父节点,这样就创建处理一个图
    • 调度器为有向图中的每个 RDD 输出计算步骤,步骤中包括 RDD 上需要应用于每个分区的任务。然后以相反的顺序执行这些步骤,计算得出最终所求的 RDD。(RDD图与执行步骤的对应关系并不一定是一一对应的,比如调度器进行流水线执行或者把多个RDD合并到一个步骤中)。

  • 行动操作把有向无环图强制转译为执行计划

    • Spark 调度器提交一个作业(特定的行动操作生成的步骤的集合)来计算所有必要的 RDD。这个作业会包含一个或多个步骤,每个步骤其实也就是一波并行执行的计算任务。一个步骤对应有向无环图中的一个或多个 RDD,一个步骤对应多个 RDD 是因为发生了流水线执行。toDebugString()可以查看RDD的谱系(RDD的依赖关系)
    • 一个物理步骤会启动很多任务,每个任务都是在不同的数据分区上做同样的事情。
      1.从数据存储(该RDD是一个输入RDD)/已有RDD(已经缓存的数据)/数据混洗的输出中获取输入数据
      2.执行必要的操作来计算出这些操作所代表的RDD,如filter()和map()函数 3.把输出写到一个数据混洗文件中,写入外部存储货发回驱动器程序(如count()操作)
  • 任务于集群中调度并执行

    步骤是按顺序处理的,任务则独立地启动来计算出 RDD 的一部分。一旦作业的最后一个步骤结束,一个行动操作也就执行完毕了。

查找信息

Spark在执行时记录详细的进度信息和性能指标,可以在Spark的网页用户界面以及驱动器进程和执行器进程生成的日志文件中找到

驱动器进程和执行器进程日志

Spark 日志文件的具体位置取决于以下部署模式。

  • 在 Spark 独立模式下,所有日志会在独立模式主节点的网页用户界面中直接显示。这些日志默认存储于各个工作节点的 Spark 目录下的 work/ 目录中。
  • 在 Mesos 模式下,日志存储在 Mesos 从节点的 work/ 目录中,可以通过 Mesos 主节点用户界面访问。
  • 在 YARN 模式下,最简单的收集日志的方法是使用 YARN 的日志收集工具(运行 yarn logs -applicationId )来生成一个包含应用日志的报告。这种方法只有在应用已经完全完成之后才能使用,因为 YARN 必须先把这些日志聚合到一起。

关键性能考量

并行度

  • 并行度会从两方面影响程序的性能

    • 当并行度过低时,Spark 集群会出现资源闲置的情况
    • 当并行度过高时,每个分区产生的间接开销累计起来就会更大
  • Spark 提供了两种方法来对操作的并行度进行调优

    • 在数据混洗操作时,使用参数的方式为混洗后的 RDD 指定并行度
    • 对于任何已有的 RDD,可以进行重新分区(repartition())来获取更多或者更少的分区数

序列化格式

当 Spark 需要通过网络传输数据,或是将数据溢写到磁盘上时,Spark 需要把数据序列化为二进制格式
序列化会在数据进行混洗操作时发生,此时有可能需要通过网络传输大量数据。默认情况下,Spark 会使用 Java 内建的序列化库。Spark 也支持使用第三方序列化库 Kryo

使用Kryo序列化工具并注册所需类

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 严格要求注册类
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))

内存管理

Spark对于内存管理有几种不同的途径:

  • RDD存储

    当调用 RDD 的 persist() 或 cache() 方法时,这个 RDD 的分区会被存储到缓存区中。Spark 会根据 spark.storage.memoryFraction 限制用来缓存的内存占整个 JVM 堆空间的比例大小。如果超出限制,旧的分区数据会被移出内存。

  • 数据混洗与聚合的缓存区

    当进行数据混洗操作时,Spark 会创建出一些中间缓存区来存储数据混洗的输出数据。这些缓存区用来存储聚合操作的中间结果,以及数据混洗操作中直接输出的部分缓存数据。Spark 会尝试根据 spark.shuffle.memoryFraction 限定这种缓存区内存占总内存的比例。

  • 用户代码

    Spark 可以执行任意的用户代码,所以用户的函数可以自行申请大量内存。例如,如果一个用户应用分配了巨大的数组或者其他对象,那这些都会占用总的内存。用户代码可以访问 JVM 堆空间中除分配给 RDD 存储和数据混洗存储以外的全部剩余空间。

在默认情况下,Spark 会使用 60%的空间来存储 RDD,20% 存储数据混洗操作产生的数据,剩下的 20% 留给用户程序

硬件供给

提供给 Spark 的硬件资源会显著影响应用的完成时间。影响集群规模的主要参数包括分配给每个执行器节点的内存大小、每个执行器节点占用的核心数、执行器节点总数,以及用来存储临时数据的本地磁盘数量。

  • 执行器节点的内存可以通过spark.executor.memory配置项或者spark-submit 的 --executor-memory 标记来设置

  • 执行器节点的数目以及每个执行器进程的核心数的配置选项则取决于各种部署模式。在 YARN 模式下,你可以通过 spark.executor.cores 或 --executor-cores 标记来设置执行器节点的核心数,通过 --num-executors 设置执行器节点的总数

  • 一般来说,更大的内存和更多的计算核心对 Spark 应用会更有用处。

    • Spark 的架构允许线性伸缩;双倍的资源通常能使应用的运行时间减半。
    • 在调整集群规模时,需要额外考虑的方面还包括是否在计算中把中间结果数据集缓存起来。如果确实要使用缓存,那么内存中缓存的数据越多,应用的表现就会越好。
    • Spark 用户界面中的存储页面会展示所缓存的数据中有哪些部分保留在内存中。你可以从在小集群上只缓存一部分数据开始,然后推算缓存大量数据所需要的总内存量。
    • 除了内存和 CPU 核心,Spark 还要用到本地磁盘来存储数据混洗操作的中间数据,以及溢写到磁盘中的 RDD 分区数据。因此,使用大量的本地磁盘可以帮助提升 Spark 应用的性能。

“越多越好”的原则在设置执行器节点内存时并不一定适用。使用巨大的堆空间可能会导致垃圾回收的长时间暂停,从而严重影响 Spark 作业的吞吐量。有时,使用较小内存(比如不超过 64GB)的执行器实例可以缓解该问题