弹性分布式数据集:内存集群计算的一种容错抽象

程序员小x大约 24 分钟分布式系统分布式系统

弹性分布式数据集:内存集群计算的一种容错抽象

摘要

我们提出了弹性分布式数据集(RDDs),这是一种分布式内存抽象,它允许程序员以容错的方式在大型集群上进行内存中的计算。RDDs 是由当前计算框架处理效率低下的两类应用所推动的:迭代算法和交互式数据挖掘工具。在这两种情况下,将数据保存在内存中可以将性能提高一个数量级。

为了有效地实现容错,RDDs 提供了一种受限形式的共享内存,它基于粗粒度的转换而不是对共享状态的细粒度更新。然而,我们表明 RDDs 具有足够的表现力,可以捕获广泛的计算类型,包括最近针对迭代作业的专门编程模型(如 Pregel)以及这些模型未涵盖的新应用。我们在一个名为 Spark 的系统中实现了 RDDs,并通过各种用户应用程序和基准测试对其进行了评估。

引言

像 MapReduce 和 Dryad 这样的集群计算框架已被广泛应用于大规模数据分析。这些系统允许用户使用一组高级操作符来编写并行计算程序,而无需担心工作分配和容错问题。

尽管当前的框架为访问集群的计算资源提供了众多抽象,但它们缺乏利用分布式内存的抽象。这使得它们对于一类重要的新兴应用来说效率低下,即那些在多个计算中重复使用中间结果的应用。数据重用在许多迭代机器学习和图算法中很常见,包括 PageRank、K - 均值聚类和逻辑回归。另一个引人注目的用例是交互式数据挖掘,其中用户在数据的同一子集上运行多个临时查询。不幸的是,在大多数当前的框架中,在计算之间(例如,在两个 MapReduce 作业之间)重用数据的唯一方法是将其写入外部稳定存储系统,例如分布式文件系统。这会产生大量的开销,原因在于数据复制、磁盘 I/O 和序列化,这些开销可能会占据应用程序的执行时间。

认识到这个问题后,研究人员为一些需要数据重用的应用开发了专门的框架。例如,Pregel是一个用于迭代图计算的系统,它将中间数据保存在内存中,而 HaLoop提供了一个迭代的 MapReduce 接口。然而,这些框架仅支持特定的计算模式(例如,循环一系列 MapReduce 步骤),并且针对这些模式隐式地执行数据共享。它们没有为更一般的重用提供抽象,例如,让用户将多个数据集加载到内存中并在它们之间运行临时查询。

在本文中,我们提出了一种名为弹性分布式数据集(RDDs)的新抽象,它能够在广泛的应用中实现高效的数据重用。RDDs 是具有容错能力的并行数据结构,它允许用户显式地将中间结果持久化在内存中,控制其分区以优化数据放置,并使用丰富的操作符集对其进行操作。

在设计弹性分布式数据集(RDDs)时,主要的挑战是定义一个能够高效提供容错能力的编程接口。现有的集群内存存储抽象,如分布式共享内存、键值存储、数据库和 Piccolo,提供了一个基于可变状态(例如,表中的单元)的细粒度更新的接口。使用这个接口,提供容错的唯一方法是在机器之间复制数据或在机器之间记录更新。对于数据密集型工作负载,这两种方法都很昂贵,因为它们需要在集群网络上复制大量数据,而集群网络的带宽远低于随机存取存储器(RAM)的带宽,并且会产生大量的存储开销。

与这些系统相比,RDDs 提供了一个基于粗粒度转换(例如,映射、过滤和连接)的接口,这些转换将相同的操作应用于许多数据项。这使得它们能够通过记录用于构建数据集的转换(其血统)而不是实际数据来有效地提供容错能力。如果一个 RDD 的分区丢失,RDD 有足够的信息表明它是如何从其他 RDD 派生而来的,从而可以仅重新计算那个分区。因此,丢失的数据可以恢复,通常非常快,而不需要昂贵的复制。

虽然基于粗粒度转换的接口乍一看可能有限,但 RDD 非常适合许多并行应用程序,因为这些应用程序自然地对多个数据项应用相同的操作。实际上,我们表明 RDD 可以有效地表达许多迄今为止作为独立系统提出的集群编程模型,包括 MapReduce、DryadLINQ、SQL、Pregel 和 HaLoop,以及这些系统未涵盖的新应用,如交互式数据挖掘。我们相信,RDD 能够满足以前只能通过引入新框架才能满足的计算需求的能力,是 RDD 抽象强大的最有力证据。

我们在一个名为 Spark 的系统中实现了 RDD。Spark 正在加州大学伯克利分校和几家公司用于研究和生产应用。Spark 在 Scala 编程语言 中提供了一个类似于 DryadLINQ 的方便的语言集成编程接口。此外,Spark 可以交互式地从 Scala 解释器中查询大型数据集。 我们相信,Spark 是第一个允许通用编程语言以交互速度在集群上进行内存数据挖掘的系统。

我们通过微基准测试和对用户应用程序的测量来评估 RDD 和 Spark。我们表明,对于迭代应用程序,Spark 比 Hadoop 快高达 20 倍,将一个实际的数据分析报告的速度提高了 40 倍,并且可以交互式地用于扫描 1TB 的数据集,延迟为 5 - 7 秒。更重要的是,为了说明 RDD 的通用性,我们在 Spark 之上实现了 Pregel 和 HaLoop 编程模型,包括它们所采用的放置优化,作为相对较小的库(每个库 200 行代码)。

本文的结构如下:

  • 第 2 部分, 对RDD 和 Spark 进行概述
  • 第 4 部分 讨论 RDD 的内部表示
  • 第 5 部分讨论我们的实现
  • 第 6 部分讨论实验结果
  • 第 7 部分我们讨论 RDD 如何捕获几个现有的集群编程模型
  • 第 8 部分综述相关工作并得出结论

2.弹性分布式数据集(RDDs)

本节对弹性分布式数据集(RDDs)进行概述。首先定义 RDDs(2.1 节)并介绍其在 Spark 中的编程接口(2.2 节)。然后将 RDDs 与更细粒度的共享内存抽象进行比较(2.3 节)。最后,讨论 RDD 模型的局限性(2.4 节)。

2.1 RDD 抽象

从形式上讲,弹性分布式数据集(RDD)是一个只读的、分区的记录集合。RDD 只能通过对以下两者之一进行确定性操作来创建:(1)稳定存储中的数据;(2)其他 RDD。我们将这些操作称为转换,以区别于对 RDD 的其他操作。转换的例子包括映射(map)、过滤(filter)和连接(join)。

RDD 并非在所有时候都需要被具体化。相反,一个 RDD 拥有足够的关于它是如何从其他数据集派生而来的信息(它的血统),以便从稳定存储中的数据计算出它的分区。这是一个强大的属性:本质上,一个程序不能引用一个在出现故障后无法重新构建的 RDD。

最后,用户可以控制 RDD 的另外两个方面:持久性和分区。用户可以指出他们将重用哪些 RDD,并为它们选择一种存储策略(例如,内存存储)。他们还可以要求根据每个记录中的一个键将 RDD 的元素跨机器进行分区。这对于放置优化很有用,例如确保要连接在一起的两个数据集以相同的方式进行哈希分区。

2.2 Spark 编程接口

Spark 通过一种与 DryadLINQ 和 FlumeJava 类似的语言集成 API 来展示 RDD,在这种 API 中,每个数据集都表示为一个对象,并且通过对这些对象调用方法来执行转换操作。

程序员首先通过对稳定存储中的数据进行转换(例如映射和过滤)来定义一个或多个 RDD。然后,他们可以在行动(actions)中使用这些 RDD,行动是指那些向应用程序返回一个值或者将数据导出到存储系统的操作。行动的例子包括计数(count,返回数据集中的元素数量)、收集(collect,返回元素本身)以及保存(save,将数据集输出到存储系统)。与 DryadLINQ 一样,Spark 在行动中首次使用 RDD 时会延迟计算它们,以便能够将转换操作进行流水线处理。

此外,程序员可以调用持久化方法(persist method)来表明他们希望在未来的操作中重用哪些 RDD。默认情况下,Spark 将持久化的 RDD 保存在内存中,但如果内存不足,它可以将其溢出到磁盘上。用户还可以通过向持久化方法传入标志来请求其他持久化策略,例如仅将 RDD 存储在磁盘上或者在多台机器上复制它。最后,用户可以为每个 RDD 设置一个持久化优先级,以指定哪些内存中的数据应该首先溢出到磁盘上。

2.2.1 示例:控制台日志挖掘

假设一个网络服务出现错误,操作员想要在 Hadoop 文件系统(HDFS)中搜索数 TB 的日志以找到原因。使用 Spark,操作员可以将日志中的仅错误消息加载到一组节点的内存中,并进行交互式查询。

她首先会输入以下 Scala 代码:

lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()

第 1 行定义了一个由 HDFS 文件支持的 RDD(作为文本行的集合),而第 2 行从它派生一个经过过滤的 RDD。第 3 行随后要求将错误持久化到内存中,以便可以在查询之间共享。请注意,传递给过滤器的参数是闭包的 Scala 语法。

此时,在集群上还没有执行任何工作。然而,用户现在可以在行动中使用这个 RDD,例如,计算消息的数量:

errors.count()

用户还可以对 RDD 执行进一步的转换操作,并使用其结果,如下所示:

// Count errors mentioning MySQL:
errors.filter(_.contains("MySQL")).count()

// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()

在涉及 "errors" 的第一个行动运行后,Spark 将把 "errors" 的分区存储在内存中,这极大地加快了后续对它的计算。请注意,基础 RDD "lines" 并没有加载到内存中。这是可取的,因为错误消息可能只是数据的一小部分(小到足以放入内存中)。

最后,为了说明我们的模型是如何实现容错性的,我们在图 1 中展示了我们第三个查询中 RDD 的血统图。在这个查询中,我们从 "errors" 开始,它是对 "lines" 进行过滤的结果,在运行 "collect" 之前应用了进一步的过滤和映射。Spark 调度器将把后两个转换操作进行流水线处理,并将一组计算它们的任务发送到持有 "errors" 缓存分区的节点上。此外,如果 "errors" 的一个分区丢失,Spark 将仅在 "lines" 的相应分区上应用过滤操作来重新构建它。

3.Spark编程接口

类型方法
转换(Transformation)map(f:TU):RDD[T]RDD[U])map(f: T \Rightarrow U) : RDD[T] \Rightarrow RDD[U])
filter(f:TBool):RDD[T]RDD[T])filter(f: T \Rightarrow Bool) : RDD[T] \Rightarrow RDD[T])
flatMap(f:TSeq[U]:RDD[T]RDD[U])flatMap(f: T \Rightarrow Seq[U] : RDD[T] \Rightarrow RDD[U])
sample(fraction:Float):RDD[T]RDD[T]sample(fraction: Float) : RDD[T] \Rightarrow RDD[T]
groupByKey():RDD[(K,V)]RDD[(K,Seq[V])]groupByKey() : RDD[(K, V)] \Rightarrow RDD[(K, \text{Seq}[V])]
reduceByKey(f:(V,V)V):RDD[(K,V)]RDD[(K,V)]reduceByKey(f : (V, V) \Rightarrow V) : RDD[(K, V)] \Rightarrow RDD[(K, V)]
union():(RDD[T],RDD[T])RDD[T]union() : (RDD[T], RDD[T]) \Rightarrow RDD[T]
join():(RDD[(K,V)],RDD[(K,W)])RDD[(K,(V,W))]join() : (RDD[(K, V)], RDD[(K, W)]) \Rightarrow RDD[(K, (V, W))]
cogroup():(RDD[(K,V)],RDD[(K,W)])RDD[(K,(Seq[V],Seq[W]))]cogroup():(RDD[(K, V)], RDD[(K, W)]) \Rightarrow RDD[(K, (\text{Seq}[V], \text{Seq}[W]))]
crossProduct():(RDD[T],RDD[U])RDD[(T,U)]crossProduct() : (RDD[T], RDD[U]) \Rightarrow RDD[(T, U)]
mapValues(f:VW):RDD[(K,V)]RDD[(K,W)](Preservespartitioning)mapValues(f : V \rightarrow W) : RDD[(K, V)] \Rightarrow RDD[(K, W)] (Preserves partitioning)
sort(c:Comparator[K]):RDD[(K,V)]RDD[(K,V)]sort(c : \text{Comparator}[K]) : RDD[(K, V)] \Rightarrow RDD[(K, V)]
partitionBy(p:Partitioner[K]):RDD[(K,V)]RDD[(K,V)]partitionBy(p : \text{Partitioner}[K]) : RDD[(K, V)] \Rightarrow RDD[(K, V)]
动作(Actions)count():RDD[T]Longcount() : RDD[T] \Rightarrow Long
collect():RDD[T]Seq[T]collect() : RDD[T] \Rightarrow Seq[T]
reduce(f:(T,T)T):RDD[T]Treduce(f:(T,T) \Rightarrow T) : RDD[T] \Rightarrow T
lookup(k,K):RDD[(K,V)]Seq[V](Onhash/rangepartitionedRDDs)lookup(k, K) : RDD[(K,V)] \Rightarrow Seq[V](On hash/range partitioned RDDs)
save(path:String)save(path:String) : Outputs RDD to a storage system, e.g., HDFS

4.表示 RDD

在将 RDD 作为一种抽象提供时,其中一个挑战是选择一种能够在各种转换中实现跟踪血缘的表示形式。理想情况下,实现 RDD 的系统应该提供尽可能丰富的一组转换操作符(例如,表 2 中的那些),并让用户以任意方式组合它们。我们为 RDD 提出一种简单的基于图的表示形式,这种形式有助于实现这些目标。我们在 Spark 中使用了这种表示形式来支持广泛的转换,而无需为每个转换在调度器中添加特殊逻辑,这极大地简化了系统设计。

简而言之,我们建议通过一个通用接口来表示每个 RDD,该接口公开五部分信息:一组分区,它们是数据集的原子部分;一组对父 RDD 的依赖关系;一个基于其父级计算数据集的函数;以及关于其分区方案和数据放置的元数据。例如,代表一个 HDFS 文件的 RDD 对文件的每个块都有一个分区,并且知道每个块位于哪些机器上。同时,对这个 RDD 进行映射的结果具有相同的分区,但在计算其元素时将映射函数应用于父级的数据。我们在表 3 中总结了这个接口。

操作含义
partitions()返回一系列分区对象
prederedLocations(p)列出由于数据局部性可以更快地访问分区 p 的节点。
dependencies()返回一些列依赖
iterator(p, parentIters)根据其父分区的迭代器计算分区 p 的元素。
partitioner()返回指定 RDD 是哈希分区还是范围分区的元数据。

在设计接口时,如何表示 RDD 之间的依赖关系是一个有趣的问题。我们发现将依赖关系分为两种类型既充分又有用,即:

  • 窄依赖,其中父 RDD 的每个分区最多被子 RDD 的一个分区使用;
  • 宽依赖,其中多个子分区可能依赖于它。例如,映射导致窄依赖,而连接导致宽依赖(除非父级是哈希分区的)。图 4 展示了其他例子。
图4:宽依赖和窄依赖的例子。其中每个方框都是一个 RDD,分区显示为蓝色填充的矩形。
图4:宽依赖和窄依赖的例子。其中每个方框都是一个 RDD,分区显示为蓝色填充的矩形。

这种区分有两个原因是有用的。

  • 首先,窄依赖允许在一个集群节点上进行流水线执行,该节点可以计算所有的父分区。例如,可以逐个元素地在一个 RDD 上应用映射操作后再应用过滤操作。相比之下,宽依赖需要来自所有父分区的数据可用,并使用类似 MapReduce 的操作在节点之间进行数据混洗。
  • 其次,在窄依赖的情况下,节点故障后的恢复更加高效,因为只需要重新计算丢失的父分区,并且它们可以在不同的节点上并行重新计算。相比之下,在具有宽依赖的血缘关系图中,单个故障节点可能导致 RDD 的所有祖先中的某些分区丢失,需要完全重新执行。

RDD 的这个通用接口使得在 Spark 中实现大多数转换只需要不到 20 行代码成为可能。事实上,即使是新的 Spark 用户也在不了解调度器细节的情况下实现了新的转换(例如,采样和各种类型的连接)。我们在下面概述一些 RDD 的实现。

HDFS 文件:我们示例中的输入 RDD 一直是 HDFS 中的文件。对于这些 RDD,partitions方法为文件的每个块返回一个分区(在每个Partition对象中存储块的偏移量),preferredLocations方法给出块所在的节点,iterator方法读取该块。

map:在任何 RDD 上调用map会返回一个MappedRDD对象。这个对象与它的父 RDD 具有相同的分区和首选位置,但是在它的iterator方法中,将传递给map的函数应用于父 RDD 的记录。

union:对两个 RDD 调用union会返回一个 RDD,其分区是其父 RDD 分区的并集。每个子分区通过对相应父分区的窄依赖进行计算。

sample:采样与映射类似,不同之处在于 RDD 为每个分区存储一个随机数生成器种子,以便确定性地对父记录进行采样。

join:连接两个 RDD 可能会导致两个窄依赖(如果它们都使用相同的分区器进行哈希分区或范围分区)、两个宽依赖或者两者混合(如果一个父 RDD 有分区器而另一个没有)。在任何情况下,输出 RDD 都有一个分区器(要么从父 RDD 继承一个,要么是一个默认的哈希分区器)。

5.实现

我们用大约 14000 行 Scala 代码实现了 Spark。该系统在 Mesos 集群管理器上运行,允许它与 Hadoop、MPI 和其他应用程序共享资源。每个 Spark 程序作为一个单独的 Mesos 应用程序运行,有自己的驱动程序(主节点)和工作节点,这些应用程序之间的资源共享由 Mesos 处理。

Spark 可以使用 Hadoop 现有的输入插件 API 从任何 Hadoop 输入源(例如 HDFS 或 HBase)读取数据,并且在未修改的 Scala 版本上运行。

现在我们概述一下系统中几个在技术上比较有趣的部分:我们的作业调度器(5.1 节)、允许交互式使用的 Spark 解释器(5.2 节)、内存管理(5.3 节)以及对检查点的支持(5.4 节)。

5.1作业调度

Spark 的调度器使用了我们在第 4 节中描述的 RDD 表示形式。

总体而言,我们的调度器与 Dryad 类似,但它还会考虑持久化 RDD 的哪些分区在内存中可用。每当用户在一个 RDD 上运行一个动作(例如,计数或保存)时,调度器会检查该 RDD 的血缘关系图,以构建一个要执行的阶段 DAG(有向无环图),如图 5 所示。每个阶段都尽可能多地包含具有窄依赖关系的流水线式转换。阶段的边界是宽依赖关系所需的混洗操作,或者是任何已经计算好的可以短路父 RDD 计算的分区。然后,调度器启动任务来计算每个阶段中缺失的分区,直到计算出目标 RDD。

我们的调度器基于数据局部性使用延迟调度将任务分配到机器上。如果一个任务需要处理一个在节点内存中可用的分区,我们就将它发送到那个节点。否则,如果一个任务处理的分区所在的 RDD 提供了首选位置(例如,一个 HDFS 文件),我们就将它发送到那些位置。

对于宽依赖(即混洗依赖),我们目前在持有父分区的节点上具体化中间记录,以简化故障恢复,这很像 MapReduce 具体化映射输出。

如果一个任务失败,只要它所在阶段的父分区仍然可用,我们就会在另一个节点上重新运行它。如果某些阶段变得不可用(例如,因为混洗的 "映射端" 的一个输出丢失了),我们就会重新提交任务以并行计算缺失的分区。我们目前还不能容忍调度器故障,不过复制 RDD 的血缘关系图会很简单。

最后,虽然 Spark 中的所有计算目前都是响应在驱动程序中调用的动作而运行,但我们也在尝试让集群上的任务(例如,映射任务)调用查找操作,该操作通过键对哈希分区的 RDD 的元素提供随机访问。在这种情况下,如果所需的分区缺失,任务将需要告诉调度器去计算这个分区。

5.2 解释器集成

Scala 包含一个类似于 Ruby 和 Python 的交互式 shell。鉴于使用内存中数据可获得的低延迟,我们希望让用户从解释器中交互式地运行 Spark 以查询大数据集。

Scala 解释器通常的工作方式是为用户输入的每一行代码编译一个类,将其加载到 JVM 中,并在其上调用一个函数。这个类包含一个单例对象,该对象包含那一行上的变量或函数,并在一个初始化方法中运行那一行的代码。例如,如果用户输入 "var x = 5",接着输入 "println (x)",解释器会定义一个名为 "Line1" 的类,并使第二行编译为 "println (Line1.getInstance ().x)"。

在 Spark 中,我们对解释器做了两处修改:

  • 类传输:为了让工作节点获取为每一行创建的类的字节码,我们让解释器通过 HTTP 提供这些类。
  • 修改后的代码生成:通常,为每一行代码创建的单例对象是通过其对应类上的静态方法访问的。这意味着当我们序列化一个引用前一行定义的变量的闭包时,例如上面例子中的 "Line1.x",Java 不会遍历对象图来传输包裹着 "x" 的 "Line1" 实例。因此,工作节点将不会接收到 "x"。我们修改了代码生成逻辑,以直接引用每一行对象的实例。

图 6 展示了在我们做出修改后,解释器如何将用户输入的一组行转换为 Java 对象。 我们发现 Spark 解释器在处理作为我们研究的一部分而获得的大型跟踪数据以及探索存储在 HDFS 中的数据集时非常有用。我们还计划使用它来交互式地运行更高级的查询语言,例如 SQL。

5.3 内存管理

Spark 为持久化的 RDD 提供了三种存储选项:以反序列化的 Java 对象形式存储在内存中、以序列化数据形式存储在内存中以及存储在磁盘上。第一种选项提供了最快的性能,因为 Java 虚拟机可以原生地访问每个 RDD 元素。第二种选项在空间有限时让用户选择一种比 Java 对象图更节省内存的表示形式,但性能会有所降低。第三种选项对于太大而无法保存在内存中但每次使用时重新计算成本又很高的 RDD 很有用。

为了管理有限的可用内存,我们在 RDD 级别使用 LRU(最近最少使用)淘汰策略。当计算一个新的 RDD 分区但没有足够的空间来存储它时,我们会从最近最少访问的 RDD 中淘汰一个分区,除非这个被淘汰的 RDD 与新分区所在的 RDD 是同一个。在这种情况下,我们将旧分区保留在内存中,以防止来自同一个 RDD 的分区不断地被换入换出。这很重要,因为大多数操作会在整个 RDD 上运行任务,所以很有可能已经在内存中的分区在未来会被需要。到目前为止,我们发现这个默认策略在我们所有的应用中都运行良好,但我们也通过为每个 RDD 设置 "持久化优先级" 给用户提供了更多的控制。 最后,集群上的每个 Spark 实例目前都有自己独立的内存空间。在未来的工作中,我们计划研究通过一个统一的内存管理器在 Spark 的不同实例之间共享 RDD。

5.4 对检查点的支持

尽管在出现故障后总是可以使用血缘关系来恢复 RDD,但对于具有长血缘关系链的 RDD 来说,这种恢复可能很耗时。因此,将一些 RDD 检查点保存到稳定存储中会很有帮助。

一般来说,检查点对于具有包含宽依赖的长血缘关系图的 RDD 很有用,例如我们在 PageRank 示例(3.2.2 节)中的排名数据集。在这些情况下,集群中的一个节点故障可能会导致每个父 RDD 中的一些数据片段丢失,需要完全重新计算。相比之下,对于对稳定存储中的数据具有窄依赖的 RDD,例如我们在逻辑回归示例(3.2.1 节)中的点以及 PageRank 中的链接列表,检查点可能永远都不值得。如果一个节点发生故障,这些 RDD 中丢失的分区可以在其他节点上并行重新计算,成本只是复制整个 RDD 的一小部分。

Spark 目前提供了一个用于检查点的 API(一个持久化的 REPLICATE 标志),但将决定哪些数据进行检查点的选择留给用户。然而,我们也在研究如何执行自动检查点。因为我们的调度器知道每个数据集的大小以及首次计算它所花费的时间,所以它应该能够选择一组最优的 RDD 进行检查点,以最小化系统恢复时间。

最后,请注意,RDD 的只读性质使得它们比一般的共享内存更容易进行检查点。由于不需要考虑一致性,RDD 可以在后台写出,而不需要程序暂停或分布式快照方案。

PS

spark运算的详细解释

flatMap

在 Spark 中,flatMap 是一个常用的转换操作。它与 map 类似,但是有一个关键的不同之处:map 对每个输入元素都产生一个输出元素,而 flatMap 可以对每个输入元素产生零个或多个输出元素。最终,它会将所有的输出元素"扁平化"成一个单一的集合。

flatMap 的工作原理:

  • 输入的每个元素通过 flatMap 函数处理时,返回的可以是一个列表(或者其他集合类型)。
  • 然后,Spark 会把这些列表中的元素合并成一个新的 RDD。

举个例子: 假设我们有一个包含字符串的 RDD,我们希望把每个字符串拆成单个的单词。

from pyspark import SparkContext

# 初始化SparkContext
sc = SparkContext("local", "flatMap example")

# 原始RDD,包含一些句子
rdd = sc.parallelize(["Hello world", "Apache Spark", "flatMap example"])

# 使用flatMap将每个句子分割成单个单词
words_rdd = rdd.flatMap(lambda sentence: sentence.split(" "))

# 收集结果
words = words_rdd.collect()

print(words)

输出:

['Hello', 'world', 'Apache', 'Spark', 'flatMap', 'example']
Loading...