初识MapReduce

初识MapReduce

作者/Joel Grus

Joel Grus是Google的一位软件工程师,曾于数家创业公司担任数据科学家。目前住在西雅图,专注于数据科学工作并乐此不疲。偶尔在joelgrus.com发表博客,长期活跃于Twitter @joelgrus。

 

明天已经照耀现在,只是尚未洒满每个角落。

——威廉 · 吉布森

MapReduce 是一个用在大型数据集上执行并行处理的算法模型。尽管这是一个非常强大的技术,但它的原理却很简单。

假设我们有一组待处理的项目,这些项目可能是网页日志、许多本书的文本、图像文件或者是其他东西。处理项目时,MapReduce 算法包括下面几个基本步骤。

  • 使用 mapper 函数把每个项目转化成零个或多个键值对。(功能类似于Python 里的 map 函数,为了便于区分我们使用mapper。)

  • 用相同的键把所有的键值对收集起来。

  • 在每一个分好组的值集合上使用 reducer 函数,对每个对应的键生成输出值。

这样的讲解很抽象,我们接下来看一个具体的例子。数据科学里的绝对规则很少,但有一个必须遵守:第一个面对的 MapReduce 例子中必须要涉及单词计数。

案例:单词计数

DataSciencester 网站的用户数量已增长到了数百万!这对你的工作是极大的保障,但也使日常分析变得有点困难。

比如,内容部门的副总想知道用户的状态更新都涉及什么内容。作为初步的尝试,你决定统计一下出现的单词数量,这样你就可以准备一个关于出现频度最高的单词的报告。

当只有几百个用户的时候,下面的做法很简单:

def word_count_old(documents):
   """word count not using MapReduce"""
    return Counter(word
        for document in documents
        for word in tokenize(document))

但面对数百万个用户的时候,documents 集合就变得很大,你的电脑根本装不下。如果你能把它们纳入 MapReduce 模型处理,就可以使用你的引擎上已经部署的一些“大数据”架构。

首先,我们需要一个函数来把文档转化成一系列的键值对。我们希望输出的结果能按单词分组,这意味着键应该是单词。对每一个单词,我们只发送值 1 来表示这个键值对对应于单词出现一次:

def wc_mapper(document):
    """for each word in the document, emit (word,1)"""
    for word in tokenize(document):
        yield (word, 1)

先暂时跳过第二步,假设对某些词我们已经收集了所发送过的对应计数的列表。接下来生成我们所需要的这个词的全部计数:

def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))

再返回步骤 2,现在我们需要收集来自 wc_mapper 的结果,再把它们传递给 wc_reducer。让我们思考一下怎么在一台计算机上完成这件事:

def word_count(documents):
    """count the words in the input documents using MapReduce"""

    # 存放分好组的值
    collector = defaultdict(list)

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.iteritems()
            for output in wc_reducer(word, counts)]

假设我们有三个文档 ["data science", "big data", "science fiction"]。

然后把 wc_mapper 应用到第一个文档,产生两个键值对 ("data", 1)("science", 1)。处理完所有三个文档之后,列表 collector 会包含:

{ "data" : [1, 1],
  "science" : [1, 1],
  "big" : [1],
  "fiction" : [1] }

然后 wc_reducer 函数生成了每个单词的计数:

[("data", 2), ("science", 2), ("big", 1), ("fiction", 1)]

为什么是MapReduce

MapReduce 的主要优点就是通过将处理过程移动到数据来进行分布式的计算。

我们最初的(非 MapReduce 的)方法要求机器在每一个文档上进行处理。这意味着所有的文档要么存在机器上要么在处理期间转移到机器上。更重要的是,这意味着机器一次只能处理一个文档。

如果机器是多核的,且如果代码是为了利用多核的优势而重写过的,那它是有可能一次处理几个文档的。但即使这样,所有的文档也都要放到机器中来。

假设现在我们将数十亿个文档分散到 100 台机器上。利用正确的架构(并且掩盖掉一些细节),我们可以做下面的事。

  • 让每一台机器在它的文档上运行 mapper 函数,产生大量的键值对。

  • 把那些键值对分配到一些“reducing”的机器上,确保对应任何一个给定键的对在同一台机器上完成计算。

  • 每一台 reducing 机器通过键分组这些对,然后对每个值的集合运行 reducer 函数。

  • 返回每个键值对。

它可以处理的横向规模水平令人惊叹。如果我们把机器的数量加倍,那么(忽略运行 MapReduce 系统的某些固定成本)计算的运行速度大约会快两倍。每一个 mapper 机器只需要做一半的工作,(假设有足够多不同的键来进一步分配 reducer 工作)reducer 机器也是。

更加一般化的MapReduce

思考一下,前面例子中所有的单词计数代码是包括在 wc_mapperwc_reducer 这两个函数中的。这意味着通过几个改变我们会得到一个更通用的框架(仍然是运行在单个机器上的):

def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.iteritems()
            for output in reducer(key,values)]

然后我们可以通过下面的方法简单地完成单词计数:

word_counts = map_reduce(documents, wc_mapper, wc_reducer)

这为我们解决各种类型的问题提供了灵活性。

在继续讲解之前,我们先观察一下 wc_reducer 函数,它仅仅是把对应于每一个键的值加起来。这种聚合是非常普遍的,值得把它抽象出来:

def reduce_values_using(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn to the values"""
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    """turns a function (values -> output) into a reducer
    that maps (key, values) -> (key, output)"""
    return partial(reduce_values_using, aggregation_fn)

之后我们就可以轻松创建以下内容:

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

案例:分析状态更新

内容部门的副总对单词计数印象深刻,并询问你还能从用户的状态更新中学到什么。你设法提取了一个类似下面这样的状态更新的数据集:

{"id": 1,
 "username" : "joelgrus",
 "text" : "Is anyone interested in a data science book?",
 "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
 "liked_by" : ["data_guy", "data_gal", "mike"] }

假设我们想找出每周内的哪一天人们讨论数据科学最多。为了找到这个结果,只需计算一下一周内每一天有多少个数据科学更新。这意味着我们需要按每周内的每天进行分组,这就是我们的键。而且如果对每一个包含“数据科学”的更新发送一个值 1,就可以使用 sum 函数得到总数:

def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)

data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)

再举一个稍微复杂一点的例子,假设我们需要找出每个用户在其状态更新中最常用的单词是什么。为了建立 mapper,我们的脑海中会浮现以下三种方法。

  • 把用户名放到键当中:把单词和计数放到值当中。

  • 把单词放到键当中:把用户名和计数放到值当中。

  • 把用户名和单词放到键当中:把计数放到值当中。

稍作考虑,很容易就能判断出应该按 username 分组,因为我们想分别考虑每个人的单词。我们不想用 word 来分组,因为 reducer 需要看到每个人所有的词汇,找到哪一个是最流行的。这意味着第一种选择是最优选择:

def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""

    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)

或者我们能为每个用户找到各自的状态点赞者的个数:

def liker_mapper(status_update):
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)

distinct_likers_per_user = map_reduce(status_updates,
                                      liker_mapper,
                                      count_distinct_reducer)

案例:矩阵计算

回想“矩阵乘法”,给定一个 m × n 的矩阵 A 和一个 n × k 的矩阵 B,可以把它们乘起来得到 m × k 的矩阵 C,其中 C 的第 i 行第 j 列的元素由下式给出:

{%}

如同我们之前所见的,表示一个 m × n 矩阵的“自然的”方法是列表的列表,其中元素 Aij 是第 i 个列表的第 j 个元素。

但大型矩阵有时候是稀疏的,即大部分的元素等于 0。对于大型稀疏矩阵而言,列表的列表是一种非常浪费的表达方式。一种更简洁的表达方式是元组的列表 (name, i, j, value),其中 name 代表矩阵,而 i、j、value 表示一个非零元素的位置。

比如,一个十亿 × 十亿的矩阵会有亿亿级别(quintillion,1×1018)的元素,这是难以存储在一个计算机中的。但是如果每行当中只有不多的一些非零元素,上面那种替代的表示法就会小很多个数量级。

基于这种表示法,我们可以使用 MapReduce 以分布式的方式执行矩阵乘法。

为使用这种算法,请注意,Aij 只用于计算 C 的第 i 行的元素,Bij 只用于计算 C 的第 j 列的元素。我们的目标是使 reducer 的每一个输出构成矩阵 C 的一个元素。这意味着我们需要用 mapper 发送键值,以确定 C 中的每个元素。建议像下面这样处理:

def matrix_multiply_mapper(m, element):
    """m is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)"""
    name, i, j, value = element

    if name == "A":
        # A_ij是每个C_ik之和的第j个元素,其中k=1..m
        for k in range(m):
            # 与C_ik的其他元素分组
            yield((i, k), (j, value))
    else:
        # B_ij是每个C_kj之和的第i个元素
        for k in range(m):
            # 与C_kj的其他元素分组
            yield((k, j), (i, value))

def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)

    # 对有两个结果的位置把所有的乘积加起来
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0.0:
        yield (key, sum_product)

比如,如果你有如下的两个矩阵:

A = [[3, 2, 0],
     [0, 0, 0]]

B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]

你可以把它们重写为元组:

entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

map_reduce(entries, mapper, reducer) # [((0, 1), -3), ((0, 0), 32)]

在这样一个小矩阵上操作并不太有趣,但是如果你有百万行百万列的矩阵,MapReduce 就会起很大作用。

题外话:组合器

你很可能已经注意到,许多 mapper 包括一些额外的信息。比如,在计数单词的时候,与其发送 (word, 1) 并累加求和,不如发送 (word, None) 再取其长度。

没有这么做的一个原因是,在分布式情景下,我们有时会想用组合器(combiner)来缩减在计算机之间转移的数据数量。如果一个 mapper 机器发现单词“data”500 次,可以让它在移交数据给缩减机器之前把 500 个 ("data", 1) 组合成一个单独的 ("data", 500)。这使得转移的数据少了很多,算法会大大加快。

基于我们编写 reducer 的方法,它能正确地处理这些组合的数据。(如果我们已经用 len 函数写了 reducer,它就不能处理组合了。)

最广泛使用 MapReduce 系统的是 Hadoop(http://hadoop.apache.org/),它值得用很多本书来阐述。它有许多商业或非商业的发行版,以及一个由 Hadoop 相关工具组成的巨大生态系统。

为了使用 Hadoop,你需要建立自己的聚类(或者可以在允许的情况下使用别人建好的聚类),当然,对于承压能力较差的人来说,可以不把这当成必然的任务。Hadoop mapper 和 reducer 通常是用 Java 写成的,尽管有一种被称为“Hadoop streaming”的功能允许你用其他的语言(包括 Python)来写。

Amazon.com 提供 Elastic MapReduce(http://aws.amazon.com/cn/elasticmapreduce/)服务,它可以用编程的方式来创建或销毁聚类,仅仅根据使用该服务的时长来收费。

mrjob(https://github.com/Yelp/mrjob)是 Python 的一个 Hadoop(或 Elastic MapReduce)的接口包。

Hadoop 任务是典型的高延迟的,这对于“实时分析”来说不是个好选择。有多种建立在 Hadoop 之上的“实时分析”工具,同时还有一些替代性的框架也日益流行。最流行的两种是 Spark(http://spark.apache.org/)和 Storm(http://storm.incubator.apache.org/)。

总之,还有很多新的分布式框架等待我们去开发。

本文节选自《数据科学入门》

 

{%}

《数据科学入门》基于Python语言环境从零开始讲解数据科学工作。Python易于理解,包含丰富的与数据科学相关的库。具体内容包括:Python速成,可视化数据,线性代数,统计,概率,假设与推断,梯度下降法,如何获取数据,k近邻法,朴素贝叶斯算法,等等。借助大量具体例子以及数据挖掘、统计学、机器学习等领域的重要概念,本书详细展示了什么是数据科学。