第 1 章 Spark的环境搭建与运行

第 1 章 Spark的环境搭建与运行

Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群或虚拟机上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API来处理分布式数据。从这方面说,它与Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。

Spark起源于加州大学伯克利分校AMP实验室的一个研究项目。该高校当时关注分布式机器学习算法的应用情况。因此,Spark从一开始便是为应对迭代式应用的高性能需求而设计的。在这类应用中,相同的数据会被多次访问。该设计主要通过在内存中缓存数据集以及启动并行计算任务时的低延迟和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表明了其实用性。

 关于Spark项目的更多信息,请参见:

从性能上说,Spark在不同工作负载下的运行速度明显高于Hadoop,如下图所示。

来源:https://amplab.cs.berkeley.edu/wp-content/uploads/2011/11/spark-lr.png

Spark支持4种运行模式。

  • 本地单机模式:所有Spark进程都运行在同一个Java虚拟机(JVM,Java virtual machine)进程中。
  • 集群单机模式:使用Spark内置的任务调度框架。
  • 基于Mesos:Mesos是一个流行的开源集群计算框架。
  • 基于Hadoop YARN:YARN常被称作NextGen MapReduce。

本章主要包括以下内容。

  • 下载Spark二进制版本,并搭建一个在本地单机模式下运行的开发环境。本书各章的代码示例都在该环境下运行。
  • 通过Spark的交互式终端来了解它的编程模型及API。
  • 分别用Scala、Java、R和Python语言来编写第一个Spark程序。
  • 在Amazon的EC2(Elastic Cloud Compute)平台上架设一个Spark集群。相比本地模式,该集群可以应对数据量更大、计算更复杂的任务。
  • 借助Amazon Elastic Map Reduce 服务来构建一个Spark集群。

如果读者曾构建过Spark环境并熟悉有关Spark程序编写的基础知识,可以跳过本章。

1.1 Spark的本地安装与配置

Spark能通过内置的单机集群调度器在本地模式下运行。此时,所有的Spark进程都高效地运行在同一个Java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式常用于原型设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。

Spark的本地模式与集群模式完全兼容,在本地编写和测试过的程序仅需增加少许设置便能在集群上运行。

本地构建Spark环境的第一步是下载其最新的版本包。各版本的版本包及源代码的GitHub地址可在Spark项目的下载页面找到:http://spark.apache.org/downloads.html

 Spark的在线文档(http://spark.apache.org/docs/latest/)涵盖了进一步学习Spark所需的各种资料。强烈推荐读者浏览查阅。

为了访问Hadoop分布式文件系统(HDFS)以及标准或定制的Hadoop输入源,Spark的编译需要与Hadoop的版本对应。上述下载页面提供了针对Cloudera的Hadoop发行版(CHD)、MapR的Hadoop发行版和Hadoop 2(YARN)的预编译二进制包。除非你想构建针对特定版本Hadoop的Spark,否则建议你通过如下链接从Apache镜像下载Hadoop 2.7预编译版本:http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz

Spark的运行依赖Scala编程语言(写作本书时为 2.10.x 或 2.11.x 版)。好在预编译的二进制包中已包含Scala运行环境,我们不需要另外安装Scala便可运行Spark。但是,你需要先安装好Java运行时环境(JRE)或Java开发工具包(JDK)。

 相应的安装指南可参见本书代码包中的软硬件列表。推荐使用R 3.1或以上版本。

下载完上述版本包后,在终端输入如下指令解压软件包并进入解压出的文件夹:

$ tar xfvz spark-2.0.0-bin-hadoop2.7.tgz
$ cd spark-2.0.0-bin-hadoop2.7

用户启动Spark所用的脚本在该目录的bin文件夹下。可通过如下命令运行Spark附带的一个示例程序来测试是否一切正常:

$ ./bin/run-example SparkPi 100

该命令将在本地单机模式下运行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用的线程数与本地系统的CPU核心数目相同。示例运行完后,应可在输出的结尾看到如下的提示:

...
16/11/24 14:41:58 INFO Executor: Finished task 99.0 in stage 0.0
  (TID 99). 872 bytes result sent to driver
16/11/24 14:41:58 INFO TaskSetManager: Finished task 99.0 in stage
  0.0 (TID 99) in 59 ms on localhost (100/100)
16/11/24 14:41:58 INFO DAGScheduler: ResultStage 0 (reduce at
  SparkPi.scala:38) finished in 1.988 s
16/11/24 14:41:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0,
  whose tasks have all completed, from pool
16/11/24 14:41:58 INFO DAGScheduler: Job 0 finished: reduce at
  SparkPi.scala:38, took 2.235920 s
Pi is roughly 3.1409527140952713

上述命令调用了org.apache.spark.examples.SparkPi类。

该类以local[N]格式来接受输入参数,其中 N 表示要启用的线程数目。比如只使用两个线程时,便可使用如下命令:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi
  --master local[2] ./examples/jars/spark-examples_2.11-2.0.0.jar 100

按惯例,将命令中的local[2]改为local[*]则会使用本机所有可用的核心。

1.2 Spark集群

Spark集群由两类进程构成:一个驱动程序和多个执行程序。在本地模式下,所有的进程都运行在同一个JVM内,而在集群模式下时,它们通常运行在不同的节点上。

举例来说,一个采用单机模式的Spark集群(即使用Spark内置的集群管理模块)通常包括:

  • 一个运行Spark单机主进程和驱动程序的主节点
  • 各自运行一个执行程序进程的多个工作节点

在本书中,我们将使用Spark的本地单机模式进行概念阐述和举例说明,但所用的代码也可运行在Spark集群上。比如在一个Spark单机集群上运行上述示例,只需传入主节点的URL即可:

$ MASTER=spark://IP:PORT --class org.apache.spark.examples.SparkPi
  ./examples/jars/spark-examples_2.11-2.0.0.jar 100

其中的IPPORT分别是主节点IP地址和端口号。这是告诉Spark让示例程序在主节点所对应的集群上运行。

Spark集群管理和部署的完整方案不在本书的讨论范围内。但是,本章后面会对Amazon EC2集群的设置和使用做简要说明。

 Spark集群部署的概要介绍可参见如下链接:

1.3 Spark编程模型

在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后面将通过它们来了解Spark编程模型的基础知识。

虽然这里会对Spark的使用进行简要介绍并提供示例,但我们推荐读者通过如下资料来获得更深入的理解。

 请参考如下链接。

1.3.1 SparkContext类与SparkConf

任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的。SparkContext的初始化需要SparkConf对象的一个实例,后者包含了Spark集群配置的各种参数,比如主节点的URL。

SparkContext是调用Spark功能的一个主要入口。一个SparkContext对象代表与一个Spark集群的连接。它能用于创建RDD对象、累加器或在集群内广播变量。

每个JVM上都只能有一个SparkContext对象。在创建一个新的对象前,必须调用现有对象的stop()函数。

初始化后,便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。若要用Scala代码来实现的话,可参照下面的代码:

val conf = new SparkConf()
  .setAppName("Test Spark App")
  .setMaster("local[4]")
val sc = new SparkContext(conf)

这段代码会创建一个4线程的SparkContext对象,并将其相应的应用程序命名为Test Spark APP。也可通过如下方式调用SparkContext的简单构造函数,以默认的参数值来创建相应的对象,其效果和上述的完全相同。

val sc = new SparkContext("local[4]", "Test Spark App")

 下载示例代码

你可从https://www.packtpub.com下载你账号购买过的Packt图书的示例代码。若书是从别处购买的,则可在https://www.packtpub.com/books/content/support注册,相应的代码会直接发送到你的电子邮箱。

1.3.2 SparkSession

SparkSession同时支持DataFrame和各种数据集API,它提供了一个统一的API来调用这些功能。

首先需要创建SparkConf类的实例,然后用它来创建SparkSession实例。参考如下示例代码:

val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
  val spark = SparkSession
    .builder()
    .appName("SparkUserData").config(spConfig)
    .getOrCreate()

然后,用Spark对象来创建一个DataFrame对象:

val user_df = spark.read.format("com.databricks.spark.csv")
  .option("delimiter", "|").schema(customSchema)
  .load("/home/ubuntu/work/ml-resources/spark-ml/data/ml-100k/u.user")
val first = user_df.first()

1.3.3 Spark shell

Spark支持用Scala、Python或R REPL(read-eval-print-loop,交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时提供实时反馈。在Scala shell中,命令执行结果的值与类型在代码执行完后也会显示出来。

要通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。在Spark 2.0中也以Spark变量的形式提供了一个SparkSession实例。

上述命令的终端输出如下:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4jdefaults.properties
  Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:14:25 WARN NativeCodeLoader: Unable to load nativehadoop library for your
  platform... using builtin-java classes where applicable
16/08/06 22:14:25 WARN Utils: Your hostname, ubuntu resolves to a
  loopback address: 127.0.1.1; using 192.168.22.180 instead (on
  interface eth1)
16/08/06 22:14:25 WARN Utils: Set SPARK_LOCAL_IP if you need to
  bind to another address
16/08/06 22:14:26 WARN Utils: Service 'SparkUI' could not bind on
  port 4040. Attempting port 4041.
16/08/06 22:14:27 WARN SparkContext: Use an existing SparkContext,
  some configuration may not take effect.
Spark context Web UI available at http://192.168.22.180:4041
Spark context available as 'sc' (master = local[*], app id = local-
  1470546866779).
Spark session available as 'spark'.
Welcome to
      ____             __
     / __/__ ___ _____/ /__
    _ / _ / ______/ __/ '_/
   /___/ .__/_,_/_/ /_/_  version 2.0.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM,
  Java 1.7.0_60)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

要想在Python shell中使用Spark,直接运行./bin/pyspark命令即可。1与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下:

1先执行:quit退出Spark shell,再启用Pyspark。——译者注

~/work/spark-2.0.0-bin-hadoop2.7/bin/pyspark
Python 2.7.6 (default, Jun 22 2015, 17:58:13)  [GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more
  information.
Using Spark's default log4j profile: org/apache/spark/log4jdefaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:16:15 WARN NativeCodeLoader: Unable to load native hadoop
  library for yourplatform... using builtin-java classes where applicable
16/08/06 22:16:15 WARN Utils: Your hostname, ubuntu resolves to a
  loopback address: 127.0.1.1; using 192.168.22.180 instead (on
  interface eth1)
16/08/06 22:16:15 WARN Utils: Set SPARK_LOCAL_IP if you need to
  bind to another address
16/08/06 22:16:16 WARN Utils: Service 'SparkUI' could not bind on
  port 4040. Attempting port 4041.
Welcome to
      ____             __
     / __/__ ___ _____/ /__
    _ / _ / ______/ __/ '_/
   /__ / .__/_,_/_/ /_/_ version 2.0.0
      /_/

Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
SparkSession available as 'spark'.
>>>

R是一门编程语言,并提供了统计计算和图形可视化运行时环境。它是一个GNU项目,是S语言(由贝尔实验室开发)的一种不同实现。

R提供了统计(线性和非线性建模、经典统计测试、时序分析、分类和聚类)以及可视化支持,有着极强的可扩展性。

要通过R来使用Spark,执行如下命令来启用Spark-R shell即可:

$ ~/work/spark-2.0.0-bin-hadoop2.7/bin/sparkR
R version 3.0.2 (2013-09-25) -- "Frisbee Sailing"
Copyright (C) 2013 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
  'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

Launching java with spark-submit command /home/ubuntu/work/spark-
  2.0.0-bin-hadoop2.7/bin/spark-submit   "sparkr-shell"
  /tmp/RtmppzWD8S/backend_porta6366144af4f
Using Spark's default log4j profile: org/apache/spark/log4jdefaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/06 22:26:22 WARN NativeCodeLoader: Unable to load nativehadoop library for your
  platform... using builtin-java classes where applicable
16/08/06 22:26:22 WARN Utils: Your hostname, ubuntu resolves to a
  loopback address: 127.0.1.1; using 192.168.22.186 instead (on interface eth1)
16/08/06 22:26:22 WARN Utils: Set SPARK_LOCAL_IP if you need to
  bind to another address
16/08/06 22:26:22 WARN Utils: Service 'SparkUI' could not bind on
  port 4040. Attempting port 4041.

 Welcome to
    ____             __
   / __/__ ___ _____/ /__
  _ / _ / ______/ __/ '_/
 /__ / .__/_,_/_/ /_/_  version 2.0.0
    /_/
SparkSession available as 'spark'.
During startup - Warning message:
package 'SparkR' was built under R version 3.1.1
>

1.3.4 弹性分布式数据集

弹性分布式数据集(RDD,resilient distributed dataset)是Spark的核心概念。RDD代表一系列的记录(严格来说是某种类型的对象)。这些记录被分配或分区到集群的多个节点上(在本地模式下,可以近似地理解为单个进程中的多个线程上)。Spark中的RDD具有容错性,即当某个节点或任务失败时(由用户代码错误之外的原因引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便最终完成任务。

  1. 创建RDD

    RDD可从现有的集合创建。比如在Scala Spark shell中:

    val collection = List("a", "b", "c", "d", "e")
    val rddFromCollection = sc.parallelize(collection)

    RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、HBase、Cassandra和Tachyon等。

    以下举例说明如何用一个本地文件系统里的文件创建RDD:

    val rddFromTextFile = sc.textFile("LICENSE")

    上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。该段代码对应的输出如下:

    rddFromTextFile: org.apache.spark.rdd.RDD[String] = LICENSE
    MapPartitionsRDD[1] at textFile at <console>:24

    如下代码演示了如何通过 hdfs://协议从HDFS中的一个文本文件创建一个RDD:

    val rddFromTextFileHDFS = sc.textFile("hdfs://input/LICENSE ")

    如下代码则演示了如何通过s3n://协议从Amazon S3中的一个文本文件创建一个RDD:

    val rddFromTextFileS3 = sc.textFile("s3n://input/LICENSE ")
  2. Spark操作

    创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模型下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某个函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。

    Spark的操作通常采用函数式风格。对于那些熟悉用Scala、Python或Java 8 中的lambda表达式进行函数式编程的程序员来说,这应不难掌握。若你没有函数式编程经验,也不用担心,Spark API其实很容易上手。

    Spark程序中最常用的一种转换操作便是map(映射)。该操作对RDD里的每一条记录都执行某个函数,从而将输入映射为新的输出。比如,下面这段代码便对一个从本地文本文件创建的RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整数(Int),从而返回一个由若干Int构成的RDD对象。

    val intsFromStringsRDD = rddFromTextFile.map(line => line.size)

    其输出应与如下类似,其中也提示了RDD的类型:

    intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] =
    MapPartitionsRDD[2] at map at <console>:26

    示例代码中的=>是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数,比如Scala或Python中用def关键字定义的函数。

     匿名函数的具体细节并不在本书讨论范围内,但由于它们在Scala、Python以及Java 8中大量使用(示例或现实应用中都是),列举一些实例仍会有帮助。

    语法line => line.size表示以=>操作符左边的部分作为输入,对其执行一个函数,并以=>操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一个Int的函数被表示为String => Int

    该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。当函数简单且只需使用一次时(像本例一样时),这种方式很有用。

    现在我们可以调用一个常见的执行操作count,来返回RDD中的记录数目:

    intsFromStringsRDD.count

    执行的结果应该如下:

    res0: Long = 299

    如果要计算这个文本文件里每行字符串的平均长度,可以先使用sum函数对所有记录的长度求和,然后再除以总的记录数目:

    val sumOfRecords = intsFromStringsRDD.sum
    val numRecords = intsFromStringsRDD.count
    val aveLengthOfRecord = sumOfRecords / numRecords

    结果应该如下:

    scala> intsFromStringsRDD.count
    res0: Long = 299
    
    scala> val sumOfRecords = intsFromStringsRDD.sum
    sumOfRecords: Double = 17512.0
    
    scala> val numRecords = intsFromStringsRDD.count
    numRecords: Long = 299
    
    scala> val aveLengthOfRecord = sumOfRecords / numRecords
    aveLengthOfRecord: Double = 58.5685618729097

    在多数情况下,Spark的操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Longsum返回一个Double)。这就意味着多个操作可以很自然地前后链接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:

    val aveLengthOfRecordChained = rddFromTextFile
      .map(line => line.size).sum / rddFromTextFile.count

    值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。

    这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只返回一个表示一系列转换操作的新RDD:

    val transformedRDD = rddFromTextFile
      .map(line => line.size).filter(size => size > 10).map(size => size * 2)

    相应的终端输出如下:

    transformedRDD: org.apache.spark.rdd.RDD[Int] =
      MappedRDD[6] at map at <console>:26

    注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:

    val computation = transformedRDD.sum

    现在你可以看到一个Spark任务被启动,并返回如下终端输出:

    computation: Double = 35006.0

     RDD支持的转换和执行操作的完整列表以及更为详细的例子,参见Spark编程指南(http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations)以及Spark API(Scala)文档(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD)。

  3. RDD缓存策略

    Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现:

    rddFromTextFile.cache
    res0: rddFromTextFile.type = MapPartitionsRDD[1] at textFile at
    <console>:27

    调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间,部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候(比如在后续分析中进行查询时,以及机器学习模型中的迭代时),数据可以直接从内存中读出,从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。

    当再在上述已缓存了的RDD上调用countsum函数时,该RDD已载入内存:

    val aveLengthOfRecordChained = rddFromTextFile
      .map(line => line.size).sum / rddFromTextFile.count

     Spark支持更为细化的缓存策略。通过persist函数可以指定Spark的数据缓存策略。关于RDD缓存的更多信息可参见:http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

1.3.5 广播变量和累加器

Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。

广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后,发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如分布式系统,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))

广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value方法:

sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect

这段代码会从{"1", "2", "3"}这个集合(一个Scala List)里,新建一个带有3条记录的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那个broadcastAList里的记录与新建的RDD里的3条记录分别拼接而成。

...
res1: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b,
c, d, e, 2), List(a, b, c, d, e, 3))

注意,上述代码使用了collect函数。这是一个Spark执行函数,它将整个RDD以Scala(或Python或Java)集合的形式返回驱动程序。

通常只在需将结果返回到驱动程序所在节点以供本地处理时,才调用collect函数。

 注意,一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时,才有必要调用collect函数。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。

高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。

从上述结果可以看出,新生成的RDD里包含3条记录,其中每一条记录包含一个由原来被广播的List变量附加一个新的元素所构成的新记录(也就是说,新记录分别以123结尾)。

累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算并返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。

 关于广播变量和累加器的更多信息,可参见Spark编程指南:http://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables

1.4 SchemaRDD

SchemaRDD结合了RDD和结构(schema)信息。它提供了丰富且易于使用的API接口,即DataSet API。2.0版本中并没采用SchemaRDD,但DataFrameDataset的API内部都用到了它。

结构用来描述数据在逻辑上是如何组织的。在获取该信息后,SQL引擎便可支持对相应的数据进行结构化查询。Dataset API替代了原Spark SQL Parser的功能。它保存了原始程序逻辑树。后续的处理则重用了Spark SQL 的核心逻辑。可以说,Dataset API实现了和相应SQL查询完全等同的处理功能。

SchemaRDD是RDD的一个子类。当程序调用Dataset API时,一个新的SchemaRDD对象便会被创建。同时,通过在原始逻辑布局树上增加新的逻辑操作节点,该对象也生成了自己的逻辑布局属性。和RDD一样,Dataset API也支持两种操作:转换执行

与关系操作相关的API属于转换类。

那些会生成输出数据的操作属于执行类。另外,仅当调用了一个执行类操作时,一个Spark任务才会被触发并分发到集群上执行,这和RDD一样。

1.5 Spark data frame

在Apache Spark中,每个Dataset对象对应一个分布式数据集。Dataset是自Spark 1.6版起添加的新接口,它提供了和Spark SQL执行引擎同等的功能。该类对象可从JVM对象创建,然后便可通过功能式转换(如mapflatMapfilter等)进行各种操作。Dataset API仅支持Scala和Java语言,不支持Python和R。

一个DataFrame对象对应一个带列名的数据集。它等同于关系型数据库中的表或R/Python中的data frame对象,但优化更多。DataFrame可从结构型数据文件、Hive的表格、外部数据库或现有的RDD创建。其API支持覆盖Scala、Python、Java和R。

要生成DataFrame类对象,需要首先初始化SparkSession

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("Spark SQL").config("spark.some.config.option", "")
    .getOrCreate() import spark.implicits._

之后,借助 spark.read.json函数从一个JSON文件来创建该类对象:

scala> val df = spark.read.json("/home/ubuntu/work/ml-resources
  /spark-ml/Chapter_01/data/example_one.json")

注意,需要使用Spark Implicits类来隐式地将RDD转换为DataFrame类型:

org.apache.spark.sql
Class SparkSession.implicits$
Object org.apache.spark.sql.SQLImplicits
Enclosing class: SparkSession

Scala提供了这些隐式函数来将常见的Scala对象转换为DataFrame类对象。

上述命令的输出应与如下类似:

df: org.apache.spark.sql.DataFrame = [address: struct<city:
string, state: string>, name: string]

现在,可通过df.show命令来显示其在DataFrame中的详细信息:

scala> df.show
+-----------------+-------+
|          address|   name|
+-----------------+-------+
|  [Columbus,Ohio]|    Yin|
|[null,California]|Michael|
+-----------------+-------+

1.6 Spark Scala编程入门

下面我们用上一节所提到的内容来编写一个简单的Spark数据处理程序。该程序将依次用Scala、Java和Python这3种语言来编写。所用数据是用户在在线商店的商品购买记录。该数据存在一个逗号分隔值(CSV,comma-separated-value)文件中,名为UserPurchaseHistory.csv,该文件存在随书代码包的data目录下。

其部分内容如下所示。文件的每一行对应一条购买记录,从左到右的各列值依次为用户名、商品名称以及商品价格。

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49

对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。项目将使用Scala构建工具(SBT,Scala build tool)来构建。为便于理解,建议读者下载示例代码scala-spark-app。该资源里的data目录下包含了上述CSV文件。运行这个示例项目需要系统中已经安装好SBT(编写本书时所使用的版本为0.13.8)。

 配置SBT并不在本书讨论范围内,但读者可以从https://www.scala-sbt.org/release/docs/Setup.html找到更多信息。

我们的SBT配置文件是build.sbt,其内容如下面所示(注意,各行代码之间的空行是必需的):

name := "scala-spark-app"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0 "

最后一行代码将Spark添加到本项目的依赖库。

相应的Scala程序在ScalaApp.scala这个文件里。接下来我们会逐一讲解代码的各个部分。首先,导入所需要的Spark类:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext.__

/**
 * 用Scala编写的一个简单的Spark应用
 */
object ScalaApp {

在主函数里,我们要初始化所需的SparkContext对象,并且用它通过textFile函数来访问CSV数据文件。之后以逗号为分隔符分割每一行原始字符串,提取出相应的用户名、产品和价格信息,从而完成对原始文本的映射:

def main(args: Array[String]) {
  val sc = new SparkContext("local[2]", "First Spark App")
  // 将CSV格式的原始数据转化为(user,product,price)格式的记录集
  val data = sc.textFile("data/UserPurchaseHistory.csv")
    .map(line => line.split(","))
    .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1),
     purchaseRecord(2)))

现在,我们有了一个RDD,其每条记录都由(user, product, price)三个字段构成。我们可以对商店计算如下指标:

  • 购买总次数
  • 有购买行为的用户总数
  • 总收入
  • 最畅销的产品

计算方法如下:

// 求购买总次数
val numPurchases = data.count()
// 求有多少个不同用户购买过商品
val uniqueUsers = data.map{ case (user, product, price) => user
  }.distinct().count()
// 求和得出总收入
val totalRevenue = data.map{ case (user, product, price) =>
  price.toDouble }.sum()
// 求最畅销的产品是什么
val productsByPopularity = data
  .map{ case (user, product, price) => (product, 1) }
  .reduceByKey(__ + _ )
  .collect()
  .sortBy(-_._2)
val mostPopular = productsByPopularity(0)

最后那段计算最畅销产品的代码演示了如何进行Map/Reduce模式的计算,该模式随Hadoop而流行。首先,我们将(user, product, price)格式的记录映射为(product, 1)格式。然后,执行一个reduceByKey操作,它会对各个产品的1值进行求和。

转换后的RDD包含各个商品的购买次数。有了这个RDD后,我们可以调用collect函数,它会将其计算结果以本地Scala集合的形式返回驱动程序。之后,在驱动程序的本地对这些记录按照购买次数进行排序。(注意,在实际处理大量数据时,我们通常通过sortByKey这类操作来进行并行排序。)

最后,可在终端上打印出计算结果:

    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases"
      .format(mostPopular._1, mostPopular._2))
  }
}

可以在项目的主目录下执行sbt run命令来运行这个程序。如果你使用了IDE的话,也可以从Scala IDE直接运行。最终的输出应该与下面的内容相似:

...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
...
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases

可以看到,商店总共有4个用户的5次交易,总收入为39.91。最畅销的商品是iPhone Cover,共购买2次。

1.7 Spark Java编程入门

Java API与Scala API本质上很相似。Scala代码可以很方便地调用Java代码,但某些Scala代码却无法在Java里调用,特别是那些使用了隐式类型转换、默认参数和某些Scala反射机制的代码。

一般来说,这些特性在Scala程序中会被广泛使用。这就有必要另外为那些常见的类编写相应的Java版本。由此,SparkContext有了对应的Java版本JavaSparkContext,而RDD则对应JavaRDD

Java 8及之前版本的Java并不支持匿名函数,在函数式编程上也没有严格的语法规范。于是,套用到Spark的Java API上的函数必须要实现一个带有call函数签名的WrappedFunction接口。这会使得代码冗长,所以我们经常会创建临时匿名类来传递给Spark操作。这些类会实现操作所需的接口以及call函数,以取得和用Scala编写时相同的效果。

Spark提供对Java 8匿名函数(或lambda)语法的支持。使用该语法能让Java 8书写的代码看上去很像等效的Scala版。

用Scala编写时,键值对记录的RDD能支持一些特别的操作(比如reduceByKeysaveAsSequenceFile)。这些操作可以通过隐式类型转换而自动被调用。用Java编写时,则需要特殊类型的JavaRDD类来支持这些操作。这包括用于键值对的JavaPairRDD,以及用于数值记录的JavaDoubleRDD

 本节只涉及标准的Java API语法。关于Java下支持的RDD以及Java 8 lambda表达式支持的更多信息,可参见Spark编程指南:http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

在后面的Java程序中,我们可以看到大部分差异。这些示例代码包含在本章示例代码的java-spark-app目录下。该目录的data子文件夹下也包含上述CSV数据。

这里会使用Maven构建工具来编译和运行这个项目。我们假设读者已经在其系统上安装好了该工具。

 Maven的安装和配置并不在本书讨论范围内。通常它可通过Linux系统中的包管理器或Mac OS X中的HomeBrew或MacPorts方便地安装。

详细的安装指南参见:http://maven.apache.org/download.cgi

项目中包含一个名为JavaApp.java的Java源文件:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.*;
import java.util.steam.Collectors;

/**
 * 用Java编写的一个简单的Spark应用
 */
public class JavaApp {
  public static void main(String[] args) {

正如在Scala项目中一样,我们首先需要初始化一个上下文对象。值得注意的是,这里所使用的是JavaSparkContext类而不是之前的SparkContext。类似地,我们调用JavaSparkContext对象,利用textFile函数来访问数据,然后将各行输入分割成多个字段。请注意下面代码的加粗部分是如何使用匿名类来定义一个分割函数的。该函数确定了如何对各行字符串进行分割。

JavaSparkContext sc = new JavaSparkContext("local[2]",
  "First Spark App");
// 以CSV格式读取原始数据,并将其转化为(user,produce,price)格式的记录集
JavaRDD<String[]> data =
sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(","));

现在可以算一下用Scala时计算过的指标。这里有两点值得注意:一是下面Java API中有些函数(比如distinctcount)实际上和在Scala API中一样,二是我们定义了一个匿名类并将其传给map函数。匿名类的定义方式可参见代码的加粗部分。

// 求购买总次数
long numPurchases = data.count();
// 求有多少个不同用户购买过商品
long uniqueUsers = data.map(strings ->strings[0]).distinct().count();
// 求和得出总收入
Double totalRevenue = data.map(
  strings -> Double.parseDouble(strings[2]))
    .reduce((Double v1, Double v2) ->
      new Double(v1.doubleValue() + v2.doubleValue()));

下面的代码展现了如何求出最畅销的产品,其步骤与Scala示例的相同。多出的那些代码看似复杂,但它们大多与Java中创建匿名函数有关,实际功能与用Scala时一样。

// 求最畅销的产品是什么
List < Tuple2 < String, Integer >> pairs = data.mapToPair(strings - >
      new Tuple2 < String, Integer > (strings[1], 1))
      .reduceByKey((Integer i1, Integer i2) - > i1 + i2)
      .collect();

Map < String, Integer > sortedData = new HashMap < > ();
Iterator it = pairs.iterator();
while (it.hasNext()) {
    Tuple2 < String, Integer > o = (Tuple2 < String, Integer > ) it.next();
    sortedData.put(o._1, o._2);
}
List < String > sorted = sortedData.entrySet()
  .stream()
  .sorted(
    Comparator.comparing(
      (Map.Entry < String, Integer > entry) - >
entry.getValue()).reversed())
  .map(Map.Entry::getKey)
  .collect(Collectors.toList());
String mostPopular = sorted.get(0);
int purchases = sortedData.get(mostPopular);
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
System.out.println("Total revenue: " + totalRevenue);
System.out.println(String.format(
  "Most popular product: % s with % d purchases ",
    mostPopular, purchases));
    }
}

从前面的代码可以看出,Java代码和Scala代码相比虽然多了通过匿名内部类来声明变量和函数的样板代码,但两者的基本结构类似。读者不妨分别练习这两种版本的代码,并比较一下计算同一个指标时两种语言在表达上的异同。

该程序可以通过在项目主目录下执行如下命令运行:

$ mvn exec:java -Dexec.mainClass="JavaApp"

可以看到其输出和Scala版的很类似,而且计算结果完全一样:

...
14/01/30 17:02:43 INFO spark.SparkContext: Job finished: collect at
JavaApp.java:46, took 0.039167 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases

1.8 Spark Python编程入门

Spark的Python API几乎覆盖了Scala API所能提供的全部功能,但有些特性暂不支持,比如GraphX的图处理和其他组件中的某些功能。具体可参见Spark编程指南的Python部分:http://spark.apache.org/docs/latest/rdd-programming-guide.html

PySpark基于Spark的Java API来构建。数据通过原生Python来处理并在JVM上实现缓存(cache)和移动(shuffle)。Python驱动程序的SparkContext通过Py4J来启动一个JVM并创建一个JavaSparkContext对象。该程序通过Py4J来实现Python和Java SparkContext对象之间的本地通信。用Python所编写的RDD转换操作会被映射为相应Java版的PythonRDD对象的转换操作。PythonRDD对象启用远程工作节点上的Python子进程,并通过管道(pipe)与其通信。这些子进程则负责发送用户代码和数据的处理。

与上两节类似,这里将编写一个相同功能的Python版程序。我们假设读者系统中已安装2.6或更高版本的Python(多数Linux系统和Mac OS X已预装Python)。

如下示例代码可以在本章代码的python-spark-app目录下找到。相应的CSV数据文件也在该目录的data子目录中。项目代码在一个名为pythonapp.py的脚本里,其内容如下:

"""用Python编写的一个简单Spark应用"""
from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# 将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:
  line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求购买总次数
numPurchases = data.count()
# 求有多少不同用户购买过商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).
  reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])

对比Scala版和Python版代码,不难发现Java版语法大致相同。主要不同在于匿名函数的表达方式上,匿名函数在Python语言中亦称lambda函数,lambda也是语法表达上的关键字。用Scala编写时,一个将输入x映射为输出y的匿名函数表示为x => y,在Python中则是lambda x : y。在上面代码的加粗部分,我们定义了一个将两个输入ab映射为一个输出的匿名函数。这两个输入的类型一般相同,这里调用的是相加函数,故写成lambda a, b : a + b

运行该脚本的最好方法是在脚本目录下运行如下命令:

$ SPARK_HOME/bin/spark-submit pythonapp.py

上述代码中的SPARK_HOME变量应该替换为读者实际的Spark的主目录,也就是在本章开始Spark预编译包解压生成的那个目录。

脚本运行完的输出应该和运行Scala和Java版时的类似,其结果同样也是:

...
14/01/30 11:43:47 INFO SparkContext: Job finished: collect at
  pythonapp.py:14, took 0.050251 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases

1.9 Spark R编程入门

SparkR是一个R包,它提供从R代码中调用Apache Spark的入口。在Spark 1.6.0版中,SparkR提供了针对大数据集的分布式数据框架。SparkR还能通过MLlib来支持分布式机器学习,建议读者在后续机器学习章节中动手实践一下。

SparkR DataFrame

DataFrame指按已命名的列来存储的分布式数据的集合。这个概念十分类似于关系型数据库或是R中的数据框,但它经过更多优化。数据框的数据源可以是CSV、TSV、Hive表格或是本地R的数据框等。

Spark对应的交互环境可由如下命令启用:./bin/sparkR shell

同样,我们用R来实现上述指标示例。这里假设读者系统中已安装R和R Studio,对应版本为3.0.2 (2013-09-25)-Frisbee Sailing或以上。

示例代码可以在本章代码的r-spark-app目录下找到,对应的CSV数据文件则在data子目录中。示例代码还包括一个名为r-script-01.R的脚本文件,其内容如下。读者需将下面代码中的PATH变量改为自己开发环境里对应的值。

  Sys.setenv(SPARK_HOME = "/PATH/spark-2.0.0-bin-hadoop2.7")
    .libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"),
      .libPaths()))
  # 载入SparkR库
  library(SparkR)
  sc <- sparkR.init(master = "local",
    sparkPackages="com.databricks:sparkcsv_2.10:1.3.0")
  sqlContext <- sparkRSQL.init(sc)

  user.purchase.history <-
    "/PATH/ml-resources/spark-ml/Chapter_01/r-sparkapp/data/UserPurchaseHistory.csv"
  data <- read.df(sqlContext, user.purchase.history,
    "com.databricks.spark.csv", header="false")
  head(data)
  count(data)

  parseFields <- function(record) {
  Sys.setlocale("LC_ALL", "C") # necessary for strsplit() to work correctly
  parts <- strsplit(as.character(record), ",")
  list(name=parts[1], product=parts[2], price=parts[3])
  }

  parsedRDD <- SparkR:::lapply(data, parseFields)
  cache(parsedRDD)
  numPurchases <- count(parsedRDD)

  sprintf("Number of Purchases : %d", numPurchases)
  getName <- function(record){
  record[1]
  }

getPrice <- function(record){
  record[3]
}

nameRDD <- SparkR:::lapply(parsedRDD, getName)
nameRDD = collect(nameRDD)
head(nameRDD)

uniqueUsers <- unique(nameRDD)
head(uniqueUsers)

priceRDD <- SparkR:::lapply(parsedRDD, function(x) {
  as.numeric(x$price[1])})
take(priceRDD,3)

totalRevenue <- SparkR:::reduce(priceRDD, "+")
sprintf("Total Revenue : %.2f", s)

products <- SparkR:::lapply(parsedRDD, function(x) { list(
  toString(x$product[1]), 1) })
take(products, 5)
productCount <- SparkR:::reduceByKey(products, "+", 2L)
productsCountAsKey <- SparkR:::lapply(productCount, function(x) { list(
  as.integer(x[2][1]), x[1][1])})
productCount <- count(productsCountAsKey)
mostPopular <- toString(collect(productsCountAsKey)[[productCount]][[2]])
sprintf("Most Popular Product : %s", mostPopular)

在Bash终端中执行如下命令便可运行该脚本:

$ Rscript r-script-01.R

相应的输入应如下:

> sprintf("Number of Purchases : %d", numPurchases)
  [1] "Number of Purchases : 5"

> uniqueUsers <- unique(nameRDD)
> head(uniqueUsers)
  [[1]]
  [[1]]$name
  [[1]]$name[[1]]
  [1] "John"
  [[2]]
  [[2]]$name
  [[2]]$name[[1]]
  [1] "Jack"
  [[3]]
  [[3]]$name
  [[3]]$name[[1]]
  [1] "Jill"
  [[4]]
  [[4]]$name
  [[4]]$name[[1]]
  [1] "Bob"

> sprintf("Total Revenue : %.2f", totalRevenueNum)
  [1] "Total Revenue : 39.91"

> sprintf("Most Popular Product : %s", mostPopular)
  [1] "Most Popular Product : iPad Cover"

1.10 在Amazon EC2上运行Spark

Spark项目提供了在Amazon EC2上构建一个Spark集群所需的脚本,位于ec2文件夹下。输入如下命令便可调用该文件夹下的spark-ec2脚本:

> ./ec2/spark-ec2

当不带参数直接运行上述代码时,终端会显示该命令的用法信息:

Usage: spark-ec2 [options] <actiom> <clusber_name>
<action> can be: launch, destroy, login, stop, start, get-master

Options:
...

在创建一个Spark EC2集群前,我们需要一个Amazon账号。

 如果没有Amazon Web Services账号,可以在https://aws.amazon.com/cn/注册。AWS的管理控制台地址是https://aws.amazon.com/cn/console/

另外,我们还需要创建一个Amazon EC2密钥对和相关的安全凭证。Spark文档提到了在EC2上部署时的需求:

你要先自己创建一个Amazon EC2密钥对。通过管理控制台登入你的Amazon Web Services账号后,单击左边导航栏中的Key Pairs按钮,然后创建并下载相应的私钥文件。通过ssh远程访问EC2时,会需要提交该密钥。该密钥的系统访问权限必须设定为600(即只有你可以读写该文件),否则会访问失败。

当需要使用spark-ec2脚本时,需要设置AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY两个环境变量。它们分别为你的Amazon EC2访问密钥标识(Key ID)和对应的密钥密码(secret access key)。这些信息可以从AWS主页上依次点击Account | Security Credentials | Access Credentials获得。

创建一个密钥对时,最好选取一个好记的名字来命名。这里假设密钥对名为spark,对应的密钥文件的名称为spark.pem。如上面提到的,我们需要确认密钥的访问权限并设定好所需的环境变量:

> chmod 600 spark.pem
> export AWS_ACCESS_KEY_ID="..."
> export AWS_SECRET_ACCESS_KEY="..."

上述下载所得的密钥文件只能下载一次(即在刚创建后),故对其既要安全保存又要避免丢失。

注意,下一节中会启用一个Amazon EC2集群,这会在你的AWS账号下产生相应的费用。

启动一个EC2 Spark集群

现在我们可以启动一个小型Spark集群了。启动它只需进入ec2目录,然后输入:

$ cd ec2
$ ./spark-ec2 --key-pair=rd_spark-user1 --identity-file=spark.pem
  --region=us-east-1 --zone=us-east-1a launch my-spark-cluster

这将启动一个名为test-cluster的新集群,其包含m3.medium级别的主节点和从节点各一个。该集群所用的Spark版本适配于Hadoop 2。我们使用的密钥名和密钥文件分别是spark和spark.pem。(如果你给密钥文件取了不同的名字,或者有既存的AWS密钥对,就使用该名称。)

集群的完全启动和初始化需要一些时间。在运行启动代码后,应该会立即看到如下所示的内容:

Setting up security groups...
Creating security group my-spark-cluster-master
Creating security group my-spark-cluster-slaves
Searching for existing cluster my-spark-cluster in region
  us-east-1...
Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slave in us-east-1a, regid = r-5a893af2
Launched master in us-east-1a, regid = r-39883b91
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state...........
Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-110-128.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-110-128.compute-
  1.amazonaws.com port 22: Connection refused
Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-110-128.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-110-128.compute-
  1.amazonaws.com port 22: Connection refused
Warnig: SSH connection error. (This could be temporary.)
Host: ec2-52-90-110-128.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-110-128.compute-
  1.amazonaws.com port 22: Connection refused
Cluster is now in 'ssh-ready' state. Waited 510 seconds.

如果集群启动成功,最终应可在终端中看到如下的输出:

./tachyon/setup.sh: line 5: /root/tachyon/bin/tachyon:
  No such file or directory
./tachyon/setup.sh: line 9: /root/tachyon/bin/tachyon-start.sh:
  No such file or directory
[timing] tachyon setup: 00h 00m 01s
Setting up rstudio
spark-ec2/setup.sh: line 110: ./rstudio/setup.sh:
  No such file or directory
[timing] rstudio setup: 00h 00m 00s
Setting up ganglia
RSYNC'ing /etc/ganglia to slaves...
ec2-52-91-214-206.compute-1.amazonaws.com
Shutting down GANGLIA gmond:                               [FAILED]
Starting GANGLIA gmond:                                    [  OK  ]
Shutting down GANGLIA gmond:                               [FAILED]
Starting GANGLIA gmond:                                    [  OK  ]
Connection to ec2-52-91-214-206.compute-1.amazonaws.com closed.
Shutting down GANGLIA gmetad:                              [FAILED]
Starting GANGLIA gmetad:                                   [  OK  ]
Stopping httpd:                                            [FAILED]
Starting httpd: httpd: Syntax error on line 154 of /etc/httpd
  /conf/httpd.conf: Cannot load /etc/httpd/modules/mod_authz_core.so
  into server: /etc/httpd/modules/mod_authz_core.so: cannot open
  shared object file: No such file or directory              [FAILED]
[timing] ganglia setup: 00h 00m 03s
Connection to ec2-52-90-110-128.compute-1.amazonaws.com closed.
Spark standalone cluster started at
  http://ec2-52-90-110-128.compute-1.amazonaws.com:8080
Ganglia started at http://ec2-52-90-110-128.compute-
  1.amazonaws.com:5080/ganglia
Done!
ubuntu@ubuntu:~/work/spark-1.6.0-bin-hadoop2.6/ec2$

这将创建两个虚拟机来分别充当Spark主节点和工作节点,类型均为m1.large,如以下截图所示。

要测试是否能连接到新集群,可以输入如下命令:

$ ssh -i spark.pem root@ec2-52-90-110-128.compute-1.amazonaws.com

注意,该命令中root@后面的IP地址需要替换为你自己的Amazon EC2的公开域名。该域名可在启动集群时的终端输出中找到。

另外,也可以通过如下命令得到集群的公开域名:

> ./spark-ec2 -i spark.pem get-master test-cluster

上述ssh命令执行成功后,你会连接到EC2上Spark集群的主节点,同时终端的输入应与下图类似:

如果要测试集群是否已正确配置Spark环境,可以切换到Spark目录,然后以本地模式运行一个示例程序:

> cd spark
> MASTER=local[2] ./bin/run-example SparkPi

其输出应该与在你自己计算机上的输出类似:

...
14/01/30 20:20:21 INFO SparkContext: Job finished: reduce at SparkPi.
  scala:35, took 0.864044012 s
Pi is roughly 3.14032
...

这样就有了包含多个节点的真实集群,可以测试集群模式下的Spark了。我们会在一个从节点的集群上运行相同的示例。运行命令和上面相同,但用主节点的URL作为MASTER的值:

> MASTER=spark:// ec2-52-90-110-128.compute-
  1.amazonaws.com:7077 ./bin/run-example SparkPi

 注意,你需要将上面代码中的公开域名替换为你自己的。

同样,命令的输出应该和本地运行时的类似。不同的是,这里会有日志消息提示你的驱动程序已连接到Spark集群的主节点。

...
14/01/30 20:26:17 INFO client.Client$ClientActor: Connecting to master
  spark://ec2-54-220-189-136.eu-west-1.compute.amazonaws.com:7077
14/01/30 20:26:17 INFO cluster.SparkDeploySchedulerBackend: Connected to
  Spark cluster with app ID app-20140130202617-0001
14/01/30 20:26:17 INFO client.Client$ClientActor: Executor added:
  app- 20140130202617-0001/0 on worker-20140130201049-
  ip-10-34-137-45.eu-west-1.compute. internal-57119
  (ip-10-34-137-45.eu-west-1.compute.internal:57119) with 1 cores
14/01/30 20:26:17 INFO cluster.SparkDeploySchedulerBackend:
  Granted executor ID app-20140130202617-0001/0 on hostPort ip-10-34-137-45.
  eu- west-1.compute.internal:57119 with 1 cores, 2.4 GB RAM
14/01/30 20:26:17 INFO client.Client$ClientActor:
  Executor updated: app- 20140130202617-0001/0 is now RUNNING
14/01/30 20:26:18 INFO spark.SparkContext: Starting job: reduce at
  SparkPi.scala:39
...

读者不妨在集群上自由练习,熟悉一下Scala的交互式终端:

$ ./bin/spark-shell --master spark:// ec2-52-90-110-128.compute-1.amazonaws.com:7077

练习完后,输入exit便可退出终端。另外也可以通过如下命令来体验PySpark终端:

$ ./bin/pyspark --master spark:// ec2-52-90-110-128.compute-1.amazonaws.com:7077

通过Spark主节点网页界面,可以看到主节点下注册了哪些应用。该界面位于ec2-52-90-110-128.compute-1.amazonaws.com:8080(同样,需要将公开域名替换为你自己的)。

值得注意的是,Amazon会根据集群的使用情况收取费用。所以在使用完毕后,记得停止或终止这个测试集群。要终止该集群,可以先在你本地系统的ssh会话里输入exit,然后再输入如下命令:

$ ./ec2/spark-ec2 -k spark -i spark.pem destroy test-cluster

应该可以看到这样的输出:

Are you sure you want to destroy the cluster test-cluster?
The following instances will be terminated:
Searching for existing cluster test-cluster...
Found 1 master(s), 1 slaves
> ec2-54-227-127-14.compute-1.amazonaws.com
> ec2-54-91-61-225.compute-1.amazonaws.com
ALL DATA ON ALL NODES WILL BE LOST!!
Destroy cluster test-cluster (y/N): y
Terminating master...
Terminating slaves...

输入y然后回车,便可终止该集群。

恭喜!现在你已经做到了在云端设置Spark集群,并在它上面运行了一个完全并行的示例程序,最后也终止了这个集群。如果在学习后续章节时,你想在集群上运行示例或你自己的程序,可以再次使用这些脚本并指定想要的集群规模和配置。(留意一下费用,并记得使用完毕后关闭它们就行。)

1.11 在Amazon Elastic Map Reduce上配置并运行Spark

这里将介绍如何借助EMR(Amazon Elastic Map Reduce)来启用一个包含Spark的Hadoop集群。这可通过如下步骤来实现。

(1) 启动一个Amazon EMR Cluster。

(2) 在如下地址打开Amazon EMR UI终端:https://console.aws.amazon.com/elasticmapreduce/home

(3) 如以下截图所示,选择Create cluster(创建集群)。

(4) 如以下截图所示,选择Amazon AMI 3.9.0或更新版本。

(5) User Interface中提供了Application预安装选项,从中选择Spark 1.5.2或更新版本,并点击Add按钮。

(6) 根据需要选择其他硬件选项。

  • Instance type:主机类型
  • Key pair:用于SSH的密钥对
  • Permissions:权限
  • IAM roles:IAM角色,Default(默认)或Custom(自定义)

见如下截图。

(7) 点击 Create cluster按钮。集群将会开始初始化,如下图所示。

(8) 登录主节点。当EMR集群就绪后,便可通过SSH来登录主节点。

$ ssh -i rd_spark-user1.pem hadoop@ec2-52-3-242-138.compute-1.amazonaws.com

其输出如下:

Last login: Wed Jan 13 10:46:26 2016

   __| __|_  )
   _|  (    / Amazon Linux AMI
  ___|___|___|

https://aws.amazon.com/amazon-linux-ami/2015.09-release-notes/
  23 package(s) needed for security, out of 49 available
Run "sudo yum update" to apply all updates.
[hadoop@ip-172-31-2-31 ~]$

(9) 启动Spark Shell。

[hadoop@ip-172-31-2-31 ~]$ spark-shell
16/01/13 10:49:36 INFO SecurityManager: Changing view acls to: hadoop
16/01/13 10:49:36 INFO SecurityManager: Changing modify acls to: hadoop
16/01/13 10:49:36 INFO SecurityManager: SecurityManager:
    authentication disabled; ui acls disabled; users with view
    permissions: Set(hadoop); users with modify permissions: Set(hadoop)
    16/01/13 10:49:36 INFO HttpServer: Starting HTTP Server
    16/01/13 10:49:36 INFO Utils: Successfully started service 'HTTP
      class server' on port 60523.
    Welcome to
          ____             __
         / __/__ ___ _____/ /__
        _ / _ / _ &grave;/ __/ '_/
       /___/ .__/_,_/_/ /_/_  version 1.5.2
          /_/
    scala> sc

(10) 运行EMR上的Spark基本示例。

scala> val textFile = sc.textFile("s3://elasticmapreduce/samples
  /hive-ads/tables/impressions/dt=2009-04-13-08-05
  /ec2-0-51-75-39.amazon.com-2009-04-13-08-05.log")
scala> val linesWithCartoonNetwork = textFile.filter(line =>
  line.contains("cartoonnetwork.com")).count()

其输出应如下:

linesWithCartoonNetwork: Long = 9

1.12 Spark用户界面

Spark提供了一个Web界面,它可用于监控任务进度和运行环境,以及运行SQL命令。

SparkContext 通过4040端口发布一个Web界面来显示与当前应用有关的信息。这些信息包括:

  • 各个调度阶段和任务的列表
  • RDD大小和内存使用情况的概要
  • 环境信息
  • 正在运行的执行器的相关信息

该界面可通过https://:4040在浏览器中访问。若同一主机上有多个SparkContext在运行,则会从4040开始依次分配不同的端口,如4041、4042,以此类推。

如下截图显示了Web界面所提供的部分信息:

Spark 运行环境展示界面

Spark Executors状态汇总界面

1.13 Spark所支持的机器学习算法

Spark ML支持如下算法。

  • 协同过滤
    • 交替最小二乘法(ALS,alternating least squares)。协同过滤常用于推荐系统。这些技术旨在计算用户-物品关联矩阵中缺失的关联关系。spark.mllib目前支持基于模型的协同过滤。在其实现中,用户和物品通过一个由若干隐藏因子(latent factor)组成的集合来表示,进而预测缺失的关联关系。spark.mllib使用ALS算法来学习这些隐藏因子。
  • 聚类。聚类旨在处理一种无监督学习问题,即通过某种相似性的度量将不同的对象分组(或分类)。聚类常用于探索性分析或作为分层监督学习流程的一个部分。第二种情况会对不同的类别训练出相应的特征分类器或回归模型。Spark中实现了如下聚类算法。

    • K-均值K-{\rm means})。这是常见的聚类算法之一,它将各个数据点归类到多个类别中,但此时类别的数目已预先定义好,即由用户指定。spark.mllib的对应实现包含了其并行化版本的衍化算法K-{\rm means}++
    • 高斯混合。高斯混合模型(GMM,Gaussian mixture model)指一种组合分布,其中各数据点是从k个子高斯分布之一中取出的。各个子分布都有自己的概率分布。spark.mllib的对应实现使用了期望最大化(expectation-maximization)算法来求解给定样本的最大似然(maximum-likelihood)。
    • 幂迭代聚类(PIC,power iteration clustering)是用于对边加权图中的顶点进行聚类的一种可扩展算法。该类图中的边的权值对应两端顶点的相似性。该算法通过幂迭代来计算图(所对应的归一化后的相似矩阵,affinity matrix)的伪特征向量(pseudo eigenvector)。

      幂迭代是一种特征值求解算法。给定一个矩阵\boldsymbol{X},该算法将求出一个数值\lambda(特征值)和一个非零向量\boldsymbol{v}(特征向量),使得\boldsymbol{Xv}=\lambda\boldsymbol{v}

      矩阵的伪特征向量可视为近邻矩阵(nearby matrix)的特征向量。伪特征向量的详细定义如下。

      \boldsymbol{A}mn列的矩阵,\boldsymbol{E}为任何满足||\boldsymbol{E}||= 的矩阵,那么\boldsymbol{A}的伪特征向量为\boldsymbol{A}+\boldsymbol{E}的特征向量。该特征向量利用它来图的顶点进行聚类。

      spark.mllib包含了PIC的一种实现,该实现基于GraphX。它以元组(tuple)的RDD为输入,输出带有分类结果(标签)的模型。相似性的表示必须为非负值。PIC假设相似度为对称的。

      (在统计学中,相似性度量或相似性函数是一种量化两个对象之间相似度的实数函数。该度量与距离函数相反。一种常见的相似性函数是余弦相似性。)

      若用srcIddstId分别表示图中的两个顶点,则(srcId, dstId)在输入数据中最多只能出现一次,因为它与(dstId, srcId)等效。

    • 隐含狄利克雷分布(LDA,latent Dirichlet allocation)是从一系列文本文档中推断若干主题(topic)的一种模型,是聚类模型的一种。主题解释如下。

      主题为聚类的中心,而各文本对应从相应主题中抽取的样本。主题和文本都存在于一个特征空间中,这里的特征对应表示不同单词出现次数的向量(即词袋模型,bag of words)。

      LDA通过对文本是如何生成的建模,进而聚类,而非使用传统的距离表示。

    • 二分K-均值(bisecting K-{\rm means})是一种典型的层次聚类算法。层次聚类分析(HCA,hierarchial cluster analysis)会自顶向下分层构建出不同层次的类(的划分)。在这类算法中,所有的数据点从同一个类别开始,并递归式地向下分层细分。

      层次聚类常用于聚类分析中需要构建类的层次结构的场景。

    • 流式K-均值聚类(steaming K-{\rm means})。当处理的数据为数据流时,需要根据新的数据来动态评估并更新现有的聚类。spark.mllib支持流式K-均值聚类分析,并提供相关的参数来控制更新期限。该算法使用一种泛化的小批量K-均值更新规则。
  • 分类

    • 决策树(decision trees)。决策树及其集成算法(ensemble)是用于分类和回归的一种模型。决策树具有可解释性高、能处理类别属性以及可扩展到多类别场景的特点,因而使用广泛。它们并不需要特征缩放(feature scaling),而且能捕获非线性特征和特征之间的关联。树集成算法、随机森林和Boosting是分类和回归类应用中表现最优的几种。

      spark.mllib中的决策树模型支持二分类、多类别和回归三种场景,支持连续型和离散型(如类别)特征。该实现按行对数据进行分区,从而支持数百万实例上的分布式训练。

    • 朴素贝叶斯(naive Bayes)是一类应用贝叶斯理论的概率分类模型。该模型有一个强(朴素)假设,即各个特征之间相互独立。

      朴素贝叶斯是一种多分类算法,它假设特征之间两两独立。对给定的一组训练数据,它计算给定标签时每个特征的条件概率分布,然后利用贝叶斯理论来计算给定数据点的标签的条件概率分布,并用该分布来进行预测。spark.mllib支持多项式朴素贝叶斯(multinomial naive Bayes)和伯努利朴素贝叶斯(Bernoulli naive Bayes)。这些模型常用于文本分类。

    • 概率分类器(probability classifier)。在机器学习中,概率分类器用于预测给定的输入数据在一组类别上的概率分布,而非输出该数据最可能的类别。它提供分类归属上的可能性。该可能性本身具有某些意义,也能和其他分类器集成。
    • logistic回归用于二元(是否)判断。它通过一个logistic函数估算的概率来度量与标签有关变量和无关变量的关联性。该函数是一个累积logistic分布(cumulative logistic distribution)函数。

      它预测输出的概率,是广义线性模型(GLM,generalized linear models)的一种特例。相关背景知识和实现的细节可见spark.mllib中与logistic回归相关的文档。

      GLM对变量的误差分布而非正态分布建模,因而被视为线性回归的一种泛化。

    • 随机森林(random forest)算法通过集成多个决策树来确定决策边界。随机森林结合许多决策树,从而降低了结果过拟合(overfitting)的风险。

      Spark ML的决策树算法支持二元和多类别分类与拟合,可用于连续性或标签属性类数值。

  • 降维(dimensionality reduction)是减少数据维度(特征数目)的过程,其输出供后续机器学习。它能用于从原始特征中提取隐藏特征,或在保证整体结构的前提下对数据进行压缩。MLlib在RowMatrix类的基础上提供了降维支持。
    • 奇异值分解(SVD,singular value decomposition)的定义如下:给定一个行列数为 (m,n) 且元素值为实数或复数的矩阵\boldsymbol{M},其奇异值分解形如\boldsymbol{U}Σ\boldsymbol{V}^*,其中\boldsymbol{U}Σ\boldsymbol{V}的行列数分别为 (m,R)(R,R)(n,R),且Σ 的对角线元素为非负实数,\boldsymbol{V}为单位矩阵,R等于矩阵\boldsymbol{M}的秩(Rank)。\boldsymbol{V}^*表示\boldsymbol{V}的共轭转置。
    • 主成分分析(PCA,principal component analysis)是一种统计学方法,旨在找到使得各数据点在第一维度上的差异最大化的旋转。这也使得在后续各个维度上的差异能最大化。旋转矩阵的列被称为主成分。PCA是一种常用的降维方法。 MLlib提供对多行少列矩阵的PCA支持。该支持以RowMatrix类为基础,且矩阵以行优先方式存储。Spark同样支持特征提取和转换,具体如TF-IDF、ChiSquare、Selector、Normalizer和Word2Vector。
  • 频繁模式挖掘

    • FP-growth。FP为frequent pattern(频繁模式)的缩写。该算法首先计算数据中物品的出现次数(属性-属性值对)并将其保存到头表中(header table)。

      第二轮时,它通过插入实例(由物品,即items构成)来构建FP-Tree结构。每个实例对应的多个物品,参照各自在数据集中出现的频率来降序排列。这使得树能快速处理。各实例中,低于特定最小频率阈值的物品会被排除。对于多数实例中高频率出现的物品有所重复的情况,FP-Tree在接近树根的分支进行了高度压缩。

    • 关联规则(association rule)。关联规则学习旨在发现海量数据的各个特征之间的某些关系。

      它实现了一个并行的规则生成算法来构建最终想要的规则,该规则以单个物品为输出。

  • PrefixSpan。这是一种序列模式挖掘算法。
  • 评估指标(evaluation metrics)。spark.mllib提供了一套指标,用于评估算法。
  • PMML模型输出。PMML(predictive model markup language,预测模型标记语言)是一种基于XML的预测模型交换格式。它使得各个分析类应用能够描述并相互交换由机器学习算法生成的预测模型。

    spark.mllib支持以PMML或等效的格式来输出其机器学习模型。

  • 参数优化算法

    • 随机梯度下降法(SGD,stochastic gradient descent)。SGD通过优化梯度下降来最小化一个目标函数。该函数为若干可微函数的和。

      各类梯度下降法和随机次梯度下降法均为MLlib的底层原语,是其他各种机器学习算法的基础。

  • Limited-Memory BFGS(L-BFGS)。这是一种优化算法,且属于准牛顿算法家族(Quasi-Newton methods)的一种。该类算法是对BFGS(Broyden-Fletcher-Goldfarb-Shanno)算法的近似计算。它所需内存空间不大,用于机器学习中的参数估计。

    BFGS模型是牛顿模型的近似,是爬山法(hill-climbing optimization techniques)的一种。爬山法的特点是求解给定函数的平稳点(stationary point)。对这类问题而言,最优化的一个必要条件就是梯度为零。

1.14 Spark ML的优势

加州大学伯克利分校AMQ实验室在Amazon EC2平台上借助一系列实验以及用户应用的基准测试,对Spark和RDD进行了评估。

  • 使用的算法:logistic回归和K-均值。
  • 用例:首次迭代和多次迭代。

所有的测试使用m1.xlarge EC2节点。该类节点包含4个核心以及15GB内存。存储基于HDFS,块大小为256MB。与其他库的比较可见下图。下图对比了Hadoop和Spark的logistic回归算法在首次迭代和后续迭代中的性能。

下图则用K-均值聚类算法进行了相同的比较。

总体结果表明如下几点。

  • 对迭代式机器学习和图应用而言,Spark的性能比Hadoop高,最多能高出20倍。加速来自于避免I/O操作,以及将数据以Java对象形式保存在内存中,从而减少了反序列。
  • 用Spark编写的应用有良好的性能和可扩展性。对比Hadoop,Spark能为分析报告加速40倍。
  • 当节点实效时,Spark仅需重建丢失的RDD分区,从而可以迅速恢复。
  • Spark能在5~7秒延迟内完成对1TB数据的交互式查询。

 更多信息参见http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf

Spark和Hadoop在排序上的基准测评比较——2014年,Databricks团队参加了一项SORT基准测试(http://sortbenchmark.org/)。该测试使用的数据集大小为100TB。Hadoop运行于一个专属数据中心上,而Spark则对应EC2上的200多个节点并用HDFS做分布式存储。

测试表明Spark的速度比Hadoop快3倍,而占用的机器数仅为其1/10,如下图所示。

1.15 在Google Compute Engine上用Dataproc构建Spark集群

Cloud Dataproc是一种运行于Google Compute Engine上的Spark和Hadoop服务。它是一种受管理的服务。Cloud Dataproc自动化有助于快速创建集群,方便对集群进行管理,并在空闲时自动关闭集群来节省费用。

本节将学习如何使用Dataproc服务来创建一个Spark集群,并在其上运行示例。

请读者事先创建好一个Google Compute Engine账号,并安装Google Cloud SDK。

1.15.1 Hadoop和Spark版本

Dataproc支持如下Hadoop和Spark版本,但会随新版本的发布而有所改变:

  • Spark 1.5.2
  • Hadoop 2.7.1
  • Pig 0.15.0
  • Hive 1.2.1
  • GCS connector 1.4.3-hadoop2
  • BigQuery connector 0.7.3-hadoop2

 更多信息请参见https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions

下面的步骤将在Google Cloud Console中进行,该用户界面用于Spark集群的创建和任务的提交。

1.15.2 创建集群

可在Cloud Platform Console中创建一个Spark集群。选择相应项目,并点击Continue按钮以打开Clusters页面。这时便可看到归属于该项目的Cloud Dataproc集群,如果你已经创建了的话。

点击Create a cluster按钮以打开Create a Cloud Dataproc集群页面,如下图所示。

点击Create a cluster按钮之后,便会显示一个详细的表格,如下图所示。

上图展示了Create a Cloud Dataproc集群页面,且已自动填写了一个名为cluster-1的新集群。来看一下下面的屏幕截图。

展开Workers、Bucket、Network、Version、Initialization和Access Options界面,便可配置工作节点、Staging bucket、网络、初始化策略、Cloud Dataproc镜像版本、执行的操作和集群的项目级访问策略。可根据需求重新制定这些值,或默认即可。

默认情况下,集群不包含工作节点,但包含默认的Staging bucket和网络设定。同时也会采用最新发布的Cloud Dataproc镜像版本。这些默认配置均可更改,如下图所示。

配置完成后,点击Create按钮来创建集群。集群的名称会显示在Cluster页面上。当集群创建完毕后,其状态会更新为Running。

点击之前创建的集群的名称,便可打开集群详情页面。该页面同时有个Overview标签页和CPU utilization图。

从其他的标签页可以查看任务和实例等信息。

1.15.3 提交任务

通过Cloud Platform UI,便可从Cloud Platform Console提交一个任务到集群。在该页面选择相应的项目并点击Continue按钮。若是第一次提交,会显示如下对话框:

点击Submit按钮来提交任务,如下图所示:

要运行样例任务,参照如下步骤填写Submit页面。

(1) 从集群列表中选择一个集群名。

(2) 设置Spark的Job type。

(3) 在Jar files中加入file:///usr/lib/spark/lib/spark-examples.jar。这里,file:///为Hadoop LocalFileSystem语法;Cloud Dataproc在创建集群时会将/usr/lib/spark/lib/spark-examples.jar安装到集群主节点上。如有需要,用户也可指定所需jar文件的Cloud Storage 路径(gs://my-bucket/ my-jarfile.jar)或一个HDFS路径(hdfs://examples/myexample.jar)到自定义路径中。

(4) 设置jar的Main class为org.apache.spark.examples.SparkPi

(5) 设置Argument为单个参数1000。

点击Submit按钮来开始任务。

任务开始后便会添加到Job列表中,见如下截图。

任务结束后,其状态会发生改变,如下图所示:

不妨看下此时job的输出。

用相应的Job ID在终端中执行命令。

在作者这里,Job ID为1ed4d07f-55fc-45fe-a565-290dcd1978f7,Project ID为rd-spark-1。故相应的命令为:

$ gcloud beta dataproc --project=rd-spark-1 jobs wait 1ed4d07f-
  55fc-45fe-a565-290dcd1978f7

其输出(省略后)为:

Waiting for job output...
16/01/28 10:04:29 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger
  started
16/01/28 10:04:29 INFO Remoting: Starting remoting
...
Submitted application application_1453975062220_0001
Pi is roughly 3.14157732

也可通过SSH登录Spark实例,并以交互模式启动spark-shell。

1.16 小结

本章讲述了如何在本地计算机以及Amazon EC2的云端上配置Spark环境,还介绍了如何在Amazon Elastic Map Reduce(EMR)上运行Spark,以及如何通过Google Compute Engine的Spark服务来创建一个集群并运行示例程序。此外还通过Scala交互式终端讨论了Spark编程模型的基础知识和API,并分别用Scala、Java、R和Python语言编写了一个简单的Spark程序。最后还对比了Hadoop和Spark在不同机器学习算法以及SORT基准测试上的性能指标。

下一章将介绍机器学习相关的基础数学。

目录

  • 版权声明
  • 前言
  • 第 1 章 Spark的环境搭建与运行
  • 第 2 章 机器学习的数学基础
  • 第 3 章 机器学习系统设计
  • 第 4 章 Spark上数据的获取、处理与准备
  • 第 5 章 Spark构建推荐引擎
  • 第 6 章 Spark构建分类模型
  • 第 7 章 Spark构建回归模型
  • 第 8 章 Spark构建聚类模型
  • 第 9 章 Spark应用于数据降维
  • 第 10 章 Spark高级文本处理技术
  • 第 11 章 Spark Streaming实时机器学习
  • 第 12 章 Spark ML Pipeline API