第 2 章 用 Scala 和 Spark 进行数据分析

第 2 章 用Scala和Spark进行数据分析

作者:乔希 • 威尔斯

 

世上无难事,只要肯耐烦。

——David Foster Wallace

数据清洗是数据科学项目的第一步,往往也是最重要的一步。许多灵巧的分析最后功败垂成,原因就是分析的数据存在严重的质量问题,或者数据中某些因素使分析产生偏见,或使数据科学家得出根本不存在的规律。

尽管数据清洗很重要,但数据科学相关的许多教材和课程都不讲述数据清洗,抑或一笔带过。造成这种现象的原因其实很简单:数据清洗实在是很琐碎。然而“磨刀不误砍柴工”,只有事先做了这种沉闷乏味的工作,后面你才能领略到应用机器学习算法解决新问题时的酣畅淋漓。许多道行尚浅的数据科学家往往急于求成,对数据草草处理就进行下一步工作,等到运行算法后,却发现数据有严重的质量问题(可能是计算量太大),或最后得出的结果完全不合理。

“垃圾进垃圾出”这样浅显的道理大家都明白,危害更大的是:数据看似合理却有很严重(但第一眼看不出来)的质量问题,你根据这样的数据也得到了看似合理的答案。许多数据科学家丢饭碗,往往就是因为这样错误地得出了重要结论。

数据科学家最为人称道的是在数据分析生命周期的每一个阶段都能发现有意思、有价值的问题。在一个分析项目的早期阶段,你投入的技能和思考越多,对最终的产品就越有信心。

当然,说起来容易做起来难。对于数据科学行业来说,这就像是告诉小孩子要多吃蔬菜。相比数据清洗,摆弄 Spark 之类新潮的工具,用它们构建花哨的机器学习算法,开发流式数据处理引擎和分析海量图数据,要好玩得多。如果要介绍如何用 Spark 和 Scala 进行数据处理,有没有一种比练习数据清洗更好的方法呢?

2.1 数据科学家的Scala

对数据处理和分析,数据科学家往往都自己钟爱的工具,比如 R 或者 Python。除非不得已,数据科学家常常会坚持用他们所钟爱的工具,对于手头上的工作,他们总想方设法地沿用这些工具。即使情况再顺利,想让数据科学家采用新的工具、学习新语法和新使用模式,都困难重重。

为了能在 R 或 Python 里直接用 Spark,Spark 上开发了专门的类库和工具包。Python 有个非常好用的工具包叫作 PySpark,第 11 章有几个例子介绍了它的用法。但是本书大部分例子还是用 Scala 语言编写的。Spark 框架是用 Scala 语言编写的,在向数据科学家介绍 Spark 时,采用与底层框架相同的编程语言有很多好处。

  • 性能开销小

    为了能在基于 JVM 的语言(比如 Scala)上运行用 R 或 Python 编写的算法,我们必须在不同环境中传递代码和数据,这会付出代价,而且在转换过程中信息时有丢失。但是,如果数据分析算法用 Spark Scala API 编写,你会对程序正确运行更有信心。

  • 能用上最新的版本和最好的功能

    Spark 的机器学习、流处理和图分析库全都是用 Scala 写的,而新功能对 Python 和 R 绑定支持可能要慢得多。如果想用 Spark 的全部功能(而不用花时间等待它移植到其他语言绑定),恐怕你必须学点儿 Scala 基础知识,如果想扩展这些 Spark 已有功能来解决你手头上的新问题,就更要深入了解 Scala 了。

  • 有助于你了解Spark的原理

    即使在 Python 或 R 中调用 Spark,API 仍然反映了底层计算原理,它是 Spark 从其开发语言 Scala 继承过来的。如果你知道如何在 Scala 中使用 Spark,即使你平时主要还是在其他语言中使用 Spark,你还是会更理解系统,因此会更好地“用 Spark 思考”。

学习在 Scala 中用 Spark 还有一个好处。由于 Spark 不同于其他任何一种数据分析工具,这个好处解释起来会有点儿困难。如果你曾经用过 R 或 Python 从数据库读取数据并分析,肯定经历过用一种语言(SQL)读取和操作大量存储在远程集群的数据,然后用另一种语言(Python 或 R)来操作和展现存储在你本地机器上的信息。如果想把一部分计算通过 SQL UDF 放到数据库引擎中,你需要切换到另一种编程环境(如 C++ 或者 Java),并且还要了解数据库的内部细节。如果你一直这么做,时间长了你可能都不会再想这种方式有没有问题。

使用 Spark 和 Scala 做数据分析则是一种完全不同的体验,因为你可以选择用同样的语言完成所有事情。借助 Spark,你用 Scala 代码读取集群上的数据。接着,你把 Scala 代码发送到集群上完成相同的转换,这些转换跟你刚刚对本地数据所做的转换完全一样,但数据却在集群上——这就是精妙之处。即便用 Spark SQL 这样的高阶语言,也可以写好内联 UDF,用 Spark SQL 引擎注册,然后使用 UDF——根本不用切换环境。

在同一个环境中完成所有数据处理和分析,不用考虑数据本身在何处存放和在何处处理,这简直妙不可言。这种感觉只有你亲身经历才体会得到。我们也想确保书中的示例能够让你感受到我们首次使用 Spark 时体验到的那种魔术般的感觉。

2.2 Spark编程模型

Spark 编程始于数据集,而数据集往往存放在分布式持久化存储之上,比如 HDFS。编写 Spark 程序通常包括一系列相关步骤。

(1) 在输入数据集上定义一组转换。

(2) 调用 action,可以将转换后的数据集保存到持久化存储上,或者把结果返回到驱动程序的本地内存。

(3) 运行本地计算,处理分布式计算的结果。本地计算有助于你确定下一步的转换和 action。

从 1.2 版本到 2.1 版本,Spark 变得成熟了,处理上述步骤的工具的数量和质量也大大提升。在完成分析任务时,你可以搭配使用复杂 SQL 查询、机器学习库以及自定义代码。Spark 社区这几年开发了各种高阶抽象,利用这些抽象,你可以花更少的时间来解决更多的问题。但是所有这些高阶抽象都是基于存储与执行的相互作用,从 Spark 诞生起就一直是这样。Spark 优美地搭配这两类抽象,可以将数据处理管道中的任何中间步骤缓在内存里以备后用。了解这些原则可以帮助你更好地利用 Spark 做数据分析。

2.3 记录关联问题

本章我们要研究的主题在许多文献和实践中被冠以许多不同的名称:身份解析、记录去重、合并 - 清除,以及列表清洗。想了解这个主题的方案和技术概况,我们需要参考这个主题的所有相关研究论文。但是由于不同文献和实践中同一个概念使用不同的名称,我们很难找到所有相关论文。在搞清楚数据清洗这个问题之前,我们得请数据科学家把与数据清洗这个概念相关的令人混淆的许多不同名称给去去重。这真让人觉得讽刺!为了方便本章余下部分论述,我们把这个问题称为记录关联(record linkage)。

问题的大概情况如下:我们有大量来自一个或多个源系统的记录,其中多种不同的记录可能代表相同的基础实体,比如客户、病人、业务地址或事件。每个实体有若干属性,比如姓名、地址、生日。我们需要根据这些属性找到那些代表相同实体的记录。不幸的是,有些属性值有问题:格式不一致,或有笔误,或信息缺失。如果简单地对这些属性作相等性测试,就会漏掉许多重复记录。举个例子,我们看看表 2-1 列出的几家商店的记录。

表 2-1:记录关联问题的难点

名称

地址

城市

电话

Josh's Coffee Shop

1234 Sunset Boulevard

West Hollywood

CA

(213)-555-1212

Josh Coffee

1234 Sunset Blvd West

Hollywood

CA

555-1212

Coffee Chain #1234

1400 Sunset Blvd #2

Hollywood

CA

206-555-1212

Coffee Chain Regional Office

1400 Sunset Blvd Suite 2

Hollywood

California

206-555-1212

表中前两行其实指同一家咖啡店,但由于数据录入错误,这两项看起来是在不同城市(West Hollywood 和 Hollywood)。相反,表中后两行其实是同一家咖啡连锁店的不同业务部门,尽管它们有相同的地址:地址 1400 Sunset Blvd #2 是咖啡店的实际地址,另一个地址 1400 Sunset Blvd Suite 2 则是公司在当地的一个办公室地点。后两项给的都是公司 Seattle 总部的官方电话号码。

这个例子清楚地说明了记录关联为什么很困难:即使两组记录看起来相似,但针对每一组中的条目,我们确定它是否重复的标准不一样。这种区别我们人类很容易理解,计算机却很难了解。

2.4 小试牛刀:Spark shell和SparkContext

我们的样例数据集来自加州大学欧文分校机器学习资料库(UC Irvine Machine Learning Repository),这个资料库为研究和教学提供了大量非常好的数据源,这些数据源非常有意义,并且是免费的。我们要分析的数据集来源于一项记录关联研究,这项研究是德国一家医院在 2010 年完成的。这个数据集包含数百万对病人记录,每对记录都根据不同标准来匹配,比如病人姓名(名字和姓氏)、地址、生日。每个匹配字段都被赋予一个数值评分,范围为 0.0 到 1.0,分值根据字符串相似度得出。然后这些数据交由人工处理,标记出哪些代表同一个人,哪些代表不同的人。为了保护病人隐私,创建数据集的每个字段的原始值被删除了。病人的 ID、字段匹配分数、匹配对标记(包括匹配的和不匹配的)等信息是公开的,可用于记录关联研究。

首先我们从资料库中下载数据,请在命令行中输入:

$ mkdir linkage
$ cd linkage/
$ curl -L -o donation.zip https://bit.ly/1Aoywaq
$ unzip donation.zip
$ unzip 'block_*.zip'

如果手头有 Hadoop 集群,可以先在 HDFS 上为块数据创建一个目录,然后将数据集文件复制到 HDFS 上:

$ hadoop fs -mkdir linkage
$ hadoop fs -put block_*.csv linkage

本书示例和代码假定读者使用 Spark 2.1.0。可以在 Spark 项目网站(https://spark.apache.org/downloads.html)获取各个版本的 Spark 软件。想了解如何在集群或本地机器上安装 Spark 环境,请参考 Spark 官方文档。

现在准备工作就绪,可以启动 spark-shell 了。spark-shell 是 Scala 语言的一个 REPL 环境,它同时针对 Spark 做了一些扩展。如果这是你第一次见到 REPL 这个术语,可以把它看成一个类似 R 的控制台:可以在其中用 Scala 编程语言定义函数并操作数据。

如果你有一个 Hadoop 集群,并且 Hadoop 版本支持 YARN,通过为 Spark master 设定 yarn 参数值,就可以在集群上启动 Spark 作业:

$ spark-shell --master yarn --deploy-mode client

如果你是在自己的计算机上运行示例,可以通过设定 local[N] 参数来启动本地 Spark 集群,其中 N 代表运行的线程数,或者用 * 表示使用机器上所有可用的核数。比如,要在一个 8 核的机器上用 8 个线程启动一个本地集群,可以输入以下命令:

$ spark-shell --master local[*]

在本地环境下,书中示例同样能运行。不过,这时传入的文件路径是本地路径,而不是以 hdfs:// 开头的 HDFS 路径。注意,还需要通过 cp block_*.csv 将文件复制到指定的本地目录,而不是用包含解压文件的目录,因为该目录不仅包含 .csv 文件,还包含许多其他文件。

本书其他 spark-shell 示例中不会出现 --master 参数,但根据环境通常需要设定该参数。

为了 Spark shell 能充分利用资源,可能还需要额外设定一些参数。比如,当 Spark 运行于本地 master 模式,可以用 --driver-memory 2g,这样就设定了一个本地进程使用 2 GB 内存。YARN 内存设置会更复杂,相关的选项(如 --executor-memory 等参数)设置可以参考 Spark on YARN 的官方文档(https://spark.apache.org/docs/latest/running-on-yarn.html)。

运行完上述命令后,可以看到 Spark 在初始化过程中的日志消息。与此同时,也能看到一点儿 ASCII 艺术体字样,之后又是一段日志和提示符:

Spark context Web UI available at http://10.0.1.39:4040
Spark context available as 'sc' (master = local[*], app id = ...).
Spark session available as 'spark'.
Welcome to

      ____             __
     / __/__ ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

如果你是第一次用 Spark shell(或任何类似的 Scala REPL),可以运行 :help 命令,该命令列出了 shell 的所有命令。运行 :history:h?,可以帮你找到之前在某个会话中写过,但一时又想不起来的变量或函数名称。运行 :paste,可以帮你插入剪贴板中的代码,这是学习本书和使用本书源代码必需的。

除了关于 :help 的提示,Spark 日志消息还显示“Spark context available as sc.”。sc 在这里是 SparkContext 的一个引用,它负责协调集群上 Spark 作业的执行。继续在命令行中输入 sc

sc
...
res: org.apache.spark.SparkContext =
  org.apache.spark.SparkContext@DEADBEEF

REPL 会以字符形式打印对象。SparkConext 对象就是名字加上十六进制的对象内存地址(示例中显示的 DEADBEEF 是占位符,具体值每次运行时都不一样)。

sc 变量确实方便,但它的作用是什么呢? SparkContext 是一个对象,是对象当然就有方法。想要在 Scala REPL 中查看这些方法,输入变量名加点号再加 Tab 键即可:

sc.[\t]
...
!=                         hashCode
##                         isInstanceOf
+                          isLocal
->                         isStopped
==                         jars
accumulable                killExecutor
accumulableCollection      killExecutors
accumulator                listFiles
addFile                    listJars
addJar                     longAccumulator
...  (lots of other methods)
getClass                   stop
getConf                    submitJob
getExecutorMemoryStatus    synchronized
getExecutorStorageStatus   textFile
getLocalProperty           toString
getPersistentRDDs          uiWebUrl
getPoolForName             union
getRDDStorageInfo          version
getSchedulingMode          wait
hadoopConfiguration        wholeTextFiles
hadoopFile                 →

SparkContext 有很多方法,但接下来我们使用最多的方法用于创建 RDD。RDD 是 Spark 所提供的最基本的抽象,代表分布在集群中多台机器上的对象集合。Spark 有两种方法可以创建 RDD:

  • SparkContext 基于外部数据源创建 RDD,外部数据源包括 HDFS 上的文件、通过 JDBC 访问的数据库表或 Spark shell 中创建的本地对象集合;
  • 在一个或多个已有 RDD 上执行转换操作来创建 RDD,这些转换操作包括记录过滤、对具有相同键值的记录做汇总、把多个 RDD 关联在一起等。

利用 RDD 可以很方便地描述对数据要进行的一串小而独立的计算步骤。

弹性分布式数据集(RDD)

RDD 以分区(partition)的形式分布在集群中的多个机器上,每个分区代表了数据集的一个子集。分区定义了 Spark 中数据的并行单位。Spark 框架并行处理多个分区,一个分区内的数据对象则是顺序处理。创建 RDD 最简单的方法是在本地对象集合上调用 SparkContextparallelize 方法。

val rdd = sc.parallelize(Array(1, 2, 2, 4), 4)
...
rdd: org.apache.spark.rdd.RDD[Int] = ...

第一个参数代表待并行化的对象集合,第二个参数代表分区的个数。当要对一个分区内的对象进行计算时,Spark 从驱动程序进程里获取对象集合的一个子集。

要在分布式文件系统(比如 HDFS)上的文件或目录上创建 RDD,可以给 textFile 方法传入文件或目录的名称:

val rdd2 = sc.textFile("hdfs:///some/path.txt")
...
rdd2: org.apache.spark.rdd.RDD[String] = ...

如果 Spark 在本地模式下运行,可以用 textFile 方法访问本地文件系统上的路径。如果输入是目录而不是单个文件,Spark 会把该目录下所有的文件作为 RDD 的输入。最后请注意,实际上 Spark 并未将数据读取到客户端机器或集群内存中。当需要对分区内的对象进行计算时,Spark 才会读入输入文件的某个部分(也称“切片”),然后应用其他 RDD 定义的后续转换操作(过滤和汇总等)。

我们的记录关联数据存储在一个文本文件中,文件中每行代表一个样本。我们用 SparkContexttextFile 方法来得到 RDD 形式的数据引用:

val rawblocks = sc.textFile("linkage")
...
rawblocks: org.apache.spark.rdd.RDD[String] = ...

这几行代码有几点值得我们注意。第一,我们声明了一个名叫 rawblocks 的新变量。从 shell 中可以看出,rawblocks 变量的类型为 RDD[String],而我们并没有在变量声明时指出变量类型。这个功能在 Scala 编程语言中称为类型推断,它为我们写代码时节省了许多键盘输入。Scala 会尽可能从上下文中分析出变量类型。在我们的示例中,Scala 会查找 SparkContext 对象 textFile 函数的返回值类型,发现该函数返回 RDD[String] 类型,于是就将 RDD[String] 类型赋给 rawblocks 变量。

只要在 Scala 中定义新变量,就必须在变量名称前加上 valvar。名称前带 val 的变量是不可变变量。一旦给不可变变量赋完初值,就不能改变它,让它指向另一个值。而以 var 开头的变量则可以改变其指向,让它指向同一类型的不同对象。试试看如下代码的执行情况:

rawblocks = sc.textFile("linkage")
...
<console>: error: reassignment to val

var varblocks = sc.textFile("linkage")
varblocks = sc.textFile("linkage")

试图将关联数据重新赋给 rawblocks val 变量会报错,但重新给 varblocks var 变量赋值则没有问题。在 Scala REPL 中,对 val 变量有个例外,因为 Scala REPL 允许我们重新声明相同的不可变变量,请看代码:

val rawblocks = sc.textFile("linakge")
val rawblocks = sc.textFile("linkage")

示例中第二次声明 rawblocks val 变量并没有报错。这在常规 Scala 代码中是非法的,但在 shell 中却没有问题,本书的许多例子中会用到该功能。

REPL 与编译

除了交互式 shell,Spark 也支持编译程序。我们通常推荐使用 Apache Maven(https://maven.apache.org)来编译程序和管理依赖关系。本书在 GitHub 的资料库的 simplesparkproject/ 目录(https://github.com/sryza/aas/tree/master/simplesparkproject)下包含了一个完整的 Maven 工程,你可以用它作为开端。

现在你有两个选择:shell 和编译程序,但测试和构建数据处理程序时该选哪个呢?通常在初始阶段工作可能全部用 REPL 完成。REPL 可以加快原型开发,使迭代更快,很快看到你的想法的结果。但随着程序越来越大,在一个文件中维护大量代码就变得很笨拙了,这时解释 Scala 程序也要消耗更多时间。如果数据量巨大,情况会更糟,经常会出现一个操作导致 Spark 应用崩溃或 SparkContext 不可用的情况。如果发生这种情况,意味着所有的工作和输入的代码都丢失了。这时我们往往应该采用混合模式。最前面的开发工作在 REPL 里完成,随着代码逐渐成熟,将代码移到编译库里。可以在 spark-shell 中引用已编译好的JAR,只要给 spark-shell 设置 --jars 命令行参数即可。这样的话,如果使用得当,就不用频繁重新编译JAR,同时 REPL 可以支持快速代码迭代和逐步成熟方式。

如何引用外部的 Java 和 Scala 类库呢?要编译引用了外部类库的代码,需要在工程的 Maven 配置文件(pom.xml)中指定所需的类库。要运行依赖外部类库的代码,需要在 Spark 进程中通过 classpath 将所需类库的 JAR 文件包含进来。为此,一种好的做法是使用 Maven 来打包JAR,使生成的 JAR 包含应用程序的所有依赖文件。接着在启动 shell 时通过 --jars 属性引用该JAR。这种方法的优点是依赖只需要在 Maven 的 pom.xml 中指定一次即可。如何进行设置,请参考本书 GitHub 资料库的 simplesparkproject/ 目录。

如果想使用第三方 Maven 仓库的某个 JAR,可以通过 --package 命令行参数告知 spark-shell 这个 JAR 的 Maven 坐标,随后 spark-shell 就会加载这个 JAR。举个例子,为加载 Scala 2.11 版本的 Wisp Visualization 库,你需要将 --packages "com.quantifind:wisp_2.11:0.0.4" 这个参数传递给 spark-shell。如果所需的 JAR 未存储在 Maven 中央仓库中,可以通过 --repositories 参数来告诉 Spark,这个 JAR 在哪个仓库中。如果想用 --packages--repositories 来加载多个 JAR,可以使用逗号对多个包名或仓库名进行分隔。

2.5 把数据从集群上获取到客户端

RDD 有许多方法,我们可以用这些方法从集群读取数据到客户端机器上的 Scala REPL 中。其中最简单的方法可能就是 first 了,该方法向客户端返回 RDD 的第一个元素:

rawblocks.first
...
res: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2",...

first 方法可用于对数据集做常规检查,但通常我们更想返回更多样例数据供客户端分析。如果知道 RDD 只包含少量记录,可以用 collect 方法向客户返回一个包含所有 RDD 内容的数组。因为我们还不知道这个关联数据集有多大,所以暂时不那么做了。

还可以用 take 方法,这个方法在 firstcollect 之间做了一些折衷,可以向客户端返回一个包含指定数量记录的数组。我们来看看如何使用 take 方法获取记录关联数据集的前 10 行记录:

val head = rawblocks.take(10)
...
head: Array[String] = Array("id_1","id_2","cmp_fname_c1",...
head.length
...
res: Int = 10

动作

创建 RDD 的操作并不会导致集群执行分布式计算。相反,RDD 只是定义了作为计算过程中间步骤的逻辑数据集。只有调用 RDD 上的 action(动作)时分布式计算才会执行。举个例子,count 动作返回 RDD 中对象的个数:

rdd.count()
14/09/10 17:36:09 INFO SparkContext: Starting job: count ...
14/09/10 17:36:09 INFO SparkContext: Job finished: count ...
res0: Long = 4

collect 动作返回一个包含 RDD 中所有对象的 Array(数组):

rdd.collect()
14/09/29 00:58:09 INFO SparkContext: Starting job: collect ...
14/09/29 00:58:09 INFO SparkContext: Job finished: collect ...
res2: Array[(Int, Int)] = Array((4,1), (1,1), (2,2))

动作不一定向本地进程返回结果。saveAsTextFile 动作将 RDD 的内容保存到持久化存储(比如 HDFS)上:

rdd.saveAsTextFile("hdfs:///user/ds/mynumbers")
14/09/29 00:38:47 INFO SparkContext: Starting job:
saveAsTextFile ...
14/09/29 00:38:49 INFO SparkContext: Job finished:
saveAsTextFile ...

该动作创建一个目录并为每个分区输出一个文件。切换到 Spark shell 外面的命令行,执行如下操作:

hadoop fs -ls /user/ds/mynumbers
    
-rw-r--r--   3 ds supergroup       0 2014-09-29 00:38 myfile.txt/_SUCCESS
-rw-r--r--   3 ds supergroup       4 2014-09-29 00:38 myfile.txt/part-00000
-rw-r--r--   3 ds supergroup       4 2014-09-29 00:38 myfile.txt/part-00001

记住,textFile 接受包含一个文本文件的目录作为输入,这意味将来的 Spark 作业可以把 mynumbers 作为其输入目录。

Scala REPL 返回的数据原始形式可能有点儿难以读懂,特别是对于包含了许多元素的数组更是如此。为了更容易读懂数组的内容,我们可以用 foreach 方法并结合 println 来打印出数组中的每个值,并且每一行打印一个值:

head.foreach(println)
...
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2",
  "cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE

本书经常用到 foreach(println) 模式。它是一个常见的函数式编程模式:把函数 println 作为参数传递给另一个函数以执行某个动作。用过 R 的数据科学家很熟悉这种编程风格。为了在处理向量和列表时避免循环,他们习惯用高阶函数,比如 applylapply。Scala 的集合与 R 的列表和向量类似,我们通常希望少用 for 循环,而在处理集合元素时采用高阶函数。

很快我们就发现数据有几个问题,这些问题必须在开始对数据分析前解决好。首先,CSV 文件有一个标题行需要过滤掉,以免影响后续分析。我们可以将标题行中出现的 "id_1" 字符串作为过滤条件,编写一个简单的 Scala 函数来测试一行记录中是否包含该字符串,代码如下:

def isHeader(line: String) = line.contains("id_1")
isHeader: (line: String)Boolean

和 Python 类似,Scala 声明函数用关键字 def。和 Python 不同,我们必须为函数指定参数类型:在示例中,我们指明 line 参数是 String。函数体部分调用 String 类的 contains 方法,用于测试字符串中是否出现 "id_1" 字符序列,等号后的部分都是函数体的内容。虽然我们必须指定 line 参数的类型,但是没必要指定函数的返回类型,原因在于 Scala 编译器能根据 String 类的信息和 Stringcontains 方法返回 truefalse 这一事实来推断出函数的返回类型。

有时候我们希望能显式地指明函数返回类型,特别是碰到函数体很长、代码复杂并且包含多个 return 语句的情况。这时候,Scala 编译器不一定能推断出函数的返回类型。为了函数代码可读性更好,也可以指明函数的返回类型。这样他人在阅读代码的时候,就不必重新把整个函数读一遍了。可以紧跟在参数列表后面声明返回类型,示例如下:

def isHeader(line: String): Boolean = {
  line.contains("id_1")
}
isHeader: (line: String)Boolean

通过用 Scala 的 Array 类的 filter 方法打印出结果,可以在 head 数组上测试新编写的 Scala 函数:

head.filter(isHeader).foreach(println)
...
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1",...

看起来我们的 isHeader 方法没什么问题:通过 filter 方法将 isHeader 作用在 head 数组上,返回的唯一结果是标题行本身。当然,我们其实想要的是所有非标题行。为了完成这个目标,Scala 有几种方法。第一个就是利用 Array 类的 filterNot 方法:

head.filterNot(isHeader).length
...
res: Int = 9

还可以利用 Scala 对匿名函数的支持,在 filter 函数里面对 isHeader 函数取非:

head.filter(x => !isHeader(x)).length
...
res: Int = 9

Scala 的匿名函数有点儿类似于 Python 的 lambda 函数。在示例代码中我们定义了一个名为 x 的参数并把它传给 isHeader 函数,再对 isHeader 函数的返回值取非。请注意,样例代码中没必要指定 x 变量的类型信息,Scala 编译器能够根据 head 的类型是 Array[String] 推断出 xString 类。

Scala 程序员最讨厌的就是键盘输入。因此 Scala 设计了许多小功能来减少输入,比如在匿名函数的定义中,为了定义匿名函数并给参数指定名称,只输入了字符 x=>。但像这么简单的匿名函数,甚至都没必要这么做:Scala 允许使用下划线(_)表示匿名函数的参数,因此我们可以少输入 4 个字符:

head.filter(!isHeader(_)).length
...
res: Int = 9

有时这种缩写语法使代码更易阅读,因为它省略了明显多余的标识符,但有时也会使代码更难懂。代码到底是更易懂还是更难懂,这就要靠我们自己判断了。

2.6 把代码从客户端发送到集群

刚才我们见识了 Scala 语言定义和运行函数的多种方式。我们执行的代码都作用在 head 数组中的数据上,这些数据都在客户端机器上。现在,我们打算在 Spark 里把刚写好的代码应用到关联记录数据集 RDD rawblocks,该数据集在集群上的记录有数百万条。

下面是一段示例代码,是不是觉得特别熟悉?

val noheader = rawblocks.filter(x => !isHeader(x))

用于过滤集群上整个数据集的语法和过滤本地机器上的 head 数组的语法一模一样。可以用 noheader 这个 RDD 来验证过滤规则是否正确:

noheader.first
...
res: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE

这太强大了!它意味着我们可以先从集群采样得到小数据集,在小数据集上开发和调试数据处理代码,等一切就绪后再把代码发送到集群上处理完整的数据集就可以了。最厉害的是,我们从头到尾都不用离开 shell 界面。除了 Spark,还真没有哪种工具能给你这种体验。

在后面几节中,我们将运用这种本地开发加测试和集群运算的方式,来展示更多处理和分析记录关联数据的技术。如果你现在想停下来喝口水,感叹一下 Spark 这个新世界的美妙,我们当然能理解。

2.7 从RDD到DataFrame

本书第 1 版的这一节中,我们介绍了一个新功能:如何在 REPL 中混合使用本地开发和测试以及集群中的运算。我们写了一段代码,对存储记录关联数据的 CSV 文件进行解析。代码完成的工作包括:把每一行按照逗号分隔成多个字段,再将每个字段转换成对应的数据类型(整型或双精度浮点数),并处理非法值。Spark 的这种处理数据的方式很有吸引力,特别是在数据集的结构不同寻常或者非标准的情况下,此时其他处理方式并不适用。

不过,我们遇到的大部分数据集都有着合理的结构,要么因为它们本来如此(比如来自数据库的表),要么因为有人已经对数据做好了清洗和结构化。对这类数据,我们完全没有必要花费精力自己写一套代码来解析它,只需简单地调用现成的类库,并利用数据的结构,即可将其解析成所需结构,然后就可以做数据分析了。Spark 1.3 中引入了一个这样的新数据结构——DataFrame。

DataFrame 是一个构建在 RDD 之上的 Spark 抽象,它专门为结构规整的数据集而设计,DataFrame 的一条记录就是一行,每行都由若干个列组成,每一列的数据类型都有严格定义。可以把 DataFrame 类型实例理解为 Spark 版本的关系数据库表。DataFrame 这个名字可能会让你联想到 R 语言的 data.frame 对象,或者 Python 的 pandas.DataFrame 对象,但是 Spark 的 DataFrame 与它们有很大的不同。这么说是因为 DataFrame 代表集群中的一个分布式数据集,而不是所有数据都存储在同一台机器上的本地数据。虽说 Spark 的 DataFrame 与 data.frame 以及 pandas.DataFrame 的用法比较相似,而且在生态系统中的角色也类似,但是 R 和 Python 中的某些操作在 Spark 中却无法使用。所以,最好把它们看成独立的实体,并尝试接纳这些不同点。

要为记录关联数据集建立一个 DataFrame,我们需要用到 SparkSession 对象 spark,这个对象是在启动 Spark REPL 时创建的:

spark
...
res: org.apache.spark.sql.SparkSession = ...

SparkSession 替代了 SQLContextSQLContext 最初是在 Spark 1.3 中引入的,现在已经不用了。与 SQLContext 类似,SparkSessionSparkContext 对象的一个封装,你可以通过 SparkSession 直接访问到 SparkContext

spark.sparkContext
...
res: org.apache.spark.SparkContext = ...

可以看到,spark.sparkContext 的值与我们之前用来创建 RDD 的 sc 变量是完全一样的。要创建一个 DataFrame,我们可以使用 SparkSession 的 Reader API 的 csv 方法:

val prev = spark.read.csv("linkage")
...
prev: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string, ...

默认情况下,CSV 文件中的每一列都是 string 类型,列名默认为 _c0_c1_c2,等等。要想知道一个 DataFrame 的前几行,可以调用 DataFrame 的 show() 方法。

prev.show()

可以看到,第一行是 DataFrame 表头的列名。正如我们所料,CSV 文件被整齐地划分成多个列。我们还发现有些列中出现了“?”,接下来需要将所有“?”标记为缺失值。除了给每一列正确命名以外,如果 Spark 还能推断出每一列的数据类型,那就再好不过了。

幸运的是,Spark 的 CSV 读取器提供了该项功能,我们只需设置 Spark 的 CSV reader API 即可。你可以在 spark-csv 项目的 GitHub 页面(https://github.com/databricks/spark-csv#features)上看到所有需设置的选项,spark-csv 在 Spark 1.x 时代是独立的项目,到了 Spark 2.x 才整合进来。现在我们可以通过以下方式读取并解析关联数据了:

val parsed = spark.read.
  option("header", "true").
  option("nullValue", "?").
  option("inferSchema", "true").
  csv("linkage")

parsed 调用 show 方法,可以看到列名已经设置成功,“?”也替换成了 null 值。要了解每列的推测类型,我们可以输出经过解析的 DataFrame 的模式信息,示例如下:

parsed.printSchema()
...
root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
...

每个 StructField 实例包含了列名、每条记录中数据的最具体的类型,以及一个表示此列是否允许空值的布尔字段(默认值为真)。为了完成模式推断,Spark 需要遍历数据集两次:第一次找出每列的数据类型,第二次才真正进行解析。如果预先知道某个文件的模式,你可以创建一个 org.apache.spark.sql.types.StructType 实例,并使用模式函数将它传给 Reader API。在数据量很大的情况下,这样做可以获得巨大的性能提升,因为 Spark 不需要为确定每列的数据类型而额外遍历一次数据。

DataFrame 与数据格式

通过 DataFrameReader 和 DataFrameWriter API,Spark 2.0 内置支持多种格式读写 DataFrame。除了我们这里讨论过的 CSV 格式以外,还可以读写如下几种数据源。

json

  支持 CSV 格式具有的模式推断功能。

parquetorc

  两种二进制列式存储格式,这两种格式可以相互替代。

jdbc

  通过 JDBC 数据连接标准连接到关系型数据库。

libsvm

  一种常用于表示特征稀疏并且带有标号信息的数据集的文本格式。

text

  文件的每行作为字符串整体映射到 DataFrame 的一列。

要访问 DataFrameReader API 中的方法,可以调用 SparkSession 实例的 read 方法。要从文件中加载数据,可以调用 formatload 方法,也可以使用更快捷的方法,这些方法格式是内置的,示例如下:

val d1 = spark.read.format("json").load("file.json")
val d2 = spark.read.json("file.json")

在这个例子中,d1d2 引用的底层数据是同一份 JSON 数据,因此 d1d2 内容相同。每个不同的文件格式都有它们自己的设置项,可以通过 option 方法设置这些选项,参见我们对 CSV 文件的方法设置。

如果要把数据导出,你可以通过调用任何 DataFrame 实例的 write 方法访问 DataFrameWriter API。DataFrameWriter API 支持与 DataFrameReader API 相同的内置格式,所以要把文件保存成 parquet 格式的话,以下两种方法都可以:

d1.write.format("parquet").save("file.parquet")
d1.write.parquet("file.parquet")

默认情况下,Spark 在保存 DataFrame 时,如果目标文件已存在,Spark 会抛出一个错误信息。你可以通过 DataFrameWriter API 的枚举类型 SaveMode,控制 Spark 在这种情况下的行为。你可以选择强制覆盖(Overwrite)、在文件末尾追加(Append),或者文件已存在时跳过这次写入(Ignore):

d2.write.mode(SaveMode.Ignore).parquet("file.parquet")

你也可以用一个字符串("overwrite""append""ignore")来指定SaveMode,就像用 R 和 Python 的 DataFrame API 时一样。

2.8 用DataFrame API来分析数据

Spark 的 RDD API 为分析数据提供了少量易用的方法,例如 count() 方法可以计算一个 RDD 包含的记录数,countByValue() 方法可以获取不同值的分布直方图,RDD[Double]stats() 方法可以获取一些概要统计信息,例如最小值、最大值、平均值和标准差。但是 DataFrame API 的工具比 RDD API 更强大。用惯了 R、Python 和 SQL 的数据科学家对这套工具应该不会觉得陌生。本节将开始探索这套接口,并将它们应用在记录关联数据上。

研究一下 DataFrame 对象实例 parsed 的模式,看一下前几行数据,我们可以看到以下特征。

  • 前两个字段是整型 ID,代表在记录中匹配到的患者。
  • 后面 9 个值是数值类型(双精度浮点数或整型,可能有缺失值),代表患者记录数据中不同字段的匹配得分值,如名字、生日和住址。这些字段如果只有匹配和不匹配两种情况,则用 1 表示匹配,0 表示未匹配;如果有部分匹配的情况,则用双精度浮点数表示。
  • 最后一个字段是布尔值(truefalse),表示这条记录中的一对患者是否匹配。我们的目标是创建一个简单的分类器,它可以根据患者数据中的匹配评分来预测一条记录是否匹配。我们先调用 count 方法来了解记录数,该方法在 DataFrame 和 RDD 中是完全相同的:

    parsed.count()
    ...
    res: Long = 5749132
    
    

这个数据集相对较小,小到能存放在集群中一个节点的内存上。在没有集群可用的情况下,甚至可以存放在本地机器的内存里。到目前为止,我们每次处理数据集中的数据时,Spark 得重新打开文件,再重新解析每一行,然后才能执行所需的操作,例如显示前几行或计算记录的总数。当我们需要执行另外一个操作时,Spark 会反复执行读取及解析操作,即使我们已经从数据集中过滤出少量的数据,或者对原始数据集已经做过聚合。

这种方式浪费了计算资源。数据一旦被解析完,我们就可以把解析后的数据保存在集群中,这样就不必每次都重新解析数据了。Spark 支持这种用例,它允许我们调用 cache 方法,告诉 RDD 或 DataFrame 在创建时将它缓存在内存中。现在尝试缓存 parsed

parsed.cache()

缓存

虽然默认情况下 DataFrame 和 RDD 的内容是临时的,但是 Spark 提供了一种持久化底层数据的机制:

cached.cache()
cached.count()
cached.take(10)

在上述代码中,调用 cache 方法指示在下次计算 DataFrame 时,要把 DataFrame 的内容缓存起来。在这个示例中,DataFrame 的内容是调用 count 方法时得到的,take 方法返回 DataFrame 的一个本地 Array[Row],它表示前 10 个元素。当调用 take 时,访问的是缓存,而不是从 cached 的依赖关系中重新计算出来的。

Spark 为持久化数据定义了几种不同的机制,用不同的 StorageLevel 值表示。cache()persist(StorageLevel.Memory) 的简写,它将所有 Row 对象存储为未序列化的 Java 对象。当 Spark 预计内存不够存放一个分区时,它干脆就不在内存中存储这个分区,这样在下次需要时就必须重新计算。在对象需要频繁访问或低延访问时,适合使用 StorageLevel.MEMORY,因为它可以避免序列化的开销。相比其他选项,StorageLevel.MEMORY 的问题是要占用更大的内存空间。另外,大量小对象会对 Java 的垃圾回收施加压力,会导致程序停顿和常见的速度缓慢问题。

Spark 也提供了 MEMORY_SER 的存储级别,用于在内存中分配大字节缓冲区,以存储记录的序列化内容。如果使用得当(稍后会详细介绍),序列化数据占用的空间往往约为未经序列化数据的 17%~33%。

Spark 也可以用磁盘来缓存数据。存储级别 MEMORY_AND_DISKMEMORY_AND_DISK_SER 分别类似于 MEMORYMEMORY_SER。对于 MEMORYMEMORY_SER,如果一个分区在内存里放不下,整个分区都不会放入内存。对于 MEMORY_AND_DISKMEMORY_AND_DISK_SER,如果分区在内存里放不下,Spark 会将其溢写到磁盘上。

虽然 DataFrame 和 RDD 都可以被缓存,但是有了 DataFrame 的模式信息,Spark 就可以利用数据的详细信息,帮助 DataFrame 在持久化数据时达到比使用 RDD 的 Java 对象高得多的效率。

决定何时缓存数据是一门艺术,这个决定通常涉及空间和速度之间的权衡,而且还要时不时受到垃圾收集器的影响,因此如何抉择是很复杂的事情。一般来说,当数据可能被多个操作依赖时,并且相对于集群可用的内存和磁盘空间而言,如果数据集较小,而且重新生成的代价很高,那么数据就应该被缓存起来。

数据缓存完后,接下来我们想要知道,记录中匹配记录相对于不匹配记录的比例。在使用 RDD API 的情况下,我们需要编写一个 Scala 内联函数,从每个记录中提取列 is_match 的值,得到一个 RDD[Boolean],然后调用 countByValue 函数来统计 truefalse 出现的频率,计算完成后将一个 Map[Boolean, Long] 返回给客户端。事实上,我们仍然可以对位于解析后的 DataFrame 底层的 RDD 执行这种计算。

parsed.rdd.
  map(_.getAs[Boolean]("is_match")).
  countByValue()
...
Map(true -> 20931, false -> 5728201)

DataFrame 封装的 RDD 由 org.apache.spark.sql.Row 的实例组成,包括通过索引位置(从 0 开始计数)获取每个记录中值的访问方法,以及允许通过名称查找给定类型的字段的 getAs[T] 方法。

虽然基于 RDD 的分析能获得我们想要的结果,但作为 Spark 上通用的数据分析方法,它还有许多待改进之处。首先,当数据集中仅有几个不同的值时,使用 countByValue 函数来进行统计是唯一的正确做法。如果有许多不同的值,那么使用一个不返回结果到客户端的 RDD 函数将更为高效,例如 reduceByKey。其次,如果需要在随后的计算中使用 countByValue 聚合函数返回的结果,那么我们需要使用 SparkContextparallelize 方法将数据从客户端发回集群。一般来说,对结构化数据的聚合,我们希望有一种简单的方法可以适用于任何大小的数据集,这正是 DataFrame API 所提供的功能:

parsed.
  groupBy("is_match").
  count().
  orderBy($"count".desc)
  show()
...
+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+

我们不需要再写一个函数来解析 is_match 列,只需要将列名 is_match 传递给 DataFrame 的 groupBy 方法,然后调用 count() 方法计算每个分组的记录数,再将结果按 count 列降序排序,最后简单地通过 show 方法就可以将计算结果呈现在 REPL 中。幕后的 Spark 引擎决定了如何最高效地执行聚合并返回结果,而用户无须担心底层 RDD API 使用的细节。在基于 Spark 的数据分析中,这显然是一种更简单、更快速、更有表现力的方法。

值得注意的是,我们有两种方式引用 DataFrame 的列名:作为字面量引用,例如 groupBy ("is_match");或者作为 Column 对象应用,例如 count 列上使用的特殊语法 $"<col>"。这两种方法在大多数情况下都是合法的,但是在 count 列上调用 desc 方法时需要使用 $ 语法。如果漏掉了字符串前面的 $ 符号,Scala 就会抛出一个错误,因为类 String 没有一个名为 desc 的方法。

DataFrame 的聚合函数

除了 count 方法以外,结合 DataFrame API 的 agg 方法和 org.apache.spark.sql.functions 包中定义的聚合函数,我们可以计算更复杂的聚合分析,比如总和、最小值、最大值和标准差。例如,为了求 parsedcmp_sex 字段的整体均值和标准差,我们可以这样写代码:

parsed.agg(avg($"cmp_sex"), stddev($"cmp_sex")).show()
+-----------------+--------------------+
|     avg(cmp_sex)|stddev_samp(cmp_sex)|
+-----------------+--------------------+
|0.955001381078048|  0.2073011111689795|
+-----------------+--------------------+

注意,默认情况下 Spark 只计算样本标准差;要计算总体标准差,需要使用 stddev_pop 函数。

你可能已经注意到,DataFrame API 的函数很像 SQL 查询组件,这并不是一个巧合。事实上,我们可以把创建的任何 DataFrame 都看作数据库中的一张表,并且可以使用熟悉而又强大的 SQL 语法来表达我们的问题。首先,将 DataFrame 对象 parsed 所关联的表名告诉 Spark SQL 引擎,因为 parsed 这个变量名对于 Spark SQL 引擎是不可用的:

parsed.createOrReplaceTempView("linkage")

因为 parsed 这个 DataFrame 变量只在这个 Spark REPL 的会话中可用,所以 linkage 现在是一张临时表。Spark SQL 也可以用于查询 HDFS 中的持久性表,只要我们设置 Spark 连接 Apache Hive metastore,metastore 记录了结构化数据集的模式和位置。当临时表在 Spark SQL 引擎中注册后,我们可以这样查询它:

spark.sql("""
  SELECT is_match, COUNT(*) cnt
  FROM linkage
  GROUP BY is_match
  ORDER BY cnt DESC
""").show()
...
+--------+-------+
|is_match|    cnt|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+

和 Python 一样,Scala 中 3 个连续的双引号可以用来表示一个跨多行的字符串。Spark 1.x 的 Spark SQL 编译器主要是为了兼容 HiveQL 中的非标准语法,这样用户就能比较容易地从 Apache Hive 迁移到 Spark 上来。Spark 2.0 默认使用兼容 ANSI 2003 的 Spark SQL,当然我们也可以选择使用 HiveQL 模式,只需通过 Spark Sesssion 的 Builder API 创建一个 SparkSession 实例,然后调用 enableHiveSupport 方法即可。

在 Spark 中进行数据分析,到底是应该使用 Spark SQL 还是 DataFrame API 呢?这两种方法各有利弊。SQL 大家都很熟悉,简单的查询很容易表达。在常用的列式存储中,如 ORC 和 Parquet,SQL 是快速读取和过滤存储最好的方式。SQL 的缺点是很难用动态、可读和可测试的方式来表达复杂的多阶段分析,而这些都是 DataFrame API 的强项。在本书的其余章节,Spark SQL 和 DataFrame API 二者都会使用。读者可以思考一下我们为什么这样选择,并练习如何在二者之间进行转换。

Spark SQL 与 Hive 的连接

Spark 1.x 有一个 HiveContext 类,它是 SQLContext 的子类,并支持 Hive 特有的 SQL 方言—— HiveQL。如果复制一个 hive-site.xml 文件到 Spark 安装目录的 conf 文件夹下,HiveContext 就可以跟 Hive metastore 交互了。在 Spark 2.x 中,虽然 HiveContext 被弃用了,但是仍然可以通过 hive-site.xml 文件连接到 Hive metastore,还可以调用 SparkSession Builder API 的 enableHiveSupport 方法来使用 HiveQL 语法进行查询。

val sparkSession = SparkSession.builder.
 master("local[4]")
 .enableHiveSupport()
 .getOrCreate()

在 Spark 2.x 中,你可以将 Hive metastore 中任何一张表视为一个 DataFrame,并可以使用 Spark SQL 对 metastore 中的表进行查询,还可以将这些查询结果保存在 metastore 中,这样它们就可以被其他 SQL 工具访问到了,包括 Hive 自身、Apache Impala 和 Presto 等。

2.9 DataFrame的统计信息

虽然在许多数据分析工作中,无论使用 SQL 还是 DataFrame,结果都是一样的,但是仍有一些常见工作,用 DataFrame 表示起来很简洁,而用 SQL 却显得很冗余。例如,计算一个 DataFrame 中数值列所有非空值的最小值、最大值、平均值和标准差。在 R 中,这个函数叫 summary;在 Spark 中,这个函数叫 describe,与 Pandas 中相同:

val summary = parsed.describe()
...
summary.show()

DataFrame 类型的 parsed 实例中的每个变量,在 DataFrame 类型的 summary 实例中都有相对应的一列;还有一个名为 summary 的列,用于指示 countmeanstddevminmax 这 5 个指标在行中是否出现。为了让 summary 的统计信息更便于阅读和比较,我们可以使用 select 方法来选出一部分列:

summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()
+-------+------------------+------------------+
|summary|      cmp_fname_c1|      cmp_fname_c2|
+-------+------------------+------------------+
|  count|           5748125|            103698|
|   mean|0.7129024704436274|0.9000176718903216|
| stddev|0.3887583596162788|0.2713176105782331|
|    min|               0.0|               0.0|
|    max|               1.0|               1.0|
+-------+------------------+------------------+

请注意,cmp_fname_c1cmp_fname_c2count 变量值并不相同:几乎每条记录的 cmp_fname_c1 都是非空的,而只有 2% 的记录中 cmp_fname_c2 字段是非空的。为了得到一个有用的分类器,模型依赖的那些变量在数据中不能有太多缺失值,除非这些值的缺失也表示某种与记录是否匹配相关的含义。

在对数据中变量的分布有了大体的了解后,我们就想知道这些变量与列 is_match 的值之间的相关性。因此,接下来我们对 parsedis_match 字段为 truefalse 的两个子集计算概要统计信息。我们既可以使用 SQL 风格的 where 语法,也可以使用 DataFrame API 的 Column 对象来过滤 DataFrame,然后对得到的 DataFrame 使用 describe 方法:

val matches = parsed.where("is_match = true")
val matchSummary = matches.describe()

val misses = parsed.filter($"is_match" === false)
val missSummary = misses.describe()

where 函数的字符串的内部逻辑如果放到 Spark SQL 的 WHERE 子句中,语法也是正确的。使用 DataFrame API 方式的过滤条件稍微复杂一点:我们需要对列 $"is_match" 使用 === 操作符,并且还需要用 lit 方法封装布尔文字 false,这样就可以将其转换成能与 is_match 做对比的 Column 对象。需要注意的是,where 函数是 filter 函数的一个别名,我们可以随意调换上述代码片段中的 wherefilter,而结果不会发生任何变化。

现在我们比较 matchSummarymissSummary 这两个 DataFrame,这样就能知道记录的匹配情况对变量的分布有何影响。尽管数据集相对较小,单做这种比较也没有多大意思,但其实我们真正想做的是对 matchSummarymissSummary 这两个 DataFrame 做一个转置,将它们的行与列调换,这样就可以将两个转置过的 DataFrame 按变量关联起来,以便分析这些概要统计信息,这种做法被大多数数据科学家称为“数据集转置”(pivoting)或“重塑”(reshaping)。下一节将展示如何在 Spark 中执行这些转换。

2.10 DataFrame的转置和重塑

为了转置概要统计信息,首先要做的是将 matchSummarymissSummary 这两个 DataFrame 类型实例从“宽表”转换成“长表”。宽表中行代表指标,列代表变量;长表的每一行代表一个指标、一个变量,以及指标和变量对应的值。转换完成后,我们就可以将长表形式的 DataFrame 转换成另外一个宽表形式的 DataFrame,这样就完成了转置操作,只不过这一次操作中,变量对应行,指标对应列。

将宽表转换成长表,可以利用 DataFrame 的 flatMap 方法,它是 RDD.flatMap 的一个封装。flatMap 是 Spark 中最有用的转换函数之一:它接受一个函数作为参数,该函数处理一条输入记录,并返回一个包含零条或多条输出记录的序列。你可以将 flatMap 看作我们使用过的 mapfilter 转换函数的一般形式:mapflatMap 的一种特殊形式,即一条输入记录仅产生一条输出记录;filterflatMap 的另一种特殊形式,即输入和输出类型相同,并且基于一个布尔函数决定返回零条或一条记录。

为了让 flatMap 方法在一般的 DataFrame 上也能工作,要用到 DataFrame 的 schema 对象来获取 DataFrame 每一列的名字:

summary.printSchema()
...
root
 |-- summary: string (nullable = true)
 |-- id_1: string (nullable = true)
 |-- id_2: string (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
...

summary 实例的 schema 变量中,每个字段都被视为一个字符串。要想分析数值形式的统计信息,需要将字符串转换为双精度浮点数,输出的 DataFrame 应该有 3 列:指标名称(countmean 等)、列名(id1cmp_by 等),以及该列统计信息的双精度值。

val schema = summary.schema
val longForm = summary.flatMap(row => {
  val metric = row.getString(0)
  (1 until row.size).map(i => {
    (metric, schema(i).name, row.getString(i).toDouble)
  })
})

以上代码片段做了很多事情,让我们逐行进行分析。对 DataFrame 类型的 summary 的每一行,我们通过调用 row.getString(0) 来依据位置获得这个指标的名称。对于这一行中位置 1 后的其他列,我们调用 flatMap 操作,生成了一个元组序列。元组中第一个条目是指标的名称,第二个条目是列的名字(通过 schema(i).name 对象获取),第三个条目是统计量的值,我们用强制类型转换将 row.getString(i) 方法获取的字符串解析成一个双精度浮点数。

toDouble 方法是隐式转换的一个实例,而隐式类型是 Scala 最强大(也可能最危险)的特性之一。在 Scala 中,类 String 的实例其实就是 java.lang.String,而 java.lang.String 类并没有名为 toDouble 的方法;相反,这个方法定义在一个名为 StringOps 的 Scala 类中。隐式转换的工作原理如下:当在 Scala 的对象上调用一个方法,并且 Scala 编译器没有在该对象上的类定义中找到这个方法,那么编译器就会尝试将你的对象转换成拥有这个方法的类的实例。在这种情况下,编译器会发现 Java 的 String 类没有定义 toDouble 方法,而类 StringOps 中却有这个方法,并且 StringOps 类还有一个方法可以将 String 类的实例转换成 StringOps 类的实例。编译器悄悄地将 String 对象转换成 StringOps 对象,并调用新对象的 toDouble 方法。

Scala 类库开发人员(包括 Spark 核心开发者)非常喜欢隐式转换类型。它允许开发者增强核心类的功能,这样即使是像 String 这种不允许修改的类也可以进行增强。但对这些工具的用户来说,隐式类型转换就不那么简单了,因为隐式类型转换使得用户难以找到定义类方法的确切位置。尽管如此,我们还会在示例中遇到一些隐式转换,所以我们有必要提前熟悉一下。

这个代码块中最后需要注意的一点是变量 longForm 的类型:

longForm: org.apache.spark.sql.Dataset[(String, String, Double)]

这是我们第一次直接使用 Dataset[T] 接口,尽管我们一直在使用它的一个特例 DataFrame,DataFrame 其实是 Dataset[Row] 类型的别名。Dataset[T] 是 Spark 2.0 中新添加的 API,它是 Spark 1.3 中引入的 DataFrame 类型的一般化,能够处理比 Row 更丰富的数据类型。本章稍后将更详细地介绍 Dataset 的接口,但是现在你需要知道的是,由于 Spark API 中的一些巧妙的隐式转换,我们总是可以将 Dataset 转换回 DataFrame:

val longDF = longForm.toDF("metric", "field", "value")
longDF.show()
+------+------------+-------------------+
|metric|       field|              value|
+------+------------+-------------------+
| count|        id_1|          5749132.0|
| count|        id_2|          5749132.0|
| count|cmp_fname_c1|          5748125.0|
...
| count|      cmp_by|          5748337.0|
| count|     cmp_plz|          5736289.0|
|  mean|        id_1|  33324.48559643438|
|  mean|        id_2|  66587.43558331935|
|  mean|cmp_fname_c1| 0.7129024704436274|
...
|  mean|      cmp_bd|0.22446526708507172|
|  mean|      cmp_bm|0.48885529849763504|
+------+------------+-------------------+

给定一个 DataFrame 长表,可以得到这样一个宽表:对用作转置表行的列执行 groupBy 操作,然后对用作转置表列的列执行 pivot 操作。pivot 操作需要知道转置列的所有不同值,对列 values 使用 agg(first) 操作,我们就可以指定宽表中每个单元格的值,因为每个 fieldmetric 的组合都只有一个值,所以这样做是没问题的:

val wideDF = longDF.
  groupBy("field").
  pivot("metric", Seq("count", "mean", "stddev", "min", "max")).
  agg(first("value"))
wideDF.select("field", "count", "mean").show()
...
+------------+---------+-------------------+
|       field|    count|               mean|
+------------+---------+-------------------+
|     cmp_plz|5736289.0|0.00552866147434343|
|cmp_lname_c1|5749132.0| 0.3156278193084133|
|cmp_lname_c2|   2464.0|0.31841283153174377|
|     cmp_sex|5749132.0|  0.955001381078048|
|      cmp_bm|5748337.0|0.48885529849763504|
...
|      cmp_bd|5748337.0|0.22446526708507172|
|      cmp_by|5748337.0| 0.2227485966810923|
+------------+---------+-------------------+

现在我们已经知道了如何转置一个 DataFrame 类型的 summary,让我们将这段逻辑用一个函数实现,这样就可以在 matchSummarymissSummary 这两个 DataFrame 中重用了。在另一个 shell 窗口中使用文本编辑器,复制并粘贴以下代码,并保存到一个名为 Pivot.scala 的文件中:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.first

def pivotSummary(desc: DataFrame): DataFrame = {
  val schema = desc.schema
  import desc.sparkSession.implicits._

  val lf = desc.flatMap(row => {
    val metric = row.getString(0)
    (1 until row.size).map(i => {
      (metric, schema(i).name, row.getString(i).toDouble)
    })
  }).toDF("metric", "field", "value")
  lf.groupBy("field").
    pivot("metric", Seq("count", "mean", "stddev", "min", "max")).
    agg(first("value"))
}

现在在 Spark shell 中键入 load Pivot.scala,Scala REPL 将会动态编译你的代码,使 pivotSummary 函数对 matchSummarymissSummary 都可用:

val matchSummaryT = pivotSummary(matchSummary)
val missSummaryT = pivotSummary(missSummary)

2.11 DataFrame的连接和特征选择

到目前为止,我们只用了 Spark SQL 和 DataFrame API 来过滤和聚合一个数据集中的记录,但是也可以使用这些工具来完成 DataFrame 之间的连接(内连接、左外连接、右外连接和全连接)。尽管 DataFrame API 有一个 join 函数,但是用 Spark SQL 来表示这些连接会更容易,特别是当待连接的表中有许多列名在两个表中都存在时,通过 select 表达式能更方便地指出所引用的列。让我们为 matchSummarymissSummary 这两个 DataFrame 创建临时视图,在 field 列上连接它们,并在结果行上计算一些简单的统计信息:

matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")
spark.sql("""
  SELECT a.field, a.count + b.count total, a.mean - b.mean delta
  FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
  WHERE a.field NOT IN ("id_1", "id_2")
  ORDER BY delta DESC, total DESC
""").show()
...
+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926264|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482590526|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057460786|
|cmp_fname_c2| 103698.0|  0.09104268062280008|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+

好的特征有两个特点:第一,对于匹配的记录和不匹配的记录,该特征的值有明显不同(因此,平均值之间的差距是很大的);第二,对于数据集的任何记录对,该特征通常都存在值,所以我们可以依赖它。按照这个标准,cmp_fname_c2 并不是很有用,因为它在很多时候都是缺失的,而且对于匹配的记录和不匹配的记录,该特征的平均值差异相对较小——取值范围都是 0~1,而平均值之差只有 0.09。cmp_sex 特征也不是特别有用,虽然它在每对记录中都出现了,但是平均值仅相差 0.03。

相比之下,cmp_plzcmp_by 这两个特征就非常好,它们几乎出现在每一对记录中,而且平均值的差值也很大(这两个特征都超过了 0.77)。cmp_bdcmp_lname_c1cmp_bm 这些特征看起来也有用:它们在数据集中总体上都是有值的,并且匹配记录的平均值和不匹配记录的平均值差值较大。

特征 cmp_fname_c1cmp_lname_c2 情况比较复杂:cmp_fname_c1 区分得不是很好(均值的差值只有 0.28),但是在每对记录中几乎都不会缺席;cmp_lname_c2 的平均值相差很大,但是在很多记录中都是缺失的。基于这份数据,我们不是很清楚在什么情况下应该在我们的模型中加入这两个特征。

现在,我们将基于 cmp_plzcmp_bycmp_bdcmp_lname_c1 以及 cmp_bm 这些明显很好的特征值之和,构建一个简单的评分模型,对记录的相似性进行排序。对于少数缺少这些特征值的记录,我们将在求和时使用 0 来替代 null。通过创建一个由计算出的评分与 is_match 列组成的 DataFrame,对区分匹配记录和不匹配记录的评分设置多种不同的阈值并评估效果,我们就能对这个简单模型的性能有一个大体认识了。

2.12 为生产环境准备模型

尽管我们可以把评分函数写成一个 Spark SQL 的查询,但是在很多情况下,我们希望能将评分规则或机器学习模型部署到生产环境中,在那里并没有足够的时间运行 Spark SQL 来得到答案。对于这些情况,我们希望编写和测试的函数能够在 Spark 上运行,但是生产代码并不依赖于 Spark JAR 包,也不需要运行 SparkSession 来执行代码。

为了剥离出模型中 Spark 特定的组件,我们希望有一种创建简单记录类型的方法,从而可以将 DataFrame 中的字段视作静态类型的变量,而不用在 Row 中动态查找。幸运的是,Scala 提供了一种便捷的语法来创建这些记录,称为 case 类。case 类是一个简单的不可变类,它默认实现了所有 Java 类的基本方法,例如 toStringequalshashCode,使其非常容易使用。让我们为记录关联数据创建一个 case 类,其中字段名字及其类型与 DataFrame parsed 中列的名字和类型一一对应:

case class MatchData(
  id_1: Int,
  id_2: Int,
  cmp_fname_c1: Option[Double],
  cmp_fname_c2: Option[Double],
  cmp_lname_c1: Option[Double],
  cmp_lname_c2: Option[Double],
  cmp_sex: Option[Int],
  cmp_bd: Option[Int],
  cmp_bm: Option[Int],
  cmp_by: Option[Int],
  cmp_plz: Option[Int],
  is_match: Boolean
)

值得注意的是,我们使用了 Scala 内建的 Option[T] 类型来表示输入数据中字段的值是否为 null。在使用之前,Option 类需要客户端节点检查特定的字段是否为空(使用 None 对象表示),以避免 Scala 代码中抛出 NullPointerExceptions。像 id_1id_2is_match 这种不含 null 值的字段,可以不使用 Option 封装。

定义了类之后,我们就可以使用 as[T] 方法将 parsed 转换为 Dataset[MatchData]

val matchData = parsed.as[MatchData]
matchData.show()

如你所见,matchData 这个 Dataset 中所有的列和值与 parsed 这个 DataFrame 中的数据是一样的,我们仍然可以对 matchData 使用所有 SQL 风格的 DataFrame API 方法以及 Spark SQL 代码。两者之间的主要区别是,当我们对 matchData 调用函数时,例如 mapflatMapfilter,我们处理的是 MatchData 这个 case 类,而不是 Row 类。

对于评分函数,我们将计算一个 Option[Double] 类型的字段(cmp_lname_c1)和 4 个 Option[Int] 类型的字段(cmp_plzcmp_bycmp_bd 以及 cmp_bm)的总和。让我们写一个小助手 case 类来减少检查 Option 值是否存在的一些相关样板代码:

case class Score(value: Double) {
  def +(oi: Option[Int]) = {
    Score(value + oi.getOrElse(0))
  }
 }

case 类 Score 以一个 Double 类型的值(当前和)开始,并定义了一个 \+ 方法。该方法取 Option 中当前和的值,如果有值的话就取该选项的值,否则返回 0,这样它就将 Option[Int] 值合并到当前和中。为了使评分函数的名称更容易理解,这里我们用到了 Scala 的一个特点:对函数名称的限制没有 Java 那么死板,Scala 函数名的选择更广。

def scoreMatchData(md: MatchData): Double = {
  (Score(md.cmp_lname_c1.getOrElse(0.0)) + md.cmp_plz +
      md.cmp_by + md.cmp_bd + md.cmp_bm).value
}

实现了评分函数后,我们现在可以计算 matchData Dataset 中的每个 MatchData 对象的分数和 is_match 字段的值,并将结果存储在一个 DataFrame 中:

val scored = matchData.map { md =>
(scoreMatchData(md), md.is_match)
}.toDF("score", "is_match")

2.13 评估模型

创建评分函数的最后一步是决定分数的阈值,超过该阈值就预测这两个记录是匹配的。如果阈值设置过高,匹配的记录将被错误地标记为不匹配,这种情况称为假阴性率(false-negative rate);而如果阈值设置过低,不匹配的记录将被错误地标记为匹配,这种情况称为假阳性率(false-positive rate)。对于任何非平凡的问题,我们总是需要在假阳性和假阴性之间进行取舍,而阈值的设置通常与模型应用的实际情况有关,需要在两种错误的相对代价之间进行权衡。

为了帮助我们选择阈值,可以创建一个 2×2 的关联表 [ 也称为交叉制表(cross tabulation)或交叉表(crosstab)],它统计记录中分数低于 / 高于阈值的记录数,以及两个类别中匹配 / 不匹配的记录数。因为尚不知道将要使用的阈值,所以我们要编写一个函数,它使用 DataFrame API,并以 scored 这个 DataFrame 和选择的阈值作为参数计算交叉表:

def crossTabs(scored: DataFrame, t: Double): DataFrame = {
  scored.
    selectExpr(s"score >= $t as above", "is_match").
    groupBy("above").
    pivot("is_match", Seq("true", "false")).
    count()
}

注意,我们引入了 DataFrame API 的 selectExpr 方法,基于 t 参数的值动态地决定字段 above 的值,这里使用了 Scala 的字符串替换语法。Scala 字符串替换语法允许我们使用名称替换变量,只要在字符串字面量前加上一个字母 s(又一个 Scala 隐式技巧带来的便利)。定义完上述字段,我们就可以创建一个通常由 groupBypivotcount 方法三者组合而成的交叉表。

使用一个较高的阈值 4.0,意味着 5 个特征的平均值为 0.8,可以过滤掉几乎所有的不匹配记录,同时保留 90% 的匹配记录。

crossTabs(scored, 4.0).show()
...
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+

使用一个较低的阈值 2.0,可以保证捕捉到所有已知的匹配记录,但代价是假阳性(见右上方的单元格)很高:

crossTabs(scored, 2.0).show()
...
+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| null|5131787|
+-----+-----+-------+

尽管假阳性的数量有点儿多,但这个比较宽松的过滤器仍然去掉了 90% 的不匹配记录,同时保留了所有的真正匹配。虽然这已经很好了,但是还有改进空间;读者可以试一下能否找到一个更好的评分函数,这个函数要能成功地识别每一个真正的匹配,并且保持假阳性少于 100 个,它可以使用来自 MatchData 的其他值(包括缺失和不缺失的)。

2.14 小结

在阅读本章之前,你可能还没有用 Scala 和 Spark 做过数据准备和分析;或者你已经熟悉 Spark 1.0 API,现在正在努力学习 Spark 2.0 中的新技术。无论何种情况,我们都希望你能体会到这些工具提供的强大支持。如果你已经有了使用 Scala 和 Spark 的经验,我们希望你把本章介绍给你的朋友和同事,让他们也了解 Scala 和 Spark 的强大之处。

本章的目标是为你提供足够的 Scala 知识,以便能够理解并完成本书中的其他实例。如果你习惯通过实例来学习,那你得继续看看后面几章,届时将介绍 Spark 的机器学习库 MLlib。

当你成为资深 Spark 和 Scala 数据分析人员时,可能需要开始构建工具和类库,以帮助其他分析师和数据科学家应用 Spark 来解决问题。此时看看 Scala 其他的书会对你的开发有所帮助,比如由 Deam Wampler 和 Alex Payne 所著的《Scala 程序设计》1,以及 Alvin Alexander 所著的 Scala CookBook(这两本书的英文版均由 O'Reilly 出版社出版)。

1该书第 2 版已经由人民邮电出版社出版,书号 9787115416810。——编者注

目录