在集群上运行Spark

Spark运行架构

在分布式环境下,Spark 集群采用的是主 / 从结构。
Spark 应用通过一个叫作集群管理器(Cluster Manager)的外部服务在集群中的机器上启动。Spark 自带的集群管理器被称为独立集群管理器。Spark 也能运行在 Hadoop YARN 和 Apache Mesos 这两大开源集群管理器上。
在一个 Spark 集群中,有一个节点负责中央协调,调度各个分布式工作节点。这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的 Java 进程运行。驱动器节点和所有的执行器节点一起被称为一个 Spark 应用(application)。

enter image description here

运行机制

  • 驱动器节点

    • 把用户程序转为任务

      Spark驱动器程序负责把用户程序转为多个物理执行的单元(称为任务)
      所有程序都遵循同样的结构:输出数据创建一系列RDD-> 转化操作派生出新的RDD -> 行动操作手机或存储结果RDD中的数据
      有向无环图:Spark隐式地创建一个有操作组成的逻辑上的无向无环图,驱动程序运行可将其转为物理执行计划

    • 为执行器节点调度任务

      每个执行器节点代表一个能够处理任务和存储RDD数据的进程,执行器进程启动后,会向驱动器进程注册自己
      驱动程序会根据当前的节点集合,尝试把所有任务基于数据所在位置分配给合适的执行器进程,同时执行器运行时,会跟踪各缓存数据的位置,进而调度以后的任务,尽量减少数据的网络传输
      Spark进程信息:在本地模式下,访问 http://localhost:4040 就可以看到这个网页了

  • 执行器节点

    Spark执行器节点是一种工作进程,版锁着整个Spark应用的生命周期,负责运行任务,任务间相互独立,主要有两大作用:

    • 执行器进程负责运行组成Spark应用的任务,并将结果返回给驱动器进程
    • 执行器进程通过自身的块管理器为用户程序中要求缓存的RDD提供内存式存储
  • 集群管理器

    Spark 依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。

启动程序的过程概述

  • 用户通过spark-submit脚本提交应用
  • spark-submit脚本启动驱动器程序,调用用户定义的main()方法
  • 驱动程序与集群管理器通信,申请资源以启动执行器节点
  • 集群管理器为驱动器程序启动执行器节点
  • 驱动器进程执行用户应用中的操作,根据程序中所定义的对RDD的转化和行动操作,驱动器节点把工作以任务的形式发送到执行器进程
  • 任务在执行器程序中进行计算并保存结果
  • 如果驱动程序的main()方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源

部署应用

spark_submit的一般格式

bin/spark-submit [options] <app jar | python file> [app options]

enter image description here

代码:

# 使用独立集群模式提交Java应用
$ ./bin/spark-submit \
--master spark://hostname:7077 \
--deploy-mode cluster \
--class com.databricks.examples.SparkExample \
--name "Example Program" \
--jars dep1.jar,dep2.jar,dep3.jar \
--total-executor-cores 300 \
--executor-memory 10g \
myApp.jar "options" "to your application" "go here"

# 使用YARN客户端模式提交Python应用
$ export HADOP_CONF_DIR=/opt/hadoop/conf
$ ./bin/spark-submit \
--master yarn \
--py-files somelib-1.2.egg,otherlib-4.4.zip,other-file.py \
--deploy-mode client \
--name "Example Program" \
--queue exampleQueue \
--num-executors 40 \
--executor-memory 10g \
my_script.py "options" "to your application" "go here"

打包代码与依赖

通常用户程序需要依赖第三方库,Java 和 Scala 用户也可以通过 spark-submit 的 --jars 标记提交独立的 JAR 包依赖。
当你向 Spark 提交应用时,你必须把应用的整个依赖传递图中的所有依赖都传给集群。你不仅要传递你直接依赖的库,还要传递这些库的依赖,以及它们的依赖的依赖,等等。
常规的做法是使用构建工具,生成单个大 JAR 包,包含应用的所有的传递依赖。这通常被称为超级(uber)JAR 或者组合(assembly)JAR,大多数 Java 或 Scala 的构建工具都支持生成这样的工件。

  • 使用Maven构建用Java编写的Spark应用

    • Maven构建的Spark应用的pom.xml文件 enter image description here
    • 打包使用Maven构建的Spark应用 enter image description here
  • 使用sbt构建的用Scala编写的Spark应用

    • 使用 sbt 0.13 的 Spark 应用的 build.sbt 文件 enter image description here

    • 在 sbt 工程构建中添加 assembly 插件 enter image description here

    • 打包使用 sbt 构建的 Spark 应用 enter image description here

当用户应用与 Spark 本身依赖同一个库时可能会发生依赖冲突,导致程序崩溃

配置资源量

  • 执行器进程内存

    spark-submit 的 --executor-memory 参数来配置此项

  • 占用核心总数的最大量

    spark-submit 的 --total-executorcores 参数设置这个值,或者在你的 Spark 配置文件中设置 spark.cores.max 的值。