第 3 章 音乐推荐和 Audioscrobbler 数据集

第 3 章 音乐推荐和Audioscrobbler数据集

作者:肖恩 • 欧文

 

偏好是无法度量的。

——佚名

经常有人问起我的职业。“数据科学”或“机器学习”固然听起来很高端,但常常把对方搞得一头雾水。发生这种情况很正常,即使数据科学家自己也很难把数据科学说清楚。数据科学就是存储大量数据,对数据进行计算,然后进行预测吗?通常这时我会直接举个例子来帮助提问者搞清楚我到底是做什么的:“嗯,你在亚马逊网站买了书以后,它会向你推荐类似的书,对吗?对,就是这个意思,其实它就用到了数据科学!”

从经验上来讲,推荐引擎大体上属于大规模机器学习。大家对此都了解,而且大部分人在亚马逊上都见过。从社交网络到视频网站,再到在线零售,都用到了推荐引擎,大家也都知道推荐引擎。实际应用中的推荐引擎我们也能直接看到。虽然我们知道 Spotify 上是计算机在挑选播放的歌曲,但我们可不一定知道 Gmail 系统可以判断收件箱里的邮件是不是垃圾邮件。

相比其他的机器学习算法,推荐引擎的输出更直观,更容易理解。有时这甚至会让人很激动。尽管我们认为每个人的音乐喜好都非常个性化,并且也很难解释这种现象,但是推荐引擎却很擅长推荐一些让人喜爱的歌曲,这些歌曲连我们自己都不知道会喜欢。

最后,在推荐引擎应用比较广泛的领域,比如音乐和电影,要解释为什么推荐的音乐和一个人以前听过的音乐相吻合,这是相对比较容易的。但对某些聚类和分类算法来说,情况就不是这样了。比如,支持向量机分类器其实就是一组系数,用这个分类器进行预测时,即使是业内人士,也很难解释这些系数的意义。

现在该开始介绍接下来的 3 章了,这 3 章讲述 Spark 中主要的机器学习算法。其中一章围绕推荐引擎展开,主要介绍音乐推荐。在随后的章节中我们先介绍 Spark 和 MLlib 的实际应用,接着介绍一些机器学习的基本思想,这样的阐述方式读者接受起来比较容易。

3.1 数据集

本章示例使用 Audioscrobbler 公开发布的一个数据集。Audioscrobbler 是 last.fm 的第一个音乐推荐系统。last.fm 创建于 2002 年,是最早的互联网流媒体广播站点之一。Audioscrobbler 提供了开放的“scrobbling”API,“scrobbling”可以记录听众播放过哪些艺术家的歌曲。last.fm(https://www.last.fm/)使用这些音乐播放记录构建了一个强大的音乐推荐引擎。由于第三方应用和网站可以把音乐播放数据反馈给这个推荐引擎,这个推荐引擎系统覆盖了数百万的用户。

在 last.fm 的年代,推荐引擎方面的研究大多局限于评分类数据。换句话说,人们常常把推荐引擎看成处理“Bob 给 Prince 的评价是 3 星半”这类输入数据的工具。

Audioscrobbler 数据集有些特别,因为它只记录了播放数据,如“Bob 播放了一首 Prince 的歌曲”。播放记录所包含的信息比评分要少。仅仅凭 Bob 播放过某歌曲这一信息并不能说明他真的喜欢这首歌。有时候我们会随便打开一首歌,甚至是整张专辑,然后就离开了房间,可能都不关心歌到底是谁唱的。

然而,人们虽然经常听音乐,但很少给音乐评分。因此 Audioscrobbler 数据集要大得多,它覆盖了更多的用户和艺术家,也包含了更多的总体信息,虽然单条记录的信息比较少。这种类型的数据通常被称为隐式反馈数据,因为用户和艺术家的关系是通过其他行动隐含体现出来的,而不是通过显式的评分或点赞得到的。

2005 年 last.fm 发布了该数据集的一个版本,读者可以在网上下载到压缩的归档文件(http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html1。下载归档文件后,你会发现里面有几个文件。主要的数据集在文件 user_artist_data.txt 中,它包含 141 000 个用户和 160 万个艺术家,记录了约 2420 万条用户播放艺术家歌曲的信息,其中包括播放次数信息。

1若此链接无法下载,请访问 http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz。 ——译者注

数据集在 artist_data.txt 文件中给出了每个艺术家的 ID 和对应的名字。请注意,记录播放信息时,客户端应用提交的是艺术家的名字。名字如果有拼写错误,或使用了非标准的名称,事后才能被发现。比如,“The Smiths”“Smiths, The”和“the smiths”看似代表不同艺术家的 ID,但它们其实明显是指同一个艺术家。因此,为了将拼写错误的艺术家 ID 或 ID 变体对应到该艺术家的规范 ID,数据集提供了 artist_alias.txt 文件。

3.2 交替最小二乘推荐算法

现在我们要给这个隐式反馈数据选择一个合适的推荐算法。这个数据集只记录了用户和歌曲之间的交互情况。除了艺术家名字外,数据集没有包含用户的信息,也没有提供歌手的其他任何信息。我们要找的学习算法不需要用户和艺术家的属性信息。这类算法通常称为协同过滤算法(https://en.wikipedia.org/wiki/Collaborative_filtering)。举个例子,根据两个用户的年龄相同来判断他们可能有相似的偏好,这不叫协同过滤。相反,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这才叫协同过滤。

Audioscrobbler 数据集包含了数千万条某个用户播放了某个艺术家歌曲次数的信息,看起来是很大。但从另一方面来看数据集又很小而且不充足,因为数据集是稀疏的。虽然数据集覆盖 160 万个艺术家,但平均来算,每个用户只播放了大约 171 个艺术家的歌曲。有的用户只播放过一个艺术家的歌曲。对这类用户,我们也希望算法能给出像样的推荐。毕竟每个用户在某个时刻只能播放一首歌曲。

最后,我们希望算法的扩展性好,不但能用于构建大型模型,而且推荐速度快。我们通常都要求推荐是接近实时的,也就是在一秒内给出推荐,而不是要等一天。

本实例将用到潜在因素(https://en.wikipedia.org/wiki/Factor_analysis)模型中的一种模型,这类模型涉及的范围很广泛。潜在因素模型试图通过数量相对少的未被观察到的底层原因,来解释大量用户和产品之间可观察到的交互。打个比方:有几千个专辑可选,为什么数百万人偏偏只买其中某些专辑?可以用对类别(可能只有数十种)的偏好来解释用户和专辑的关系,其中偏好信息并不能直接观察到,而数据也没有给出这些信息。

比如说,有一位客户购买了重金属乐队 Megadeth 和 Pantera 的专辑,同时还购买了古典音乐家莫扎特的专辑。很难解释为什么该客户只购买这些专辑,而没有买其他的。然而这也可能只是冰山一角,也许该客户喜欢的风格很广泛——从重金属到前卫摇滚,再到古典音乐。这种解释更简单,而且这样解释比较有利的一点是,客户可能对其他类型的专辑也感兴趣。在这个例子中,“喜欢重金属、前卫摇滚和古典音乐”这 3 个潜在因素可以解释数以万计的人对专辑的偏好。

说得更明确一些,本实例用的是一种矩阵分解模型(https://en.wikipedia.org/wiki/Non-negative_matrix_factorization)。数学上,这些算法把用户和产品数据当成一个大矩阵 A,矩阵第 i 行和第 j 列上的元素有值,代表用户 i 播放过艺术家 j 的音乐。矩阵 A 是稀疏的:A 中大多数元素都是 0,因为相对于所有可能的用户 - 艺术家组合,只有很少一部分组合会出现在数据中。算法将 A 分解为两个小矩阵 XY 的乘积。矩阵 X 和矩阵 Y 非常“瘦”,因为 A 有很多行和列,但 XY 的行很多而列很少(列数用 k 表示)。这 k 个列就是潜在因素,用于解释数据中的交互关系。

由于 k 的值小,矩阵分解算法只能是某种近似,如图 3-1 所示。

图 3-1:矩阵分解

矩阵分解算法有时称为矩阵补全(matrix completion)算法,因为原始矩阵 A 可能非常稀疏,但乘积 XY^T 是稠密的,即使该矩阵存在非零元素,非零元素的数量也非常少。因此模型只是对 A 的一种近似。原始 A 中大量元素是缺失的(元素值为 0),算法为这些缺失元素生成(补全)了一个值,从这个角度讲,我们可以把算法称为模型。

幸运的是,本例中的线性代数和我们的直觉很好地对应起来了。这种对应关系在这里是直接的,也是优雅的。两个矩阵分别有一行对应每个用户和每个艺术家。每行的值很少,只有 k 个。每个值代表了对应模型的一个隐含特征。因此行表示了用户和艺术家怎样关联到这 k 个隐含特征,而隐含特征可能就对应偏好或类别。于是问题就简化为用户 - 特征矩阵和特征 - 艺术家矩阵的乘积,该乘积的结果是对整个稠密的用户 - 艺术家相互关系矩阵的完整估计。该乘积可以理解成商品与其属性之间的一个映射,然后按用户属性进行加权。

不幸的是,A=XY^T 通常根本没有确切的解,原因就是 XY 通常不够大(严格来讲就是矩阵的阶太小),无法完美表示 A。这其实也是件好事。A 只是所有可能出现的交互关系的一个微小样本。在某种程度上我们认为 A 是对基本事实的一次观察,它太稀疏,因此很难解释这个基本事实。但用少数几个因素(k 个)就能很好地解释这个基本事实。想象一下你正在玩拼图游戏,图案是一只猫。游戏最终答案很简单,就是一只猫。但当你手头上只有几块拼板时,就会很难描述眼前看到的图案。

XY^T 应该尽可能逼近 A,毕竟这是所有后续工作的基础,但它不能也不应该完全复制 A。然而同样不幸的是,想直接同时得到 XY 的最优解是不可能的。好消息是,如果 Y 已知,求 X 的最优解是非常容易的,反之亦然。但 XY 事先都是未知的。

幸好有算法可以帮助我们摆脱这种两难的境地,并且能找到一个还不错的解决方案。具体来说,求解 XY 时,本章使用交替最小二乘(Alternating Least Squares,ALS)算法。这类方法在 Netflix 竞赛期间流行起来,对此一些论文功不可没,比如“Collaborative Filtering for Implicit Feedback Datasets”和“Large-scale Parallel Collaborative Filtering for the Netflix Prize”。实际上 Spark MLlib 的 ALS 算法实现思想就来源于这两篇论文。

虽然 Y 是未知的,但我们可以把它初始化为随机行向量矩阵。接着运用简单的线性代数,就能在给定 AY 的条件下求出 X 的最优解。实际上,X 的第 i 行是 A 的第 i 行和 Y 的函数,因此可以很容易分开计算 X 的每一行。因为 X 的每一行可以分开计算,所以我们可以将其并行化,而并行化是大规模计算的一大优点。

A_iY(Y^TY)^{-1}=X_i

要想两边精确相等是不可能的,因此实际的目标是最小化 |A_iY(Y^TY)^{-1}-X_i|,或者最小化两个矩阵的平方误差。这就是算法名称中“最小二乘”的来由。这里给出方程式只是为了说明行向量计算方法,但实践中从来不会对矩阵求逆,我们会借助于 QR 分解(https://en.wikipedia.org/wiki/QR_decomposition)之类的方法,这种方法速度更快而且更直接。

同理,我们可以由 X 计算每个 Y_j。然后又可以由 Y 计算 X,这样反复下去。这就是算法名称中“交替”的来由。这里有一个小问题:Y 是“瞎编”的,并且是随机的。X 是最优化计算出来的,这没错,但给定的 Y 却是“假”的。好在,只要这个过程一直继续,XY 最终会收敛得到一个合适的结果。

将 ALS 算法用于隐性数据矩阵分解时,ALS 矩阵分解要稍微复杂一点儿。它不是直接分解输入矩阵 A,而是分解由 0 和 1 组成的矩阵 P,当 A 中元素为正时,P 中对应元素为 1,否则为 0。A 中的具体值后面会以权重的形式反映出来。本书不对其中细节做过多讨论,但我们有必要知道如何使用该算法。

最后,ALS 算法也可以利用输入数据是稀疏的这一特点。稀疏的输入数据、可以用简单的线性代数运算求最优解,以及数据本身可并行化,这 3 点使得算法在大规模数据上速度非常快。这也就是我们要把 ALS 算法作为本章主题的主要原因,同时也解释了为什么到目前为止 Spark MLlib 只有 ALS 一种推荐算法。

3.3 准备数据

首先,需要得到数据集的文件。将 3 个数据文件全部复制到 HDFS。本章假定文件放在 /user/ds/ 目录下,启动 spark-shell。注意本章的运算比简单的应用要占用更多的内存。如果运行在本地而不是在集群上,为了保证内存充足,在启动 spark-shell 时要指定参数 --driver-memory 4g

构建模型的第一步是了解数据,对数据进行解析或转换,以便在 Spark 中做分析。

Spark MLlib 的 ALS 算法实现并不严格要求用户和产品的 ID 必须是数值型,不过当 ID 为 32 位非负整数时,效率会更高。使用 Int 表示 ID 是有好处的,但同时意味着 ID 不能超过 Int 的最大值(Int.MaxValue),即 2147483647。我们的数据集是否已经满足了这个要求?利用 SparkSessiontextFile 方法,将数据文件转换成 String 类型的数据集:

val rawUserArtistData =
  spark.read.textFile("hdfs:///user/ds/user_artist_data.txt")

rawUserArtistData.take(5).foreach(println)

...
1000002 1 55
1000002 1000006 33
1000002 1000007 8
1000002 1000009 144
1000002 1000010 314

默认情况下,该数据集为每个 HDFS 块生成一个分区,将 HDFS 块大小设为典型的 128 MB 或 64 MB。由于 HDFS 文件大小为 400 MB,所以文件被拆为 3 个或 6 个分区。这通常没什么问题,但由于相比简单文本处理,ALS 这类机器学习算法要消耗更多的计算资源,因此减小数据块大小以增加分区个数会更好。减小数据块大小能使 Spark 处理任务时同时使用的处理器核数更多,因为每个核可以独立处理一个分区数据。可以在读取文本文件以后,接着调用一个 .repartition(n) 来指定一个不同于默认值的分区数,这样就可以将分区数设得大一些。比如,可以考虑将这个参数设为集群处理器总核数。

文件的每行包含一个用户 ID、一个艺术家 ID 和播放次数,用空格分隔。要计算用户 ID 的统计信息,可以用空格拆分每行,并将前两个值解析为整数,其结果在概念上可以看成 Int 类型的两个列:用户 ID 和艺术家 ID。将其转换为包含列 userartist 的 DataFrame 是有意义的,因为这样就可以简单地计算出两列的最大值和最小值:

val userArtistDF = rawUserArtistData.map { line =>
  val Array(user, artist, _*) = line.split(' ') ➊
  (user.toInt, artist.toInt)
}.toDF("user", "artist")

userArtistDF.agg(
  min("user"), max("user"), min("artist"), max("artist")).show()

...
+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+

➊ 匹配并去掉剩余的标记。

最大的用户 ID 和艺术家 ID 分别是 2443548 和 10794401,而它们的最小值分别是 90 和 1,并没有出现负值。这些远比 2147483647 要小,所以在使用这些 ID 之前,没有必要进行额外的转换。

在这个例子后面的部分中,将会用到难以分辨的数字 ID 所对应的艺术家的名字,这些信息存储在 artist_data.txt 中。现在这个文件中包含了用制表符分割的艺术家 ID 和艺术家的名字。但是简单地把文件解析成二元组 (Int,String) 将会出错:

val rawArtistData = spark.read.textFile("hdfs:///user/ds/artist_data.txt")

rawArtistData.map { line =>
  val (id, name) = line.span(_ != '\t') ➊
  (id.toInt, name.trim)
}.count() ➋

...
java.lang.NumberFormatException: For input string: "Aya Hisakawa"

➊ 用第一个制表符分割行。

➋ 使用 .count 触发解析;这里会出错!

这里 span() 用第一个制表符将一行拆分成两部分,接着将第一部分解析为艺术家 ID,剩余部分作为艺术家的名字(去掉了空白的制表符)。文件里有少量行看起来是非法的:有些行没有制表符,有些行不小心加入了换行符。这些行会导致 NumberFormatException,它们不应该有输出结果。

然而,map() 函数要求对每个输入必须严格返回一个值,因此这里不能用这个函数。另一种可行的方法是用 filter() 方法删除那些无法解析的行,但这会重复解析逻辑。当需要将每个元素映射为零个、一个或更多结果时,我们应该使用 flatMap() 函数,因为它将每个输入对应的零个或多个结果组成的集合简单展开,然后放入到一个更大的数据集中。它可以和 Scala 集合一起使用,也可以和 Scala 的 Option 类一起使用。Option 代表一个值可以不存在,有点儿像只有 1 或 0 的一个简单集合,1 对应子类 Some,0 对应子类 None。因此在以下代码中,虽然 flatMap 中的函数本可以简单返回一个空 List,或一个只有一个元素的 List,但使用 SomeNone 更合理,这种方法简单明了。

val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case _: NumberFormatException => None
    }
  }
}.toDF("id", "name")

这里返回一个 DataFrame,艺术家 ID 和名字分别对应列“id”和“name”。

artist_alias.txt 将拼写错误的艺术家 ID 或非标准的艺术家 ID 映射为艺术家的正规名字。其中每行有两个 ID,用制表符分隔。这个文件相对较小,有 200 000 个记录。有必要把它转成 Map 集合的形式,将“不良的”艺术家 ID 映射到“良好的”ID,而不是简单地把它作为包含艺术家 ID 二元组的数据集。这里又有一点小问题:由于某种原因有些行没有艺术家的第一个 ID。这些行将被过滤掉:

val rawArtistAlias = spark.read.textFile("hdfs:///user/ds/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap { line =>
  val Array(artist, alias) = line.split('\t')
  if (artist.isEmpty) {
    None
  } else {
    Some((artist.toInt, alias.toInt))
  }
}.collect().toMap

artistAlias.head
...
(1208690,1003926)

比如,第一条将 ID 1208690 映射为 1003926。接下来我们可以从包含艺术家名字的数据集中进行查找:

artistByID.filter($"id" isin (1208690, 1003926)).show()

...
+-------+----------------+
|     id|            name|
+-------+----------------+
|1208690|Collective Souls|
|1003926| Collective Soul|
+-------+----------------+

显然,这条记录将“Collective Souls”映射为“Collective Soul”。后者才是这支乐队正确的名称。

3.4 构建第一个模型

虽然现在数据集的形式完全符合 Spark MLlib 的 ALS 算法实现的要求,但我们还需要一个额外的转换。如果艺术家 ID 存在一个不同的正规 ID,我们要用别名数据集将所有的艺术家 ID 转换成正规 ID。除此之外,只需要将输入的行解析成合适的列。可以定义一个辅助函数来做这件事情,以后还能重用它。

import org.apache.spark.sql._
import org.apache.spark.broadcast._

def buildCounts(
    rawUserArtistData: Dataset[String],
    bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
  rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
    val finalArtistID =
      bArtistAlias.value.getOrElse(artistID, artistID) ➊
    (userID, finalArtistID, count)
  }.toDF("user", "artist", "count")
}

val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

val trainData = buildCounts(rawUserArtistData, bArtistAlias)
trainData.cache()

➊ 如果艺术家存在别名,取得艺术家别名,否则取得原始名字。

虽然刚创建的 artistAlias 是驱动程序本地的一个 Map,我们仍然可以在 map() 函数中直接引用它。这是没问题的,因为 artistAlias 会随任务一起被自动复制。但是,它的体量可不小,要消耗大约 15 MB 内存,哪怕是序列化形式最少也得占用几兆字节。因为一个 JVM 中有许多任务,所以发送和存储如此多的副本太浪费了。

这时,我们可以为 artistAlias 创建一个广播变量,取名为 bArtistAlias。使用广播变量时,Spark 对集群中每个 executor 只发送一个副本,并且在内存里也只保存一个副本。如果有几千个任务在 executor 上并行执行,使用广播变量能节省巨大的网络流量和内存。

广播变量

Spark 执行一个阶段(stage)时,会为待执行函数建立闭包,也就是该阶段所有任务所需信息的二进制形式。这个闭包包括驱动程序里函数引用的所有数据结构。Spark 把这个闭包发送到集群的每个 executor 上。

当许多任务需要访问同一个(不可变的)数据结构时,我们应该使用广播变量。它对任务闭包的常规处理进行扩展,使我们能够:

  • 在每个 executor 上将数据缓存为原始的 Java 对象,这样就不用为每个任务执行反序列化;
  • 在多个作业、阶段和任务之间缓存数据。

举个例子,考虑自然语言处理应用的场景,这里需要用到一本大型英语单词词典,以及一个接受一行字符和单词词典作为输入的评分函数。广播词典意味着对每个 executor 只要执行一次传输数据:

val dict: Seq[String] = ...
val bDict = spark.sparkContext.broadcast(dict)
    
def query(path: String) = {
  spark.read.textFile(path).map(score(_, bDict.value))
  ...
}

在连接一张大表和一张小表时,DataFrame 操作有时也会自动利用广播变量,这一点超出了本书的范围。在某些时候,广播一张小表性能更好,这称为广播散列连接(broadcast hash join)。

调用 cache() 以指示 Spark 在 DataFrame 计算好之后将其暂时存储在集群的内存里。这样是有益的,因为 ALS 算法是迭代的,通常情况下至少要访问该数据 10 次以上。如果不调用 cache(),那么每次要用到 DataFrame 时都需要从原始数据中重新计算。如图 3-2 所示,Spark UI 界面的 Storage 标签页显示了有多少 DataFrame 被缓存起来了,占用了多少内存。图中 DataFrame 占用了集群将近 120 MB 的内存。

图 3-2:Spark UI 的 Storage 标签页,显示缓存 DataFrame 内存使用情况

注意,上面的 UI 中,Deserialized 标签实际上只与 RDD 相关,这里 Serialized 意味着数据是序列化成字节的形式存储在内存中的,而不是对象的形式。但是,像这样的 DatasetDataFrame 实例,会在内存中分别执行它们自己“编码”的公共数据类型。

实际上,120 MB 可以说小得惊人。考虑到这里存储了大约 2400 万条播放记录,快速粗略估计一下可以发现,这意味着每个 user-artist-count 的组合平均仅消耗了 5 字节。然而,3 个 32 位的整型就能消耗 12 字节。这是 DataFrame 的优点之一。因为存储的数据类型是原始的 32 位整型,所以能在内存中内部优化数据的表示方法。如果换成原始基于 RDD API 的 ALS,要存储 2400 万个 Rating 对象,RDD 占用内存将超过 900 MB。

最后,我们构建模型:

import org.apache.spark.ml.recommendation._
import scala.util.Random
val model = new ALS().
    setSeed(Random.nextLong()). ➊
    setImplicitPrefs(true).
    setRank(10).
    setRegParam(0.01).
    setAlpha(1.0).
    setMaxIter(5).
    setUserCol("user").
    setItemCol("artist").
    setRatingCol("count").
    setPredictionCol("prediction").
    fit(trainData)

➊ 使用随机种子。

这样我们就构建了一个带有默认配置的 ALSModel 模型。这个操作可能要花费几分钟或者更长时间,具体时间取决于所用的集群。有些机器学习模型最终可能只有几个参数或系数,相比之下,我们这里使用的模型是巨大的。对于每个用户和产品,模型都包含一个有 10 个值的特征向量。在本章的示例中,总共有超过 170 万个特征向量。模型用两个不同的 DataFrame,它们分别表示“用户 - 特征”和“产品 - 特征”这两个大型矩阵。

你看到的结果会有些不同,原因是最终的模型取决于初始特征向量,而这些初始特征向量是随机选择的。然而,MLlib 的 ALS 模型和其他组件默认设置了固定的随机种子,每次都会做出相同的随机选择。这一点和其他库不一样,在默认情况下,一般库的随机元素通常不是固定的。所以,在这里和以后使用 MLlib 时,需要使用 setSeed(Random.nextLong()) 设置一个真正的随机种子。

想看看某些特征向量,试试以下代码。它只显示一行,并且不截断特征向量显示宽度:

model.userFactors.show(1, truncate = false)

...
+---+----------------------------------------------- ...
|id |features                                        ...
+---+----------------------------------------------- ...
|90 |[-0.2738046, 0.03154172, 1.046261, -0.52314466, ...
+---+----------------------------------------------- ...

ALS 中的其他方法,如 setAlpha,会设置超参数,它们的值将影响模型的推荐质量,我们稍后再详细解释。更重要的是,首先要问:模型质量怎样?模型能给出好的推荐吗?

3.5 逐个检查推荐结果

应该看看模型给出的艺术家推荐直观上是否合理,我们检查一下用户播放过的艺术家,然后看看模型向用户推荐的艺术家。具体来看看用户 2093760 的例子。首先,我们观察他 / 她的播放记录来了解其品味。现在我们要提取该用户收听过的艺术家 ID 并打印他们的名字,这意味着先在输入数据中搜索该用户收听过的艺术家的 ID,然后用这些 ID 对艺术家集合进行过滤,这样我们就可以获取并按序打印这些艺术家的名字:

val userID = 2093760

val existingArtistIDs = trainData.
  filter($"user" === userID). ➊
  select("artist").as[Int].collect() ➋

artistByID.filter($"id" isin (existingArtistIDs:_*)).show() ➌


...
+-------+---------------+
|     id|           name|
+-------+---------------+
|   1180|     David Gray|
|    378|  Blackalicious|
|    813|     Jurassic 5|
|1255340|The Saw Doctors|
|    942|         Xzibit|
+-------+---------------+

➊ 找到用户 2093760 对应的行。

➋ 收集艺术家 ID 的整型集合。

➌ 过滤艺术家;:_* 变长参数语法。

用户播放过的艺术家既有大众流行音乐风格的也有嘻哈风格的。难道用户是 Jurassic 5 乐队的粉丝?记住这是 2005 年。顺便解释一下:Saw Doctors 是一支典型爱尔兰风格的摇滚乐队,在爱尔兰非常受欢迎。

不好的方面是,ALS 模型竟然没有提供直接计算用户最佳推荐的方法。用户最佳推荐的目的是评估用户对任意给定艺术家的偏好。Spark 2.2 加入了一个 recommendAll 方法来解决这个问题,但是在撰写本文的时候 Spark 2.2 还没有发布 2recommendAll 方法可以用来给所有的艺术家打分,然后返回其中分值最高的。

2Spark 2.2 已于 2017 年 7 月 11 日正式发布。——译者注

def makeRecommendations(
    model: ALSModel,
    userID: Int,
    howMany: Int): DataFrame = {

  val toRecommend = model.itemFactors.
    select($"id".as("artist")).
    withColumn("user", lit(userID)) ➊

  model.transform(toRecommend).
    select("artist", "prediction").
    orderBy($"prediction".desc).
    limit(howMany) ➋
}

➊ 选择所有艺术家 ID 与对应的目标用户 ID。

➋ 对所有艺术家评分,并返回其中分值最高的。

请注意,此方法不必过滤用户已经听过的艺术家的 ID。虽然这是很常见的需求,但并不是必需的,因为不过滤也不会影响我们最终的目标。

现在,做出推荐就很简单了,虽然采用这种方式计算它们需要一些时间。因此,它适用于批量评分,而不适用于实时评分。

val topRecommendations = makeRecommendations(model, userID, 5)
topRecommendations.show()

...
+-------+-----------+
| artist| prediction|
+-------+-----------+
|   2814|0.030201003|
|1300642|0.029290354|
|1001819|0.029130368|
|1007614|0.028773561|
|1037970|0.028646756|
+-------+-----------+

结果包含了一个艺术家 ID 和一个“预测”。虽然字段名称叫 rating,但其实不是估计的得分。对这类 ALS 算法,预测是一个 0~1 的模糊值,值越大,推荐质量越好。它不是概率,但可以把它理解成对 0/1 值的一个估计,0 表示用户不喜欢播放艺术家的歌曲,1 表示喜欢播放艺术家的歌曲。

得到所推荐艺术家的 ID 之后,就可以用类似的方法查到艺术家的名字:

val recommendedArtistIDs =
  topRecommendations.select("artist").as[Int].collect()

artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()

...
+-------+----------+
|     id|      name|
+-------+----------+
|   2814|   50 Cent|
|1007614|     Jay-Z|
|1037970|Kanye West|
|1001819|      2Pac|
|1300642|  The Game|
+-------+----------+

结果全部是嘻哈风格。我们一眼就能看出,这些推荐都不怎么样。虽然推荐的艺术家都受人欢迎,但好像并没有针对用户的收听习惯进行个性化。

3.6 评价推荐质量

当然,刚才只是对一个用户的推荐结果的一次主观评价。除了用户本人,其他任何人都很难对推荐的好坏给出定量描述。而且,想对推荐结果做人工评分,哪怕只评价一小部分结果,也是不切实际的。

我们假定用户会倾向于播放受人欢迎的艺术家的歌曲,而不会播放不受欢迎的艺术家的歌曲,这个假设是合理的。因此,用户的播放数据在一定程度上表示了“优秀的”和“糟糕的”艺术家推荐。这个假设虽然还有点儿问题,但是在没有其他数据的情况下,也只能这么做了。比如,170 万个艺术家,除了之前推荐的 5 个艺术家之外没有播放过的艺术家中,用户 2093760 很可能对其中某些感兴趣,所以不能说没有听过的艺术家都是“糟糕的”,都不能推荐。

如果根据好艺术家在推荐列表中排名应该靠前这个标准来评价推荐引擎,情况会怎样?推荐引擎这类的评分系统有几个指标,这个指标是其中之一。问题是如果将“好”的标准定义为“用户收听过艺术家”,那么推荐系统在输入中已经利用了这些信息。它可以简单把用户以前听过的艺术家作为最靠前的推荐结果返回,而这样就能得到最高的评价。然而这是无益的,因为推荐引擎的作用在于向用户推荐他从来没听过的艺术家。

为了使推荐变得有用,可以从数据集中拿出一些艺术家的播放数据放在一边,在整个 ALS 模型构建过程中并不使用这些数据。这些放在一边的数据中的艺术家可以作为每个用户的优秀推荐,但这些数据并没有喂给推荐引擎。让推荐引擎对模型中所有的产品进行评分,然后对比检查放在一边的艺术家的推荐排名情况。理想情况下,推荐引擎对这些艺术家的推荐排名应该最靠前或接近最靠前。

接着我们就可以计算推荐引擎的得分,方法是比较放在一边的艺术家推荐排名和整个数据集中的艺术家的推荐排名(在实践中,我们通常只比较一小部分,因为需要比较的艺术家组合可能非常多)。对比组合中放在一边的艺术家排名高的组合所占比例就是模型的得分。1.0 代表最好,0.0 代表最差,0.5 是随机给艺术家排名的模型的期望得分。

这个指标和一个信息检索概念直接相关,这个概念就是接受者操作特征(receiver operating characteristic,ROC,https://en.wikipedia.org/wiki/Receiver_operating_characteristic)曲线。上一段中的指标等于 ROC 曲线下区域的面积,称为 AUC(Area Under the Curve)。可以把 AUC 看成是随机选择的好推荐比随机选择的差推荐的排名高的概率。

AUC 指标也用于评价分类器。MLlib 的 BinaryClassificationMetrics 类实现了这个指标及相关方法。对于推荐引擎,为每个用户计算 AUC 并取其平均值,最后的结果指标稍有不同,可称为“平均 AUC”。我们需要自己来实现,因为它不是在 Spark 中实现的。

其他和评分系统相关的评价指标在 RankingMetrics 类中实现。这些指标包括准确率、召回率和平均准确率(Mean Average Precision,MAP,https://en.wikipedia.org/wiki/Information_retrieval)。MAP 也常用,它更强调排在最前面的推荐的质量。但是,AUC 作为一种普遍和综合的测量整体模型输出质量的手段,是我们采用的。

事实上,取出一部分数据来选择模型并评估模型准确率是所有机器学习的通用做法。通常数据被分成三个子集:训练集、交叉验证(Cross-Validation,CV)集和测试集。在本章的初步示例中,我们只用了两个数据集:训练集和交叉验证集。这对于模型选择来说已经足够了。第 4 章会进一步讨论这个概念并介绍测试集。

3.7 计算AUC

平均 AUC 的具体实现请参考本书附带的源代码。代码实现比较复杂,请参考源代码的注释,这里我们就不重复说明了。该实现接受一个交叉验证集和一个预测函数,交叉验证集代表每个用户对应的“正面的”或“好的”艺术家。预测函数把每个包含“用户 - 艺术家”对的 DataFrame 转换为一个同时包含“用户 - 艺术家”和“预测”的 DataFrame,“预测”表示“用户”与“艺术家”之间关联的强度值,这个值越高,代表推荐的排名越高。

为了利用输入数据,我们需要把它分成训练集和验证集。训练集只用于训练 ALS 模型,验证集用于评估模型。这里我们将 90% 的数据用于训练,剩余的 10% 用于交叉验证:

def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {
  ...
}

val allData = buildCounts(rawUserArtistData, bArtistAlias) ➊
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

val allArtistIDs = allData.select("artist").as[Int].distinct().collect() ➋
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)

val model = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true).
    setRank(10).setRegParam(0.01).setAlpha(1.0).setMaxIter(5).
    setUserCol("user").setItemCol("artist").
    setRatingCol("count").setPredictionCol("prediction").
    fit(trainData)
areaUnderCurve(cvData, bAllArtistIDs, model.transform)

➊ 注意这个函数已经在前文定义过了。

➋ 去重并收集给驱动程序。

注意:areaUnderCurve() 把一个函数作为它的第三个参数。这里传入的是 ALSModeltransform,很快我们会把它替换成其他方法。

结果约为 0.879。这个结果好吗?它肯定比随机推荐的 0.5 要好,并且接近最高分 1.0。一般 AUC 超过 0.9 是高分。

可以从数据集中选择另外的 90% 作为训练集,这样就可以多次进行模型评估。得到的 AUC 值的平均可能会更好地估计算法在数据集上的表现。实际中一个常用的做法是把数据集分成 k 个大小差不多的子集,用 k-1 个子集做训练,在剩下的一个子集上做评估。我们把这个过程重复 k 次,每次用一个不同的子集做评估。这种做法称为 k 折交叉验证(k-fold cross-validation,https://en.wikipedia.org/wiki/Cross-validation_(statistics))算法。为了简便,我们在示例中并没有实现 k 折交叉验证技术。但 MLlib 的 CrossValidator API 在一定程度上提供了对这项技术的支持。这一验证用的 API 在第 4 章还会出现。

有必要把上述方法和一个更简单方法做一个基准比对。举个例子,考虑下面的推荐方法:向每个用户推荐播放最多的艺术家。这个策略一点儿都不个性化,但它很简单,也可能有效。定义这个简单预测函数并评估它的 AUC 得分:

def predictMostListened(train: DataFrame)(allData: DataFrame) = {

  val listenCounts = train.
    groupBy("artist").
    agg(sum("count").as("prediction")).
    select("artist", "prediction")

  allData.
    join(listenCounts, Seq("artist"), "left_outer").
    select("user", "artist", "prediction")
}

areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData))

这里再次显示了 Scala 语法的特别之处。这里函数定义看似有两个参数列表。调用函数并应用前两个参数得到了一个偏应用函数(partially applied function),这个函数本身又带一个参数(allData)并返回预测结果。predictMostListened(sc, trainData) 的返回结果是一个函数

结果得分大约是 0.88。这意味着,对 AUC 这个指标,非个性化的推荐表现已经不错了。然而,我们想要的是得分更高,也就是更为“个性化”的推荐。显然这个模型还有待改进。还有没有可能做得更好呢?

3.8 选择超参数

到目前为止,我们并没有对给出的超参数值做任何说明。这些值不是由算法学习得到的,而是由调用者指定的。配置的超参数包括以下几个。

  • setRank(10)

    模型的潜在因素的个数,即“用户 - 特征”和“产品 - 特征”矩阵的列数;一般来说,它也是矩阵的阶。

  • setMaxIter(5)

    矩阵分解迭代的次数;迭代的次数越多,花费的时间越长,但分解的结果可能会更好。

  • setRegParam(0.01)

    标准的过拟合参数,通常也称作 lambda;值越大越不容易产生过拟合,但值太大会降低分解的准确率。

  • setAlpha(1.0)

    控制矩阵分解时,被观察到的“用户 - 产品”交互相对没被观察到的交互的权重。

可以把 rankregParamalpha 看作模型的超参数。(maxIter 更像是对分解过程使用的资源的一种约束。)这些值不会体现在 ALSModel 的内部矩阵中,这些矩阵只是参数,其值由算法选定。超参数则是构建过程本身的参数。

刚才列表中给出的超参数值不一定是最优的。如何选择好的超参数值在机器学习中是个普遍性问题。最基本的方法是尝试不同值的组合并对每个组合评估某个指标,然后挑选指标值最好的组合。

在下面的示例中,我们尝试了 8 种可能的组合:rank = 5 或 30,regParam = 4.0 或 0.0001,以及 alpha = 1.0 或 40.0。这些值当然也是猜的,但它们能够覆盖很大范围的参数值。各种组合的结果按 AUC 得分从高到底排序:

val evaluations =
  for (rank     <- Seq(5,  30);
       regParam <- Seq(4.0, 0.0001);
       alpha    <- Seq(1.0, 40.0)) ➊
    yield {
      val model = new ALS().
        setSeed(Random.nextLong()).
        setImplicitPrefs(true).
        setRank(rank).setRegParam(regParam).
        setAlpha(alpha).setMaxIter(20).
        setUserCol("user").setItemCol("artist").
        setRatingCol("count").setPredictionCol("prediction").
        fit(trainData)
        val auc = areaUnderCurve(cvData, bAllArtistIDs, model.transform)

      model.userFactors.unpersist() ➋
      model.itemFactors.unpersist()

      (auc, (rank, regParam, alpha))
    }

evaluations.sorted.reverse.foreach(println) ➌

...
(0.8928367485129145,(30,4.0,40.0))
(0.891835487024326,(30,1.0E-4,40.0))
(0.8912376926662007,(30,4.0,1.0))
(0.889240668173946,(5,4.0,40.0))
(0.8886268430389741,(5,4.0,1.0))
(0.8883278461068959,(5,1.0E-4,40.0))
(0.8825350012228627,(5,1.0E-4,1.0))
(0.8770527940660278,(30,1.0E-4,1.0))

➊ 可以理解为 3 层嵌套 for 循环。

➋ 立即释放模型占用的资源。

➌ 按第一个值(AUC)的降序排列并输出。

 这里的 for 语法是 Scala 中写嵌套循环的一种方式,相当于一个 alpha 循环外面嵌套一个 regParam 循环,外面再嵌套一个 rank 循环。

虽然这些值的绝对差很小,但对于 AUC 值来说,仍然具有一定的意义。有意思的是,参数 alpha 取 40 的时候看起来总是比取 1 表现好(为了满足读者的好奇,顺便提一下,40 是前面提到的最初 ALS 论文的默认值之一)。这说明了模型在强调用户听过什么时的表现要比强调用户没听过什么时要好。

regParam 取较大的值看起来结果要稍微好一些。这表明模型有些受过拟合的影响,因此需要一个较大的 regParam 值以防止过度精确拟合每个用户的稀疏输入数据。第 4 章将进一步讨论过拟合。

正如预期的那样,对于这种体量的模型来说,5 个特征有点少,这个模型的表现要逊于使用 30 个用户品味特征的模型。正确的特征值个数实际上可能大于 30,而特征值个数太小时,无论是多少,区别都不大。

当然我们可以重复上述过程,试试不同的取值范围或试试更多值。这是超参数选择的一种暴力方式。但是在当今这个世界,这种简单粗暴的方式变得相对可行:集群常常有几 TB 内存,成百上千个核,Spark 之类的框架可以利用并行计算和内存来提高速度。

严格来说,理解超参数的含义其实不是必需的,但知道这些值的典型范围有助于找到一个合适的参数空间开始搜索,这个空间不宜太大,也不能太小。

我们目前使用的是较为原始的手动调参过程:设置超参数、构建模型、评估模型的三部曲。在第 4 章中学习更多 Spark ML API 以后,会发现一种更自动化的方式——管道(Pipeline+ TrainValidationSplit

3.9 产生推荐

现在用上一节中得到的最优超参数继续下面的工作,看看新模型对 ID 为 2093760 的用户给出什么样的推荐:

+-----------+
|       name|
+-----------+
|  [unknown]|
|The Beatles|
|     Eminem|
|         U2|
|  Green Day|
+-----------+

令人欣慰的是,现在给出的推荐要更符合这位用户的品味一些,其中大部分是流行摇滚,而非之前那样全都是嘻哈风格。推荐列表中 [unknown] 明显不是一个艺术家。查看原始数据集会发现 [unknown] 出现了 429 447 次,几乎可以排到前 100 了。[unknown] 是没有艺术家信息的播放记录的一个默认值,可能是某个客户端提供的。这个信息是没有用的,下次我们应该在开始的时候把它扔掉。这又再次说明,数据科学实践往往是迭代式的,每个阶段我们对数据都有新发现。

这个模型可以对所有用户产生推荐。它可以用于批处理,批处理每隔一个小时或更短的时间为所有用户重算模型和推荐结果,具体时间间隔取决于数据规模和集群速度。

但是目前 Spark MLlib 的 ALS 实现并不支持向所有用户给出推荐。该实现可以每次对一个用户进行推荐,这样每一次都会启动一个短的几秒钟的分布式作业。这适合对小用户群体快速重算推荐。下面对数据中的 100 个用户进行推荐并打印结果:

val someUsers = allData.select("user").as[Int].distinct().take(100) ➊
val someRecommendations =
  someUsers.map(userID => (userID, makeRecommendations(model, userID, 5))) ➋
someRecommendations.foreach { case (userID, recsDF) =>
  val recommendedArtists = recsDF.select("artist").as[Int].collect()
  println(s"$userID -> ${recommendedArtists.mkString(", ")}") ➌
}

...

1000190 -> 6694932, 435, 1005820, 58, 1244362
1001043 -> 1854, 4267, 1006016, 4468, 1274
1001129 -> 234, 1411, 1307, 189, 121
...

❶ 把 100 个(不同的)用户复制到驱动程序端。

map() 在这里是一个本地 Scala 运算。

mkString 用分隔符把集合中的元素连接成一个字符串。

现在只是把推荐结果打印出来。结果也可以写到外部存储上,比如 HBase(https://hbase.apache.org/)上,这样可以在运行时利用 HBase 提供快速查询。

有意思的是,整个流程也可能用于向艺术家推荐用户。这可用于回答类似这样的问题:“艺术家 X 的新专辑,哪 100 个用户可能最感兴趣?”在向艺术家推荐用户时,只需要在解析输入的时候对换用户和艺术家字段就可以了。

rawArtistData.map { line =>
  val (id, name) = line.span(_ != '\t')
  (name.trim, id.int)
}

3.10 小结

显然我们可以花多点儿时间来对模型参数进行调优,找出输入数据中的异常情况,比如说有的艺术家名字为 [unknown],并修复这些问题。举个例子来说,对播放次数进行快速分析就会发现,ID 为 2064012 的用户播放 ID 为 4468 的艺术家高达 439 771 次,这太让人吃惊了。ID 为 4468 的艺术家是杰出的独立金属乐队 System of a Down,之前的推荐中出现过。假定每首歌曲长度为 4 分钟,如果播放“Chop Suey!”和“B.Y.O.B.”这样的热门歌曲,33 年也完成不了。因为乐队 1998 年才开始录制唱片,即使同时播放 4~5 首单曲也要 7 年,所以这肯定是垃圾数据、数据错误或者某类实际数据问题,这些问题生产系统必须要解决。

ALS 不是唯一的推荐引擎算法。目前它是 Spark MLlib 唯一支持的算法。但是,对于非隐含数据,MLlib 也支持一种 ALS 的变体,它的用法和 ALS 是一样的,不同之处在于 ALS 使用 setImplicitPrefs(false) 配置。它适用于给出评分数据而不是次数数据。比如,如果数据集是用户对艺术家的打分,取值范围是 1~5,那么用这种变体就很合适。推荐方法 ALSModel.transform 返回结果列 prediction,它才是模型估算出的评分。这种情况下,简单的均方根误差(root mean squared error,RMSE)度量标准就能够评价推荐算法了。

今后 Spark MLlib 或其他库可能支持其他推荐算法。

在生产环境下,推荐引擎需要实时给出推荐,因为它们常用于电商网站。在这类环境中,客户浏览商品页面时需要推荐引擎频繁给出推荐。像前面提到的那样,预先计算并把得到的推荐结果存到一个 NoSQL 存储中,在大规模情况下不失为一种合理的策略。这种做法的一个缺点是需要对所有可能提供快速推荐的用户进行预计算,但这些用户可能是任何一个用户。举例来讲,即使 100 万个用户中每天只有 10 000 个访问网站,我们也要为 100 万个用户做预计算,其中 99% 的工作是浪费的。

如果能随时按需计算出推荐结果就更好了。虽然我们可以用 ALSModel 为单个用户计算推荐结果,它却还是一个分布式运算,需要花费几秒钟。因为 ALSModel 非常大,所以实际上是个分布式数据集。其他模型情况有些不同,它们可以更快地给出评分。Oryx 2(https://github.com/OryxProject/oryx)之类的项目试图实现实时按需推荐,其底层用 MLlib 之类的库,但用高效的方式访问内存中的模型数据。

目录