第 2 章 Spark 下载与入门

第 2 章 Spark 下载与入门

在本章中,我们会下载 Spark 并在本地模式下单机运行它。本章是写给 Spark 的所有初学者的,对数据科学家和工程师来说都值得一读。

Spark 可以通过 Python、Java 或 Scala 来使用 1。要用好本书不需要高超的编程技巧,但是确实需要对其中某种语言的语法有基本的了解。我们会尽可能在示例中给出全部三种语言的代码。

1Spark 1.4.0 起添加了 R 语言支持。

Spark 本身是用 Scala 写的,运行在 Java 虚拟机(JVM)上。要在你的电脑或集群上运行 Spark,你要做的准备工作只是安装 Java 6 或者更新的版本。如果你希望使用 Python 接口,你还需要一个 Python 解释器(2.6 以上版本)。Spark 尚不支持 Python 32

2Spark 1.4.0 起支持 Python 3。——译者注

2.1 下载Spark

使用 Spark 的第一步是下载和解压缩。我们先从下载预编译版本的 Spark 开始。访问 http://spark.apache.org/downloads.html,选择包类型为“Pre-built for Hadoop 2.4 and later”(为 Hadoop 2.4 及更新版本预编译的版本),然后选择“Direct Download”直接下载。这样我们就可以得到一个压缩的 TAR 文件,文件名为 spark-1.2.0-bin-hadoop2.4.tgz.

 Windows 用户如果把 Spark 安装到带有空格的路径下,可能会遇到一些问题。所以我们需要把 Spark 安装到不带空格的路径下,比如 C:\spark 这样的目录中。

你不需要安装 Hadoop,不过如果你已经有了一个 Hadoop 集群或安装好的 HDFS,请下载对应版本的 Spark。你可以在 http://spark.apache.org/downloads.html 里选择所需要的包类型,这会导致下载得到的文件名略有不同。也可以选择从源代码直接编译。你可以从 GitHub 上下载最新代码,也可以在下载页面上选择包类型为“Source Code”(源代码)进行下载。

 大多数类 Unix 系统,包括 OSX 和 Linux,都有一个叫 tar 的命令行工具,可以用来解压 TAR 文件。如果你的操作系统没有安装 tar,可以尝试搜索网络获取免费的 TAR 解压缩工具。比如,如果你使用的是 Windows,可以试一下 7-Zip.

下载好了 Spark 之后,我们要进行解压缩,然后看一看默认的 Spark 发行版中都有些什么。打开终端,将工作路径转到下载的 Spark 压缩包所在的目录,然后解开压缩包。这样会创建出一个和压缩包同名但是没了 .tgz 后缀的新文件夹。接下来我们就把工作路径转到这个新目录下看看里面都有些什么。上面这些步骤可以用如下命令完成:

cd ~
tar -xf spark-1.2.0-bin-hadoop2.4.tgz
cd spark-1.2.0-bin-hadoop2.4
ls

tar 命令所在的那一行中,x 标记指定 tar 命令执行解压缩操作,f 标记则指定压缩包的文件名。ls 命令列出了 Spark 目录中的内容。我们先来粗略地看一看 Spark 目录中的一些比较重要的文件及目录的名字和作用。

  • README.md

    包含用来入门 Spark 的简单的使用说明。

  • bin

    包含可以用来和 Spark 进行各种方式的交互的一系列可执行文件,比如本章稍后会讲到的 Spark shell。

  • core、streaming、python……

  • 包含Spark项目主要组件的源代码。

  • examples

    包含一些可以查看和运行的 Spark 程序,对学习 Spark 的 API 非常有帮助。

不要被 Spark 项目数量庞大的文件和复杂的目录结构吓倒,我们会在本书接下来的部分中讲解它们中的很大一部分。就目前来说,我们还是按部就班,先来试试 Spark 的 Python 和 Scala 版本的 shell。让我们从运行一些 Spark 自带的示例代码开始,然后再编写、编译并运行一个我们自己简易的 Spark 程序。

本章我们所做的一切,Spark 都是在本地模式下运行,也就是非分布式模式,这样我们只需要用到一台机器。Spark 可以运行在许多种模式下,除了本地模式,还支持运行在 Mesos 或 YARN 上,也可以运行在 Spark 发行版自带的独立调度器上。我们会在第 7 章详细讲述各种部署模式。

2.2 Spark中Python和Scala的shell

Spark 带有交互式的 shell,可以作即时数据分析。如果你使用过类似 R、Python、Scala 所提供的 shell,或操作系统的 shell(例如 Bash 或者 Windows 中的命令提示符),你也会对 Spark shell 感到很熟悉。然而和其他 shell 工具不一样的是,在其他 shell 工具中你只能使用单机的硬盘和内存来操作数据,而 Spark shell 可用来与分布式存储在许多机器的内存或者硬盘上的数据进行交互,并且处理过程的分发由 Spark 自动控制完成。

由于 Spark 能够在工作节点上把数据读取到内存中,所以许多分布式计算都可以在几秒钟之内完成,哪怕是那种在十几个节点上处理 TB 级别的数据的计算。这就使得一般需要在 shell 中完成的那些交互式的即时探索性分析变得非常适合 Spark。Spark 提供 Python 以及 Scala 的增强版 shell,支持与集群的连接。

 本书中大多数示例代码都包含 Spark 支持的所有语言版本,但是交互式 shell 部分只提供了 Python 和 Scala 版本的示例。shell 对于学习 API 是非常有帮助的,因此我们建议读者在 Python 和 Scala 版本的例子中选择一种进行尝试,即便你是 Java 开发者也是如此,毕竟各种语言的 API 是相似的。

展示 Spark shell 的强大之处最简单的方法就是使用某个语言的 shell 作一些简单的数据分析。我们一起按照 Spark 官方文档中的快速入门指南(http://spark.apache.org/docs/latest/quick-start.html)中的示例来做一遍。

第一步是打开 Spark shell。要打开 Python 版本的 Spark shell,也就是我们所说的 PySpark Shell,进入你的 Spark 目录然后输入:

bin/pyspark

(在 Windows 中则运行 bin\pyspark。)如果要打开 Scala 版本的 shell,输入:

bin/spark-shell

稍等数秒,shell 提示符就会出现。Shell 启动时,你会看到许多日志信息输出。有的时候,由于提示符之后又输出了日志,我们需要按一下回车键,来得到一个清楚的 shell 提示符。图 2-1 是 PySpark shell 启动时的样子。

图 2-1:默认日志选项下的 PySpark shell

如果觉得 shell 中输出的日志信息过多而使人分心,可以调整日志的级别来控制输出的信息量。你需要在 conf 目录下创建一个名为 log4j.properties 的文件来管理日志设置。Spark 开发者们已经在 Spark 中加入了一个日志设置文件的模版,叫作 log4j.properties.template。要让日志看起来不那么啰嗦,可以先把这个日志设置模版文件复制一份到 conf/log4j.properties 来作为日志设置文件,接下来找到下面这一行:

log4j.rootCategory=INFO, console

然后通过下面的设定降低日志级别,只显示警告及更严重的信息:

log4j.rootCategory=WARN, console

这时再打开 shell,你就会看到输出大大减少(图 2-2)。

图 2-2:降低日志级别后的 PySpark shell

 

使用 IPython

IPython 是一个受许多 Python 使用者喜爱的增强版 Python shell,能够提供自动补全等好用的功能。你可以在 http://ipython.org 上找到安装说明。只要把环境变量 IPYTHON 的值设为 1,你就可以使用 IPython 了:

IPYTHON=1 ./bin/pyspark

要使用 IPython Notebook,也就是 Web 版的 IPython,可以运行:

IPYTHON_OPTS="notebook" ./bin/pyspark

在 Windows 上,像下面这样设置环境变量并运行命令行:

set IPYTHON=1
bin\pyspark

在 Spark 中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行进行。这样的数据集被称为弹性分布式数据集(resilient distributed dataset),简称 RDD。RDD 是 Spark 对分布式数据和计算的基本抽象。

在我们更详细地讨论 RDD 之前,先来使用 shell 从本地文本文件创建一个 RDD 来作一些简单的即时统计。例 2-1 是 Python 版的例子,例 2-2 是 Scala 版的。

例 2-1:Python 行数统计

>>> lines = sc.textFile("README.md") # 创建一个名为lines的RDD

>>> lines.count() # 统计RDD中的元素个数
127
>>> lines.first() # 这个RDD中的第一个元素,也就是README.md的第一行
u'# Apache Spark'

例 2-2:Scala 行数统计

scala> val lines = sc.textFile("README.md") // 创建一个名为lines的RDD
lines: spark.RDD[String] = MappedRDD[...]

scala> lines.count() // 统计RDD中的元素个数
res0: Long = 127

scala> lines.first() // 这个RDD中的第一个元素,也就是README.md的第一行
res1: String = # Apache Spark

要退出任一 shell,按 Ctrl-D。

 你可能在日志的输出中注意到了这样一行信息:INFO SparkUI: Started SparkUI at http://[ipaddress]:4040。你可以由这个地址访问 Spark 用户界面,查看关于任务和集群的各种信息。我们会在第 7 章中详细讨论。

在例 2-1 和例 2-2 中,变量 lines 是一个 RDD,是从你电脑上的一个本地的文本文件创建出来的。我们可以在这个 RDD 上运行各种并行操作,比如统计这个数据集中的元素个数在这里就是文本的行数),或者是输出第一个元素。我们会在后续章节中深入探讨 RDD。在此之前,让我们先花些时间来了解 Spark 的基本概念。

2.3 Spark核心概念简介

现在你已经用 shell 运行了你的第一段 Spark 程序,是时候对 Spark 编程作更细致的了解了。

从上层来看,每个 Spark 应用都由一个驱动器程序(driver program)来发起集群上的各种并行操作。驱动器程序包含应用的 main 函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。在前面的例子里,实际的驱动器程序就是 Spark shell 本身,你只需要输入想要运行的操作就可以了。

驱动器程序通过一个 SparkContext 对象来访问 Spark。这个对象代表对计算集群的一个连接。shell 启动时已经自动创建了一个 SparkContext 对象,是一个叫作 sc 的变量。我们可以通过例 2-3 中的方法尝试输出 sc 来查看它的类型。

例 2-3:查看变量 sc

>>> sc
<pyspark.context.SparkContext object at 0x1025b8f90>

一旦有了 SparkContext,你就可以用它来创建 RDD。在例 2-1 和例 2-2 中,我们调用了 sc.textFile() 来创建一个代表文件中各行文本的 RDD。我们可以在这些行上进行各种操作,比如count()

要执行这些操作,驱动器程序一般要管理多个执行器(executor)节点。比如,如果我们在集群上运行 count() 操作,那么不同的节点会统计文件的不同部分的行数。由于我们刚才是在本地模式下运行 Spark shell,因此所有的工作会在单个节点上执行,但你可以将这个 shell 连接到集群上来进行并行的数据分析。图 2-3 展示了 Spark 如何在一个集群上运行。

图 2-3:Spark 分布式执行涉及的组件

最后,我们有很多用来传递函数的 API,可以将对应操作运行在集群上。比如,可以扩展我们的 README 示例,筛选出文件中包含某个特定单词的行。以“Python”这个单词为例,具体代码如例 2-4(Python 版本)和例 2-5(Scala 版本)所示。

例 2-4:Python 版本筛选的例子

>>> lines = sc.textFile("README.md")

>>> pythonLines = lines.filter(lambda line: "Python" in line)

>>> pythonLines.first()
u'## Interactive Python Shell'

例 2-5:Scala 版本筛选的例子

scala> val lines = sc.textFile("README.md") // 创建一个叫lines的RDD
lines: spark.RDD[String] = MappedRDD[...]

scala> val pythonLines = lines.filter(line => line.contains("Python"))
pythonLines: spark.RDD[String] = FilteredRDD[...]

scala> pythonLines.first()
res0: String = ## Interactive Python Shell

向 Spark 传递函数

如果你对例 2-4 和例 2-5 中的 lambda 或者 => 语法不熟悉,可以把它们理解为 Python 和 Scala 中定义内联函数的简写方法。当你在这些语言中使用 Spark 时,你也可以单独定义一个函数,然后把函数名传给 Spark。比如,在 Python 中可以这样做:

def hasPython(line):
    return "Python" in line

pythonLines = lines.filter(hasPython)

在 Java 中向 Spark 传递函数也是可行的,但是在这种情况下,我们必须把函数定义为实现了 Function 接口的类。例如:

JavaRDD<String> pythonLines = lines.filter(
  new Function<String, Boolean>() {
    Boolean call(String line) { return line.contains("Python"); }
  }
);

Java 8 提供了类似 Python 和 Scala 的 lambda 简写语法。下面就是一个使用这种语法的代码的例子:

JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));

我们会在 3.4 节更深入地讨论如何向 Spark 传递函数。

尽管后面会更详细地讲述 Spark API,我们还是不得不感叹,其实 Spark API 最神奇的地方就在于像 filter 这样基于函数的操作也会在集群上并行执行。也就是说,Spark 会自动将函数(比如 line.contains("Python"))发到各个执行器节点上。这样,你就可以在单一的驱动器程序中编程,并且让代码自动运行在多个节点上。第 3 章会详细讲述 RDD API。

2.4 独立应用

我们的 Spark 概览中的最后一部分就是如何在独立程序中使用 Spark。除了交互式运行之外,Spark 也可以在 Java、Scala 或 Python 的独立程序中被连接使用。这与在 shell 中使用的主要区别在于你需要自行初始化 SparkContext。接下来,使用的 API 就一样了。

连接 Spark 的过程在各语言中并不一样。在 Java 和 Scala 中,只需要给你的应用添加一个对于 spark-core 工件的 Maven 依赖。编写此书时,Spark 的最新版本是 1.2.0,对应的 Maven 索引是:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.2.0

Maven 是一个流行的包管理工具,可以用于任何基于 Java 的语言,让你可以连接公共仓库中的程序库。可以使用 Maven 来构建你的工程,也可以使用其他能够访问 Maven 仓库的工具来进行构建,包括 Scala 的 sbt 工具或者 Gradle 工具。一些常用的集成开发环境(比如 Eclipse)也可以让你直接把 Maven 依赖添加到工程中。

在 Python 中,你可以把应用写成 Python 脚本,但是需要使用 Spark 自带的 bin/spark-submit 脚本来运行。spark-submit 脚本会帮我们引入 Python 程序的 Spark 依赖。这个脚本为 Spark 的 PythonAPI 配置好了运行环境。你只需要像例 2-6 所示的那样运行脚本即可。

例 2-6:运行 Python 脚本

bin/spark-submit my_script.py

(注意,在 Windows 上需要使用反斜杠来代替斜杠。)

2.4.1 初始化SparkContext

一旦完成了应用与 Spark 的连接,接下来就需要在你的程序中导入 Spark 包并且创建 SparkContext。你可以通过先创建一个 SparkConf 对象来配置你的应用,然后基于这个 SparkConf 创建一个 SparkContext 对象。在例 2-7 至例 2-9 中,我们用各种语言分别示范了这一过程。

例 2-7:在 Python 中初始化 Spark

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

例 2-8:在 Scala 中初始化 Spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

例 2-9:在 Java 中初始化 Spark

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);

这些例子展示了创建 SparkContext 的最基本的方法,你只需传递两个参数:

  • 集群 URL:告诉 Spark 如何连接到集群上。在这几个例子中我们使用的是 local,这个特殊值可以让 Spark 运行在单机单线程上而无需连接到集群。

  • 应用名:在例子中我们使用的是 My App。当连接到一个集群时,这个值可以帮助你在集群管理器的用户界面中找到你的应用。

还有很多附加参数可以用来配置应用的运行方式或添加要发送到集群上的代码。我们会在本书的后续章节中介绍。

在初始化 SparkContext 之后,你可以使用我们前面展示的所有方法(比如利用文本文件)来创建 RDD 并操控它们。

最后,关闭 Spark 可以调用 SparkContext 的 stop() 方法,或者直接退出应用(比如通过 System.exit(0) 或者 sys.exit())。

这个快速概览应该已经足够让你在电脑上运行一个独立的 Spark 应用了。如果要了解更高级的配置选项,第 7 章会讲到如何让你的应用连接到一个集群上,包括将你的应用打包,使得代码可以自动发送到工作节点上。就目前而言,参考 Spark 官方文档的快速入门指南(http://spark.apache.org/docs/latest/quick-start.html)就足够了。

2.4.2 构建独立应用

作为一本讲大数据的书,如果没有一个单词数统计的例子,就不能成其为完整的一章。在单机上实现单词数统计很容易,但在分布式框架下,由于要在许多工作节点上读入并组合数据,单词数统计就成了一个很常用的例子。下面我们来学习用 sbt 以及 Maven 来构建并打包一个简单的单词数统计的例程。我们可以把所有的例程构建在一起,但是为了展示最简单的构建过程,我们只保留了最基本的依赖。在 learning-spark-examples/mini-complete-example 目录下,你可以找到这样一个独立的小工程。Java 版本(例 2-10)和 Scala 版本(例 2-11)的例子分别如下所示。

例 2-10:Java 版本的单词数统计应用(暂时不需要深究细节)

// 创建一个Java版本的Spark Context
SparkConf conf = new SparkConf().setAppName("wordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
// 读取我们的输入数据
JavaRDD<String> input = sc.textFile(inputFile);
// 切分为单词
JavaRDD<String> words = input.flatMap(
  new FlatMapFunction<String, String>() {
    public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }});
// 转换为键值对并计数
JavaPairRDD<String, Integer> counts = words.mapToPair(
  new PairFunction<String, String, Integer>(){
    public Tuple2<String, Integer> call(String x){
      return new Tuple2(x, 1);
    }}).reduceByKey(new Function2<Integer, Integer, Integer>(){
        public Integer call(Integer x, Integer y){ return x + y;}});
// 将统计出来的单词总数存入一个文本文件,引发求值
counts.saveAsTextFile(outputFile);

例 2-11:Scala 版本的单词数统计应用(暂时不需要深究细节)

// 创建一个Scala版本的Spark Context
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
// 读取我们的输入数据
val input = sc.textFile(inputFile)
// 把它切分成一个个单词
val words = input.flatMap(line => line.split(" "))
// 转换为键值对并计数
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
// 将统计出来的单词总数存入一个文本文件,引发求值
counts.saveAsTextFile(outputFile)

我们可以使用非常简单的 sbt(例 2-12)或 Maven(例 2-13)构建文件来构建这些应用。由于 Spark Core 包已经在各个工作节点的 classpath 中了,所以我们把对 Spark Core 的依赖标记为 provided,这样当我们稍后使用 assembly 方式打包应用时,就不会把 spark-core 包也打包到 assembly 包中。

例 2-12:sbt 构建文件

name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// 附加程序库
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)

例 2-13:Maven 构建文件

<project>
  <groupId>com.oreilly.learningsparkexamples.mini</groupId>
  <artifactId>learning-spark-mini-example</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example</name>
  <packaging>jar</packaging>
  <version>0.0.1</version>
  <dependencies>
    <dependency> <!-- Spark依赖 -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <properties>
    <java.version>1.6</java.version>
  </properties>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>   <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
          </configuration> </plugin> </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

 spark-core 包被标记为了 provided,这是为了控制我们以 assembly 方式打包应用时的行为。第 7 章中会详细讨论这个细节。

一旦敲定了构建方式,我们就可以轻松打包并且使用 bin/spark-submit 脚本执行我们的应用了。spark-submit 脚本可以为我们配置 Spark 所要用到的一系列环境变量。在 mini-complete-example 目录中,我们可以使用 Scala(例 2-14)或者 Java(例 2-15)进行构建。

例 2-14:Scala 构建与运行

sbt clean package
$SPARK_HOME/bin/spark-submit \
  --class com.oreilly.learningsparkexamples.mini.scala.WordCount \
  ./target/...  (as above) \
  ./README.md ./wordcounts

例 2-15:Maven 构建与运行

mvn clean && mvn compile && mvn package
$SPARK_HOME/bin/spark-submit \
  --class com.oreilly.learningsparkexamples.mini.java.WordCount \
  ./target/learning-spark-mini-example-0.0.1.jar \
  ./README.md ./wordcounts

要了解关于连接应用程序到 Spark 的更多例子,请参考 Spark 官方文档中的快速入门指南(http://spark.apache.org/docs/latest/quick-start.html)一节。第 7 章中也会更详细地讲解如何打包 Spark 应用。

2.5 总结

在本章中,我们讲到了下载并在单机的本地模式下运行 Spark,以及 Spark 的使用方式,包括交互式方式和通过一个独立应用进行调用。另外我们还简单介绍了 Spark 编程的核心概念:通过一个驱动器程序创建一个 SparkContext 和一系列 RDD,然后进行并行操作。在下一章中,我们将会更加深入地介绍如何操作 RDD。

目录