MapReduce:大型集群上的简化数据处理

程序员小x大约 39 分钟分布式系统分布式系统

MapReduce:大型集群上的简化数据处理

摘要

MapReduce 是一种编程模型以及相关的用于处理和生成大型数据集的实现。用户指定一个映射函数,该函数处理键值对以生成一组中间键值对,以及一个归约函数,该函数合并与同一中间键相关联的所有中间值。正如论文中所示,许多现实世界中的任务都可以用这个模型来表达。

用这种函数式风格编写的程序会自动并行化,并在一个由大量商品机器组成的大型集群上执行。运行时系统负责处理输入数据分区的细节、在一组机器上调度程序的执行、处理机器故障以及管理所需的机器间通信。这使得没有任何并行和分布式系统经验的程序员能够轻松利用大型分布式系统的资源。

我们的 MapReduce 实现运行在一个由大量商品机器组成的大型集群上,并且具有高度的可扩展性:一个典型的 MapReduce 计算在数千台机器上处理许多太字节的数据。程序员发现这个系统易于使用:已经实现了数百个 MapReduce 程序,并且每天在谷歌的集群上执行多达一千个 MapReduce 作业。

1.介绍

在过去的五年里,作者和谷歌的许多其他人已经实现了数百个特殊用途的计算,这些计算处理大量的原始数据,例如抓取的文档、网络请求日志等,以计算各种派生数据,如倒排索引、网络文档图形结构的各种表示、每个主机抓取的页面数量摘要、给定日期中最频繁的查询集合等。大多数这样的计算在概念上是直接明了的。然而,输入数据通常很大,并且为了在合理的时间内完成计算,这些计算必须分布在数百或数千台机器上。如何并行化计算、分布数据以及处理故障等问题共同作用,使得原本简单的计算被大量复杂的代码所掩盖,这些代码是用来处理这些问题的。

作为对这种复杂性的一种回应,我们设计了一种新的抽象概念,它允许我们表达我们试图执行的简单计算,同时将并行化、容错、数据分布和负载均衡等杂乱的细节隐藏在一个库中。我们的抽象概念受到 Lisp 和许多其他函数式语言中存在的 map(映射)和 reduce(归约)原语的启发。我们意识到,我们的大多数计算都涉及对输入中的每个逻辑 "记录" 应用一个映射操作,以便计算一组中间键值对,然后对具有相同键的所有值应用一个归约操作,以便适当地组合派生数据。我们使用带有用户指定的映射和归约操作的函数式模型,使得我们能够轻松地并行化大型计算,并将重新执行作为容错的主要机制。

这项工作的主要贡献在于提供了一个简单而强大的接口,能够实现大规模计算的自动并行化和分布,同时结合了该接口的一种实现,在由大量商用个人电脑组成的大型集群上实现了高性能。

下面是论文的组织结构:

  • 第 2 节描述了基本的编程模型并给出了几个例子。
  • 第 3 节描述了针对我们的基于集群的计算环境而定制的 MapReduce 接口的一种实现。
  • 第 4 节描述了我们发现有用的编程模型的几个改进之处。
  • 第 5 节对我们的实现针对各种任务进行了性能测量。
  • 第 6 节探讨了 MapReduce 在谷歌内部的使用,包括我们将其用作重写我们的生产索引系统的基础的经验。
  • 第 7 节讨论了相关工作和未来的工作。

2.编程模型

该计算接收一组输入键值对,并生成一组输出键值对。MapReduce 库的用户将计算表示为两个函数:Map(映射)和 Reduce(归约)。

Map(映射)函数由用户编写,接收一个输入键值对并生成一组中间键值对。MapReduce 库将所有与同一中间键 I 相关联的中间值组合在一起,并将它们传递给 Reduce(归约)函数。

Reduce(归约)函数,同样由用户编写,接收一个中间键 I 和该键的一组值。它将这些值合并在一起以形成一个可能更小的值集合。通常每次调用 Reduce 函数只会产生零个或一个输出值。中间值通过迭代器提供给用户的归约函数。这使得我们能够处理那些大到无法在内存中容纳的值列表。

2.1 例子

考虑在大量文档集合中计算每个单词出现次数的问题。用户会编写类似于以下伪代码的代码:

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
        Emit(AsString(result))

映射函数发出每个单词以及相关的出现次数计数(在这个简单的例子中只是 "1")。归约函数将针对特定单词发出的所有计数相加求和。

此外,用户编写代码以用输入和输出文件的名称以及可选的调优参数来填充一个 MapReduce 规范对象。然后,用户调用 MapReduce 函数,将规范对象传递给它。用户的代码与用 C++ 实现的 MapReduce 库链接在一起。附录 A 包含了这个示例的完整程序文本。

2.2 类型

尽管前面的伪代码是根据字符串输入和输出来编写的,但从概念上讲,用户提供的映射和归约函数具有相关联的类型:

map (k1,v1) → list (k2,v2)
reduce (k2,list (v2)) → list (v2)

也就是说,输入键和值来自与输出键和值不同的域。此外,中间键和值与输出键和值来自相同的域。 我们的 C++ 实现将字符串传递给用户定义的函数以及从用户定义的函数接收字符串,并将字符串与适当类型之间的转换留给用户代码来完成。

2.3 更多的例子

以下是一些可以很容易地表示为 MapReduce 计算的有趣程序的简单例子。

分布式 grep:如果一行匹配给定的模式,映射函数就输出这一行。归约函数是一个恒等函数,它只是将提供的中间数据复制到输出中。

URL 访问频率计数:映射函数处理网页请求日志并输出 <URL, 1>。归约函数将相同 URL 的所有值相加,并输出< URL, 总计数 > 对。

反向网页链接图:对于在名为 source 的页面中找到的每个指向目标 URL 的链接,映射函数输出 < target, source > 对。归约函数将与给定目标 URL 相关联的所有源 URL 列表连接起来,并输出 < target, 源列表 > 对。

这个使用场景主要是搜索引擎如何进行排序,pagerank算法的思路是如果一个网页被很多其他网页所链接,说明它受到普遍的关注和信赖,那么它的排名就高。同时,每个网页的权重不同,那些权重较大的网页拥有的链接更可靠,这些链接的排名往往更靠前。

这就需要网络爬虫统计每个链接 target 被哪些 source 引用过这种信息了。通过爬虫我们可以获得的是 <source, target>数据对,映射函数输出 < target, source > 对, 归约函数将与给定目标 URL 相关联的所有源 URL 列表连接起来,生成<target, list(source)),这个问题就可以很好的解决了。

每个主机的词向量:词向量将一个文档或一组文档中出现的最重要的词总结为 < hword, frequency > 对的列表。对于每个输入文档,映射函数输出 < hostname, 词向量 > 对(其中主机名是从文档的 URL 中提取的)。归约函数接收给定主机的所有每个文档的词向量。它将这些词向量相加,丢弃不常出现的词,然后输出最终的 < hostname, 词向量 > 对。

倒排索引:映射函数解析每个文档,并输出一系列 <word(单词), document ID(文档 ID)> 对。归约函数接收给定单词的所有对,对相应的文档 ID 进行排序,并输出 < word, list (document ID)> 对。所有输出对的集合形成一个简单的倒排索引。很容易扩展这个计算以跟踪单词位置。

分布式排序:映射函数从每个记录中提取键,并输出 < key(键), record(记录)> 对。归约函数原封不动地输出所有对。这个计算依赖于第 4.1 节中描述的分区功能和第 4.2 节中描述的排序属性。

3.实现

MapReduce 接口有许多不同的实现方式是可能的。正确的选择取决于环境。例如,一种实现可能适用于小型共享内存机器,另一种适用于大型 NUMA 多处理器,还有一种适用于更大规模的联网机器集合。

本节描述一种针对谷歌广泛使用的计算环境的实现:由通过交换式以太网连接在一起的大量商品个人电脑组成的大型集群。在我们的环境中:

  • 1.机器通常是运行 Linux 的双处理器 x86 处理器,每台机器有 2 - 4GB 的内存。
  • 2.使用商品网络硬件 —— 在机器层面通常是 100 兆比特 / 秒或 1 千兆比特 / 秒,但整体的二分带宽平均要小得多。
  • 3.一个集群由数百或数千台机器组成,因此机器故障很常见。
  • 4.存储由直接连接到各个机器的廉价 IDE 磁盘提供。内部开发的一个分布式文件系统用于管理存储在这些磁盘上的数据。该文件系统使用复制在不可靠的硬件之上提供可用性和可靠性。
  • 5.用户将作业提交给一个调度系统。每个作业由一组任务组成,并且由调度程序映射到一个集群内的一组可用机器上。

3.1 执行概述

通过自动将输入数据划分成一组 M 个分片,映射调用被分布在多台机器上。输入分片可以由不同的机器并行处理。归约调用通过使用分区函数(例如,hash (key) mod R)将中间键空间划分为 R 个部分来进行分布。分区的数量(R)和分区函数由用户指定。

图 1 展示了在我们的实现中 MapReduce 操作的总体流程。当用户程序调用 MapReduce 函数时,会发生以下一系列动作(图 1 中的编号标签与下面列表中的数字相对应):

map-reduce执行流程
map-reduce执行流程
  • 1.用户程序中的 MapReduce 库首先将输入文件分割成 M 个部分,通常每个部分为 16 兆字节到 64 兆字节(用户可以通过一个可选参数进行控制)。然后,它在一个机器集群上启动该程序的多个副本。

  • 2.程序副本中有一个是特殊的 —— 主节点。其余的都是由主节点分配工作的工作节点。有 M 个映射任务和 R 个归约任务需要分配。主节点挑选空闲的工作节点,并为每个工作节点分配一个映射任务或归约任务。

  • 3.被分配了映射任务的工作节点读取相应输入分片的内容。它从输入数据中解析出键值对,并将每一对传递给用户定义的映射函数。由映射函数生成的中间键值对被缓存在内存中。

  • 4.定期地,缓冲的键值对被写入本地磁盘,并通过分区函数被划分成 R 个区域。这些缓冲键值对在本地磁盘上的位置被传回给主节点,主节点负责将这些位置转发给归约工作节点。

    1. 当归约工作节点被主节点通知这些位置时,它使用远程过程调用从映射工作节点的本地磁盘读取缓冲数据。当归约工作节点读取了所有中间数据后,它按照中间键对数据进行排序,以便相同键的所有出现都被分组在一起。排序是必要的,因为通常许多不同的键会映射到同一个归约任务。如果中间数据的量太大而无法放入内存中,则使用外部排序。
    1. 归约工作节点遍历已排序的中间数据,对于遇到的每个唯一中间键,它将键和相应的中间值集合传递给用户的归约函数。归约函数的输出被附加到这个归约分区的最终输出文件中。
    1. 当所有映射任务和归约任务都已完成时,主节点唤醒用户程序。此时,用户程序中的 MapReduce 调用返回到用户代码。成功完成后,MapReduce 执行的输出可在 R 个输出文件中获得(每个归约任务一个文件,文件名由用户指定)。通常,用户不需要将这 R 个输出文件合并为一个文件 —— 他们经常将这些文件作为输入传递给另一个 MapReduce 调用,或者在另一个能够处理被划分到多个文件中的输入的分布式应用程序中使用它们。

成功完成后,MapReduce 执行的输出可在 R 个输出文件中获得(每个归约任务一个文件,文件名由用户指定)。通常,用户不需要将这 R 个输出文件合并为一个文件 —— 他们常常将这些文件作为输入传递给另一个 MapReduce 调用,或者在另一个能够处理被划分到多个文件中的输入的分布式应用程序中使用它们。

3.2 主节点的数据结构

主节点保存下面这些数据结构:

  • 对于每个映射任务和归约任务,它存储任务的状态(空闲、进行中或已完成)
  • 工作机器的标识(对于非空闲任务)。

主节点是中间文件区域的位置信息从映射任务传播到归约任务的通道。因此,对于每个已完成的映射任务,主节点存储该映射任务生成的 R 个中间文件区域的位置和大小。随着映射任务的完成,会接收到对这个位置和大小信息的更新。这些信息会逐渐推送给正在进行归约任务的工作节点。

3.3 容错性

由于 MapReduce 库被设计用于帮助使用数百或数千台机器处理非常大量的数据,所以该库必须优雅地容忍机器故障。

工作节点故障

主节点定期向每个工作节点发送 "心跳" 信号。如果在特定时间内没有收到某个工作节点的响应,主节点就将该工作节点标记为故障状态。由该故障工作节点已完成的任何映射任务都被重置回初始的空闲状态,因此有资格被调度到其他工作节点上执行。类似地,在故障工作节点上正在进行的任何映射任务或归约任务也被重置为空闲状态,并有资格被重新调度。

已完成的映射任务在出现故障时会被重新执行,因为它们的输出存储在故障机器的本地磁盘上,因此无法访问。已完成的归约任务不需要重新执行,因为它们的输出存储在全局文件系统中。

当一个映射任务先由工作节点 A 执行,然后由于 A 出现故障而由工作节点 B 执行时,所有执行归约任务的工作节点都会收到重新执行的通知。任何尚未从工作节点 A 读取数据的归约任务将从工作节点 B 读取数据。

MapReduce 对大规模的工作节点故障具有弹性。例如,在一次 MapReduce 操作中,正在运行的集群上的网络维护导致一次有 80 台机器的组在几分钟内无法访问。MapReduce 主节点只是重新执行了那些无法访问的工作节点所做的工作,并继续向前推进,最终完成了 MapReduce 操作。

主节点故障

让主节点定期对上述主节点的数据结构进行检查点写入是很容易的。如果主节点任务死亡,可以从最后一个检查点的状态启动一个新的副本。然而,鉴于只有一个主节点,它出现故障的可能性不大;因此,我们当前的实现如果主节点出现故障就会中止 MapReduce 计算。客户端可以检查这种情况,如果需要,可以重试 MapReduce 操作。

主节点并不具有高可用性,这应该属于MapReduce的一大缺点。主节点挂了就重试。

存在故障时的语义

当用户提供的映射(map)和归约(reduce)操作符是其输入值的确定性函数时,我们的分布式实现所产生的输出与整个程序无故障顺序执行所产生的输出相同。

这里解释一下确定性的含义。MapReduce中的确定性指的是给定相同的输入,映射(map)和归约(reduce)操作会每次产生相同的输出。这种确定性使得程序员可以确信,程序的行为是一致的,不会因执行环境的不同或并行化导致结果的变化。

这里作者强调了确定性的Map和Reduce操作符会产生与串行(非故障)执行相同的结果。这是分布式系统中很重要的属性,因为分布式系统本身会因为任务调度、节点故障等因素而导致操作的执行顺序不固定,所以确定性操作符确保了即使在并行执行或重试任务的情况下,程序的最终结果不会不同。

如何保证确定性

论文中提到,为了保证Map和Reduce任务的输出一致性,MapReduce框架使用了"原子提交"机制。具体来说:

  • 每个任务(Map或Reduce)会将自己的输出写入临时文件。
  • Map任务会为每个Reduce任务创建一个临时文件,总共有R个临时文件(其中R是Reduce任务的数量)。
  • Map任务完成后,会向主节点(Master)发送一个消息,告知它任务完成,并提供R个临时文件的名称。
  • 如果主节点收到一个已经完成的Map任务的消息,会忽略该消息。否则,主节点会记录这些临时文件。

而对于Reduce任务,原子重命名操作用于保证最终输出的一致性:

  • 每个Reduce任务会将其临时输出文件原子地重命名为最终的输出文件名。
  • 即使该Reduce任务在多个节点上并行执行,多个重命名操作也会确保最终文件的状态是一致的,不会出现冲突。

这种原子性操作是通过底层文件系统的原子重命名操作来实现的,这保证了在分布式环境中,即使有多次重试任务调度的变化,最终输出是唯一且一致的。

当MapReduce中的操作符是非确定性时,输出的结果就不再是完全确定的。在这种情况下,输出可能依赖于执行的顺序、调度或并发执行的任务。因此,框架提供了更弱的语义来处理这种情况。

具体来说,非确定性操作的语义是这样的:

  • 对于一个特定的Reduce任务(比如R1),其输出应该与在一个串行执行的非确定性程序中的某次执行结果一致。
  • 然而,对于另一个Reduce任务(比如R2),它的输出可能是基于不同串行执行的结果。换句话说,R1和R2的输出结果可能是由不同的串行执行路径产生的。

举个例子来解释Map和Reduce的非确定性

假设你有一个Map任务M,它的映射过程依赖于外部数据,比如一个API请求的返回结果。如果该API是一个非确定性的服务(比如返回的结果根据请求的时机而变化),那么Map任务M的输出就不是确定的。

例如:

  • 第一次执行Map任务M时,它可能从API获取了数据A,进而生成了某些输出。
  • 第二次执行Map任务M时,它可能从API获取了数据B(如果API返回的数据在两次请求之间发生了变化),进而生成不同的输出。

由于Map任务的输出是非确定的,不同的Reduce任务(R1和R2)可能会处理不同版本的Map输出,导致最终结果的不一致性。

论文中提到,在这种情况下,每个Reduce任务的输出仍然是确定的——它对应于某次串行执行的结果。但是,不同的Reduce任务可能会看到不同的Map输出,导致它们的结果也有所不同。

3.4 局部性

在我们的计算环境中,网络带宽是一种相对稀缺的资源。我们利用输入数据存储在构成我们集群的机器的本地磁盘上这一事实来节省网络带宽。GFS 将每个文件分成 64MB 的块,并在不同的机器上存储每个块的若干副本(通常是三个副本)。MapReduce 主节点考虑输入文件的位置信息,并尝试在包含相应输入数据副本的机器上调度一个映射任务。如果做不到这一点,它会尝试在该任务的输入数据副本附近调度一个映射任务(例如,在与包含数据的机器位于同一网络交换机上的工作机器上)。当在集群中的很大一部分工作机器上运行大型 MapReduce 操作时,大多数输入数据是在本地读取的,不消耗网络带宽。

数据就近处理。

3.5 任务粒度

如上文所述,我们将映射阶段细分为 M 个部分,将归约阶段细分为 R 个部分。理想情况下,M 和 R 应该远大于工作机器的数量。让每个工作机器执行许多不同的任务可以改善动态负载均衡,并且在工作机器出现故障时也能加快恢复速度:它已完成的许多映射任务可以分散到所有其他工作机器上。

在我们的实现中,M 和 R 的大小实际上存在一定的限制,因为主节点必须做出 O (M + R) 次调度决策,并在内存中保存 O (M * R) 的状态,如上文所述。(然而,内存使用的常量因子很小:O (M * R) 部分的状态大约是每个映射任务 / 归约任务对占用一个字节的数据。)此外,R 通常受到用户的限制,因为每个归约任务的输出最终会在一个单独的输出文件中。在实践中,我们倾向于选择 M,使得每个单独的任务大约有 16MB 到 64MB 的输入数据(这样上述的局部性优化效果最佳),并且我们使 R 是我们预期使用的工作机器数量的一小倍。我们经常使用 2000 台工作机器进行 MapReduce 计算,其中 M = 200000,R = 5000。

3.6 候补任务

导致 MapReduce 操作总耗时延长的一个常见原因是 "掉队者":一台机器在计算中完成最后几个映射或归约任务之一所需的时间异常长。

落后者的出现可能有很多原因。例如,一台磁盘有问题的机器可能会频繁出现可纠正的错误,使其读取性能从 30MB/s 降至 1MB/s。集群调度系统可能在该机器上调度了其他任务,由于对 CPU、内存、本地磁盘或网络带宽的竞争,导致它执行 MapReduce 代码的速度变慢。我们最近遇到的一个问题是机器初始化代码中的一个错误,导致处理器缓存被禁用:受影响的机器上的计算速度减慢了一百多倍。

我们有一种通用机制来缓解落后者的问题。当 MapReduce 操作接近完成时,主节点会调度剩余正在进行的任务的备份执行。只要主执行或备份执行中的任何一个完成,该任务就被标记为已完成。我们对这个机制进行了调整,使其通常不会使操作使用的计算资源增加超过几个百分点。 我们发现这显著减少了完成大型 MapReduce 操作的时间。例如,在第 5.3 节中描述的排序程序在禁用备份任务机制时完成时间要多花 44%。

4.改进

虽然仅仅通过编写映射(Map)和归约(Reduce)函数所提供的基本功能足以满足大多数需求,但我们发现一些扩展功能很有用。本节将对这些扩展功能进行描述。

4.1 分区函数

MapReduce 的用户指定他们期望的归约任务数量 / 输出文件数量(R)。数据通过对中间键使用分区函数在这些任务之间进行分区。提供了一个默认的分区函数,它使用哈希(例如 "hash (key) mod R")。这往往会产生相当均衡的分区。然而,在某些情况下,通过键的其他函数对数据进行分区是有用的。例如,有时输出键是 URL,我们希望单个主机的所有条目最终都在同一个输出文件中。为了支持这种情况,MapReduce 库的用户可以提供一个特殊的分区函数。例如,使用 "hash (Hostname (urlkey)) mod R" 作为分区函数会使来自同一主机的所有 URL 最终都在同一个输出文件中。

4.2 顺序保证

我们保证在给定的分区内,中间键值对按照键的升序进行处理。这个顺序保证使得为每个分区生成一个已排序的输出文件变得容易,当输出文件格式需要支持按键进行高效随机访问查找时,或者当输出的用户发现数据已排序很方便时,这是很有用的。

4.3 合并器函数

在某些情况下,每个映射任务产生的中间键中存在大量重复,并且用户指定的归约函数是可交换和可结合的。一个很好的例子是第 2.1 节中的单词计数示例。由于单词频率往往遵循齐普夫分布,每个映射任务将产生成百上千条形式为 <the, 1> 的记录。所有这些计数都将通过网络发送到一个单独的归约任务,然后由归约函数将它们相加得到一个数字。我们允许用户指定一个可选的合并器函数,在数据通过网络发送之前对其进行部分合并。

合并器函数在执行映射任务的每台机器上执行。通常,实现合并器和归约函数使用相同的代码。归约函数和合并器函数之间的唯一区别在于 MapReduce 库如何处理函数的输出。归约函数的输出被写入最终输出文件。合并器函数的输出被写入一个中间文件,该文件将被发送到归约任务。 部分合并显著加快了某些类别的 MapReduce 操作的速度。附录 A 包含一个使用合并器的示例。

4.4 输入和输出类型

MapReduce 库为以几种不同格式读取输入数据提供支持。例如,"文本" 模式输入将每一行视为一个键值对:键是文件中的偏移量,值是该行的内容。另一种常见的受支持格式存储按键排序的一系列键值对。每个输入类型的实现都知道如何将自身拆分为有意义的范围,以便作为单独的映射任务进行处理(例如,文本模式的范围拆分确保范围拆分仅发生在行边界处)。用户可以通过提供一个简单读取器接口的实现来添加对新输入类型的支持,尽管大多数用户只使用少数预定义的输入类型之一。 读取器不一定需要提供从文件中读取的数据。例如,很容易定义一个从数据库或从内存中映射的数据结构中读取记录的读取器。 以类似的方式,我们支持一组输出类型以不同的格式生成数据,并且用户代码很容易添加对新输出类型的支持。

4.5 副作用

在某些情况下,MapReduce 的用户发现从他们的映射和 / 或归约操作符中生成辅助文件作为额外输出很方便。我们依靠应用程序编写者使这种副作用具有原子性和幂等性。通常,应用程序写入一个临时文件,并在完全生成后原子性地重命名这个文件。

我们不支持单个任务生成的多个输出文件的原子两阶段提交。因此,生成具有跨文件一致性要求的多个输出文件的任务应该是确定性的。在实践中,这个限制从未成为问题。

4.6 跳过错误记录

有时用户代码中存在错误,导致映射或归约函数在某些记录上确定性地崩溃。这样的错误会阻止 MapReduce 操作完成。通常的做法是修复错误,但有时这不可行;也许错误在第三方库中,而该库的源代码不可用。另外,有时忽略一些记录是可以接受的,例如在对大型数据集进行统计分析时。我们提供一种可选的执行模式,在这种模式下,MapReduce 库检测哪些记录会导致确定性崩溃,并跳过这些记录以继续前进。 每个工作进程安装一个信号处理程序,用于捕获段错误和总线错误。在调用用户的映射或归约操作之前,MapReduce 库将参数的序列号存储在一个全局变量中。如果用户代码生成一个信号,信号处理程序会发送一个包含序列号的 "最后一口气" UDP 数据包给 MapReduce 主节点。当主节点在特定记录上看到多次失败时,它会在下次重新执行相应的映射或归约任务时指示应该跳过该记录。

4.7 本地执行

调试映射或归约函数中的问题可能很棘手,因为实际计算发生在分布式系统中,通常在数千台机器上,工作分配决策由主节点动态做出。为了便于调试、分析和小规模测试,我们开发了 MapReduce 库的另一种实现,它在本地机器上顺序执行 MapReduce 操作的所有工作。为用户提供了控制,以便计算可以限制在特定的映射任务上。用户使用一个特殊标志调用他们的程序,然后可以轻松使用他们认为有用的任何调试或测试工具(例如 gdb)。

4.8 状态信息

主节点运行一个内部 HTTP 服务器,并导出一组供人查看的状态页面。状态页面显示计算的进度,例如已经完成了多少任务、有多少任务正在进行、输入字节数、中间数据字节数、输出字节数、处理速率等等。页面还包含指向每个任务生成的标准错误和标准输出文件的链接。用户可以使用这些数据来预测计算需要多长时间,以及是否应该向计算中添加更多资源。 这些页面还可以用于确定计算何时比预期慢得多。此外,顶级状态页面显示哪些工作节点出现了故障,以及它们在出现故障时正在处理哪些映射和归约任务。当试图诊断用户代码中的错误时,这个信息很有用。

4.9 计数器

MapReduce 库提供了一种计数器工具,用于统计各种事件的发生次数。例如,用户代码可能想要统计处理的单词总数或者已索引的德语文档数量等等。 要使用这个工具,用户代码创建一个有名称的计数器对象,然后在映射和 / 或归约函数中适当地增加计数器的值。例如:

Counter* uppercase;
uppercase = GetCounter ("uppercase");
map (String name, String contents):
    for each word w in contents:
        if (IsCapitalized (w)):
            uppercase->Increment ();
        EmitIntermediate (w, "1");

来自各个工作机器的计数器值会定期传播到主节点(搭载在 ping 响应上)。主节点会聚合来自成功的映射和归约任务的计数器值,并在 MapReduce 操作完成时将它们返回给用户代码。当前的计数器值也会显示在主节点的状态页面上,以便用户可以观察正在进行的计算的进度。在聚合计数器值时,主节点会消除同一映射或归约任务的重复执行的影响,以避免重复计数。(重复执行可能是由于我们使用备份任务以及由于故障而重新执行任务引起的。)

一些计数器值由 MapReduce 库自动维护,例如处理的输入键值对的数量和生成的输出键值对的数量。

用户发现计数器工具对于检查 MapReduce 操作的行为是否正常很有用。例如,在一些 MapReduce 操作中,用户代码可能想要确保生成的输出键值对的数量恰好等于处理的输入键值对的数量,或者处理的德语文档的比例在处理的文档总数的某个可容忍比例范围内。

5.性能

在本节中,我们测量 MapReduce 在一大群机器上运行的两个计算的性能。一个计算在大约 1TB 的数据中搜索特定模式。另一个计算对大约 1TB 的数据进行排序。这两个程序代表了 MapReduce 用户编写的一大部分实际程序 —— 一类程序将数据从一种表示形式转换为另一种表示形式,另一类程序从大量数据集中提取少量有趣的数据。

5.1 集群配置

所有程序都在一个由大约 1800 台机器组成的集群上执行。每台机器有两个启用了超线程技术的 2GHz 英特尔至强处理器、4GB 内存、两个 160GB IDE 磁盘和一个千兆以太网链接。这些机器被安排在一个两级树形交换网络中,在根节点处有大约 100 - 200Gbps 的总带宽可用。所有的机器都在同一个托管设施中,因此任意两台机器之间的往返时间都小于一毫秒。在 4GB 的内存中,大约有 1 - 1.5GB 被集群上运行的其他任务保留。这些程序在一个周末的下午执行,此时 CPU、磁盘和网络大多处于空闲状态。

5.2 Grep

Grep 程序扫描 1010{10}^{10} 个 100 字节的记录,搜索一个相对罕见的三个字符的模式(该模式出现在 92337 个记录中)。输入被分割成大约 64MB 的块(M = 15000),并且整个输出被放置在一个文件中(R = 1)。

图 2 展示了随着时间推移计算的进展情况。Y 轴表示输入数据被扫描的速率。随着更多的机器被分配到这个 MapReduce 计算中,速率逐渐提升,当分配了 1764 个工作机器时,峰值超过 30GB / 秒。随着映射任务的完成,速率开始下降,并在计算开始约 80 秒时降至零。整个计算从开始到结束大约需要 150 秒。这包括大约一分钟的启动开销。这个开销是由于程序传播到所有工作机器,以及与 GFS 交互时的延迟,包括打开 1000 个输入文件以及获取局部性优化所需的信息。

5.3 排序

排序程序对 1010{10}^{10} 个 100 字节的记录进行排序(大约 1TB的数据)。这个程序是仿照 TeraSort 基准测试设计的。

这个排序程序由不到 50 行的用户代码组成。一个三行的映射函数从文本行中提取一个 10 字节的排序键,并将键和原始文本行作为中间键 / 值对输出。我们使用内置的恒等函数作为归约操作符。这个函数将中间键 / 值对原封不动地作为输出键 / 值对传递。最终排序后的输出被写入一组双向复制的 GFS 文件中(即,作为程序的输出写入 2TB 字节的数据)。

和之前一样,输入数据被分割成 64MB 的块(M = 15000)。我们将排序后的输出分割成 4000 个文件(R = 4000)。分区函数使用键的初始字节将其划分到 R 个部分中的一个。我们这个基准测试的分区函数内置了对键分布的了解。在一般的排序程序中,我们会添加一个预排序的 MapReduce 操作,它会收集键的样本,并使用采样键的分布来计算最终排序过程的分割点。

图 3(a)展示了排序程序正常执行的进展情况。左上图显示了输入读取的速率。速率在大约 13GB / 秒时达到峰值,并且很快下降,因为所有的映射任务在 200 秒之前就完成了。请注意,输入速率比 grep 的要低。这是因为排序映射任务大约花费一半的时间和 I/O 带宽将中间输出写入本地磁盘。而 grep 对应的中间输出大小可以忽略不计。

左中图显示了数据从映射任务通过网络发送到归约任务的速率。这种混洗操作在第一个映射任务完成后立即开始。图中的第一个隆起部分是针对第一批大约 1700 个归约任务(整个 MapReduce 被分配了大约 1700 台机器,并且每台机器一次最多执行一个归约任务)。在计算进行到大约 300 秒时,第一批归约任务中的一些完成了,我们开始为剩余的归约任务混洗数据。所有的混洗操作在计算进行到大约 600 秒时完成。

左下图显示了归约任务将排序后的数据写入最终输出文件的速率。在第一个混洗阶段结束和写入阶段开始之间有一个延迟,因为机器正忙于对中间数据进行排序。写入以大约 2 - 4GB / 秒的速率持续一段时间。所有的写入在计算进行到大约 850 秒时完成。 包括启动开销在内,整个计算需要 891 秒。这与目前 TeraSort 基准测试的最佳报告结果 1057 秒相似。

有几点需要注意:由于我们的局部性优化,输入速率高于混洗速率和输出速率 —— 大多数数据是从本地磁盘读取的,绕过了我们相对带宽受限的网络。混洗速率高于输出速率,因为输出阶段写入排序后数据的两份副本(出于可靠性和可用性的原因,我们对输出进行了两份复制)。我们写入两份副本是因为这是我们底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用纠删码而不是复制,那么写入数据的网络带宽需求将会降低。

5.4 备份任务的影响

在图 3(b)中,我们展示了禁用备份任务时排序程序的执行情况。执行流程与图 3(a)所示类似,不同之处在于有一个很长的拖尾,几乎没有任何写入活动发生。在 960 秒后,除了 5 个归约任务外,所有任务都已完成。然而,这最后几个落后者直到 300 秒后才完成。整个计算耗时 1283 秒,所用时间增加了 44%。

5.5 机器故障

在图 3(c)中,我们展示了排序程序的一种执行情况,在计算进行几分钟后,我们故意杀死了 1746 个工作进程中的 200 个。底层的集群调度程序立即在这些机器上重新启动新的工作进程(因为只是进程被杀死,机器仍然正常运行)。

工作进程的死亡表现为负的输入速率,因为一些先前已完成的映射工作消失了(因为相应的映射工作进程被杀死),需要重新执行。这种映射工作的重新执行相对较快。整个计算包括启动开销在内用时 933 秒完成(仅比正常执行时间增加了 5%)。

附录A

单词频率

本节包含一个程序,用于统计在命令行中指定的一组输入文件中每个唯一单词的出现次数。

#include "mapreduce/mapreduce.h"
// User's map function
class WordCounter : public Mapper {
  public:
    virtual void Map(const MapInput& input) {
        const string& text = input.value();
        const int n = text.size();
        for (int i = 0; i < n; ) {
            // Skip past leading whitespace
            while ((i < n) && isspace(text[i]))
                i++;
            // Find word end
            int start = i;
            while ((i < n) && !isspace(text[i]))
                i++;
            if (start < i)
                Emit(text.substr(start,i-start),"1");
        }
}
};

REGISTER_MAPPER(WordCounter);

// User's reduce function
class Adder : public Reducer {
    virtual void Reduce(ReduceInput* input) {
        // Iterate over all entries with the
        // same key and add the values
        int64 value = 0;
        while (!input->done()) {
            value += StringToInt(input->value());
            input->NextValue();
        }
        // Emit sum for input->key()
        Emit(IntToString(value));
    }
};

REGISTER_REDUCER(Adder);

int main(int argc, char** argv) {
    ParseCommandLineFlags(argc, argv);
    MapReduceSpecification spec;
    // Store list of input files into "spec"
    for (int i = 1; i < argc; i++) {
        MapReduceInput* input = spec.add_input();
        input->set_format("text");
        input->set_filepattern(argv[i]);
        input->set_mapper_class("WordCounter");
    }
    // Specify the output files:
    // /gfs/test/freq-00000-of-00100
    // /gfs/test/freq-00001-of-00100
    // ...
    MapReduceOutput* out = spec.output();
    out->set_filebase("/gfs/test/freq");
    out->set_num_tasks(100);
    out->set_format("text");
    out->set_reducer_class("Adder");
    // Optional: do partial sums within map
    // tasks to save network bandwidth
    out->set_combiner_class("Adder");
    // Tuning parameters: use at most 2000
    // machines and 100 MB of memory per task
    spec.set_machines(2000);
    spec.set_map_megabytes(100);
    spec.set_reduce_megabytes(100);
    // Now run it
    MapReduceResult result;
    if (!MapReduce(spec, &result)) abort();
    // Done: ’result’ structure contains info
    // about counters, time taken, number of
    // machines used, etc.
    return 0;
}

PS

Map-reduce 排序是怎么做的?

假设现在有两份输入input1和input2,现在对其进行排序:

input1:

banana
apple
lemon
orange
mango

input2:

grape
cherry
pineapple
peach
kiwi

构建两个map任务(M=2),map任务将按照下面的规则对数据进行分区:

  • 所有以字母 a 到 f 开头的单词被分配分区1
  • 所有以字母 g 到 l 开头的单词被分配分区2
  • 所有以字母 m 到 z 开头的单词被分配分区3

map task1会将input1处理成下面的中间key-value对:

banana   =>  (1, banana)
apple    =>   (1, apple)
lemon    =>   (2, lemon)
orange   =>   (3, orange)
mango    =>   (3, mango)

map task2会将input2处理成下面的中间key-value对:

grape        =>   (2, grape)
cherry       =>   (1, cherry)
pineapple    =>   (3, pineapple)
peach        =>   (3, peach)
kiwi         =>   (2, kiwi)

接下来构建3个reduce任务,分别对partion1-3进行内部排序

reduce1的input为(1, [(1, banana),(1, apple), (1, cherry)]):

排序后为:

apple
banana
cherry

reduce2的input为(2, [(2, lemon),(2, grape), (2, kiwi)]):

排序后为:

grape
kiwi
lemon

reduce3的input为(3, [(3, orange),(3, mango), (3, pineapple),(3, peach)]):

排序后为:

mango
orango
peach
pineapple

将3个reduce的结果进行串联就可以得到全局排序的结果:

apple
banana
cherry
grape
kiwi
lemon
mango
orango
peach
pineapple
Loading...