Lecture 36: MapReduce, Spark, Amdahl’s Law, Data-level Parallelism
Amdahl’s Law
阿姆达尔定律
增强效果E带来的加速比:
阿姆达尔定律描述了计算加速的潜在限制,特别是当你对一个系统进行部分优化时。计算增强效果E带来的加速比的公式为:
\[ \text{Speedup w/E} = \frac{\text{执行时间(无增强效果E)}}{\text{执行时间(有增强效果E)}} \]
其中,增强效果E仅加速任务的一部分,表示为(1-s),而s则代表未被加速的任务部分。假设加速部分的加速倍数为P,那么程序整体的加速比可以计算如下:
例子:
假设任务中有20%(即s = 0.2)是无法加速的部分,而80%(即1-s = 0.8)可以被加速,并且加速倍数为16。那么整个程序的加速比计算为:
\[ \text{Speedup} = \frac{1}{s + \frac{(1-s)}{P}} = \frac{1}{0.2 + \frac{0.8}{16}} = 4 \]
阿姆达尔定律的后果:
阿姆达尔定律揭示了并行化的潜力受到程序中串行部分s的限制。即使并行部分的加速倍数P趋向于无限大,程序的总体加速比仍然无法超过1/s。例如,如果程序中5%(s=0.05)是无法并行化的,那么即使并行部分的加速倍数无穷大,最高加速比也仅为20。这个定律提醒我们,在优化并行部分的同时,无法忽视串行部分对整体性能的制约。
阿姆达尔定律不仅适用于并行计算,也适用于任何部分优化的系统。例如,在软件开发中,优化某些代码路径的性能,最终可能对整个程序的影响有限,特别是如果非优化路径在整体执行时间中占据显著比例。
Request-Level and Data-Level Parallelism
请求级并行性 (RLP)
请求级并行性是现代互联网服务的重要特性。以下是RLP的关键点:
- 大量独立请求:
- 例如,在电子商务平台或搜索引擎中,每秒都会收到大量的独立用户请求。由于这些请求彼此独立,不需要共享数据或同步,因此可以很容易地分配到不同的服务器或计算节点进行并行处理。
- 易于划分的计算:
- 请求级并行性使得计算任务在单个请求内部和不同请求之间都可以被划分成独立的部分,从而可以在多个处理器或服务器上同时执行。这种并行性不仅提高了系统的响应速度,还增强了系统的可扩展性,使得处理海量请求成为可能。
在大型分布式系统中,请求级并行性尤其重要。它不仅能够提高单一系统的性能,还能通过集群或云计算平台扩展系统的处理能力,从而处理全球范围内的大量用户请求。例如,云计算平台通过动态分配计算资源来处理不同时间段的流量高峰,确保系统的高可用性和稳定性。
Google 查询服务架构
Google 的查询服务架构展示了如何通过分布式计算来高效处理大量的查询请求。这种架构包括多个组件,各自负责不同的任务,共同提高查询处理的效率和响应速度。
-
Web服务器:当用户发出查询请求时,Web服务器首先接收这些请求,并将其分发到多个后端服务器。Web服务器的任务是协调这些请求,并将最终结果返回给用户。
-
索引服务器:索引服务器存储经过处理的网页索引。它们的主要任务是在接收到查询请求后,迅速查找与查询相关的索引数据,并将匹配的结果返回给Web服务器。索引服务器通过使用优化的数据结构和算法来确保查询的快速响应。
-
文档服务器:这些服务器负责存储完整的网页内容。在索引服务器确定相关内容后,文档服务器从存储的网页中获取完整的内容,并将这些信息返回给Web服务器,最终交付给用户。
-
辅助组件:Google查询服务架构中还包括拼写检查器和广告服务器。拼写检查器负责识别和纠正用户输入的错误查询,广告服务器则负责根据查询内容返回相关广告,这些辅助组件进一步增强了用户体验和Google的商业模式。
这种架构体现了分布式计算的核心思想,即将复杂的任务拆分成多个独立的部分并行处理。这种方法不仅提高了系统的处理能力,还增强了系统的容错性和扩展性,确保能够应对海量用户的并发请求。
数据级并行性 (DLP)
数据级并行性(DLP)指的是同时对大量数据进行操作的能力。DLP在处理大规模数据集时尤为重要,因为它可以显著提高处理速度和效率。DLP有两种主要形式:
-
内存中的数据并行性:当需要处理的数据量很大时,可以将数据划分为多个独立的部分,并行处理。例如,对两个大型数组进行加法运算时,可以同时对不同的数组片段进行操作。
-
磁盘上的数据并行性:当数据存储在多个磁盘上时,可以并行访问这些磁盘上的数据。例如,在大规模文本搜索中,可以同时在多个磁盘上搜索不同的文档,这样可以显著缩短搜索时间。
MapReduce中的DLP:
MapReduce 是一种常见的数据级并行性实现技术,能够在多个服务器和磁盘上并行处理数据。它通过将任务分为“Map”和“Reduce”两个阶段,使得能够高效地处理大规模的数据集。例如,在处理社交媒体数据或电子商务交易日志时,MapReduce可以显著加快数据处理的速度和效率。
MapReduce
什么是 MapReduce?
MapReduce 是一种用于处理和生成大规模数据集的编程模型,旨在实现高扩展性和容错能力。它最早由Google提出,每天处理超过25 PB的数据。Hadoop 是 MapReduce 的开源实现,被广泛应用于各大互联网公司。
MapReduce 的应用场景:
- 在Google:
- 构建搜索引擎的索引。
- 聚类新闻文章以提供个性化推荐。
- 统计机器翻译的词频。
- 生成多层街道地图的数据。
- 在雅虎:
- 支持搜索的网络地图建设。
- 进行邮件系统的垃圾邮件检测。
- 在Facebook:
- 用于数据挖掘以改进用户体验。
- 优化广告投放策略。
- 检测并过滤垃圾邮件。
MapReduce 的核心在于其简单的编程接口和强大的并行处理能力,尤其适合大数据处理的场景。通过将任务拆分为小块并行处理,它能够有效地利用分布式系统的计算资源,并且具备高度的容错能力。
MapReduce 设计目标
MapReduce 旨在通过其设计实现高效的分布式数据处理。以下是其主要设计目标:
-
可扩展性:MapReduce 被设计为能够扩展到数千台机器和磁盘上,以处理超大规模的数据集。这种扩展性确保了系统可以应对不断增长的数据处理需求。
-
成本效益:MapReduce 通过使用廉价但不可靠的商用机器和网络,降低了硬件成本。为了应对硬件故障,MapReduce 实现了自动容错机制,确保数据处理的可靠性。此外,MapReduce 的简单编程模型降低了开发和维护的难度,减少了对高级程序员的需求。
MapReduce 的设计目标充分体现了分布式计算系统的优势。通过利用廉价硬件并实现容错机制,MapReduce 可以在不牺牲性能的前提下降低运营成本,同时保证系统的高可用性和可扩展性。
典型的 Hadoop 集群
Hadoop 是 MapReduce 的开源实现,广泛应用于处理大规模数据。典型的 Hadoop 集群配置如下:
- 集群配置:
- 每个机架(Rack)通常包含40个节点(Node),而整个集群可能包含1000到4000个节点。
- 机架内的带宽为1 Gbps,而机架之间的带宽则为8 Gbps,以确保高效的数据传输。
- 节点规格:
- 以雅虎的Terasort为例,每个节点配备8个2 GHz的核心、8 GB内存和4个硬盘,总存储容量约为4 TB。
Hadoop 集群的设计使得它能够处理和存储大量数据,同时保持高效的处理能力和成本效益。在大规模数据处理场景中,如搜索引擎、社交网络和电子商务平台,Hadoop 集群的这种设计使得它成为大数据处理的理想选择。
MapReduce 在 CS10 和 CS61A 课程中的应用
在 CS10 和 CS61A{,S} 课程中,MapReduce 模型通过简单易懂的例子向学生展示分布式计算的基本思想。例如,将 square
函数应用于一个列表中的每个元素,然后将结果相加,这个过程正是 MapReduce 的一个典型应用:
- Map:首先,将
square
函数应用于列表中的每个元素。比如,输入列表1, 20, 3, 10
通过square
函数后变为1, 400, 9, 100
。 - Reduce:接着,将所有经过
square
处理的结果相加,得到最终结果510
。
这个简单的例子清晰地展示了 MapReduce 的核心思想,即通过分布式的 Map 操作和后续的 Reduce 操作,将计算任务分解并并行处理,从而简化复杂问题。
MapReduce 编程模型
MapReduce 是一种强大的并行计算模型,其核心思想是处理一组键/值对 (key/value pairs)。在 MapReduce 编程模型中,开发者需要定义两个关键函数:
- Map 函数:
map(in_key, in_value) → list(interm_key, interm_value)
- 这个函数负责处理输入的键/值对,将其分割成多个片段,并生成一组中间键/值对。每个片段可以独立地分配给不同的 worker 进行处理,从而实现并行化。
- Reduce 函数:
reduce(interm_key, list(interm_value)) → list(out_value)
- 在 Reduce 步骤中,所有具有相同中间键的值将被组合处理,通常生成一个最终的输出值。这个步骤进一步汇总和处理 Map 步骤生成的中间数据。
// Mapper function
map(String input_key,
String input_value):
// input_key : document name (or line of text)
// input_value: document contents
for each word w in input value:
EmitIntermediate(w, "1");
// Reducer function
reduce(String output_key,
Iterator intermediate_values):
// output_key : a word
// output_values: a list of counts
int result = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit(AsString(result));
通过这种结构化的方法,MapReduce 模型能够有效处理大规模数据集,具有高度的并行性和容错性,尤其适合在分布式环境下运行。
MapReduce 词频统计示例
一个常见的 MapReduce 应用是词频统计,这个过程可以分为两个主要步骤:Map 和 Reduce。
- Map 步骤:
- 在这个步骤中,
Mapper
节点对每一个输入文档(或文本行)执行map
函数。对于文档中的每个单词,Map 函数会生成一个键值对(单词, 1)
。例如,对于文本 “I do learn”,输出将是("I", 1), ("do", 1), ("learn", 1)
。
- 在这个步骤中,
- Reduce 步骤:
- 接下来,
Reducer
节点会处理来自不同Mapper
节点的所有中间值,并对相同单词的出现次数进行汇总。例如,单词 “I” 的中间值可能为[1, 1]
,最终的输出为("I", 2)
。
- 接下来,
- 分布式文件系统 (DFS):
- 数据存储在分布式文件系统中,这使得 MapReduce 可以在多个节点上并行处理数据,从而显著提高处理效率。
MapReduce 词频统计图解
这个图解清晰地展示了 MapReduce 在词频统计过程中的数据流动:
- 输入阶段:
- 多个文件(如 file1, file2 等)被输入到多个
Mapper
节点。每个Mapper
节点处理输入文件,将每个单词映射为(单词, 1)
的形式。例如,file1 中的文本 “ah ah er” 被映射为ah:1, ah:1, er:1
。
- 多个文件(如 file1, file2 等)被输入到多个
- 中间结果阶段:
- 来自不同
Mapper
节点的输出被分组,相同的键(单词)被聚合在一起。例如,不同文件中的单词ah
被聚合为ah:1,1,1,1
。
- 来自不同
- 分组后的 Reduce 阶段:
- 聚合后的键值对被传递给
Reducer
节点,Reducer
节点对每个分组进行汇总计算,得出最终的词频结果。例如,单词ah
的最终输出为4
,表示它在所有文件中共出现了 4 次。
- 聚合后的键值对被传递给
通过这种流程,MapReduce 可以高效地处理和分析大规模数据,特别适合分布式计算环境。
MapReduce 的强大之处在于其能够自动处理分布式计算中的许多复杂问题,如数据分割、并行执行、数据传输和错误处理。这使得开发者能够专注于编写简单的 Map 和 Reduce 函数,而将复杂的并行计算细节交给 MapReduce 框架处理。尤其在处理像词频统计这种大规模文本分析任务时,MapReduce 的分布式计算能力能够显著提高处理效率和可扩展性,适用于互联网搜索、社交媒体分析等场景。
MapReduce 处理时间线
在 MapReduce 任务执行过程中,主节点(Master)负责协调和管理任务的分配,而工作节点(Worker)则具体执行任务。下图展示了一个典型的 MapReduce 任务执行流程的时间线:
- 任务分配:
- 主节点分配任务:主节点根据任务的输入数据,将 Map 和 Reduce 任务分配给多个工作节点。任务的分配通常基于数据的分区情况,以确保各个节点能并行处理数据。
- 工作节点开始执行:一旦分配完成,工作节点开始执行各自的 Map 或 Reduce 任务。
- Map 任务执行:
- 数据处理:每个工作节点在执行 Map 任务时,读取输入数据并应用 Map 函数,将数据处理成键值对。这个过程中,处理是高度并行化的,能够显著提高数据处理效率。
- 中间结果输出:处理后的键值对暂时存储在工作节点的本地存储中,等待下一阶段的处理。
- 数据混排(Shuffle):
- 中间数据的重新分配:当一个 Map 任务完成后,主节点会启动数据混排过程,将 Map 任务输出的中间数据重新分配给相应的 Reduce 任务。这一过程通常涉及大量的数据传输,是 MapReduce 的关键步骤之一。
- 准备 Reduce 任务:通过混排,将同一个键的所有中间结果聚集在一起,为 Reduce 任务做准备。
- Reduce 任务执行:
- 数据聚合和处理:当所有数据混排完成后,Reduce 任务开始执行。工作节点应用 Reduce 函数,将聚集的中间数据进行汇总、排序或进一步计算,并生成最终的输出结果。
- 结果写入存储:Reduce 任务的输出结果通常被写入分布式存储系统,如 HDFS(Hadoop Distributed File System),以供后续使用。
- 容错处理:
- 故障检测与任务重分配:MapReduce 系统具有内置的容错机制。当某个工作节点“死亡”或出现故障时,主节点会检测到并将该节点未完成的任务重新分配给其他工作节点。这种机制确保了即使在大规模分布式环境中,任务也能可靠完成。
这一机制通过分布式处理、大规模并行化以及容错设计,确保了 MapReduce 系统的高效性和可靠性。
MapReduce WordCount Java 代码
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(WCMap.class);
conf.setCombinerClass(WCReduce.class);
conf.setReducerClass(WCReduce.class);
conf.setInputPath(new Path(args[0]));
conf.setOutputPath(new Path(args[1]));
JobClient.runJob(conf);
}
public class WCMap extends MapReduceBase implements Mapper {
private static final IntWritable ONE = new IntWritable(1);
public void map(WritableComparable key, Writable value,
OutputCollector output,
Reporter reporter) throws IOException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
output.collect(new Text(itr.next()), ONE);
}
}
}
public class WCReduce extends MapReduceBase implements Reducer {
public void reduce(WritableComparable key, Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += ((IntWritable) values.next()).get();
}
output.collect(key, new IntWritable(sum));
}
}
这段 Java 代码展示了如何在 MapReduce 框架中实现一个简单的词频统计任务。代码分为三个关键部分:
- 主程序 (
main
):- 设置作业配置 (
JobConf
) 并指定 Map 和 Reduce 类。 - 设置输入输出路径,并启动作业。
- 设置作业配置 (
- Map 类 (
MapClass
):- 实现
Mapper
接口,负责对输入的文本数据进行处理。 - 对每个输入的文本行,使用
StringTokenizer
将其分割为单词,并将每个单词与值1
一起发射。
- 实现
- Reduce 类 (
ReduceClass
):- 实现
Reducer
接口,负责汇总相同单词的出现次数。 - 对接收到的每个键(单词),将所有值(次数)相加,输出最终的词频统计结果。
- 实现
这段代码展示了 MapReduce 的基本工作流程,尤其适用于处理大规模文本数据,能够高效地进行分布式计算和数据分析。
Apache Spark
Apache Spark 是一个用于大规模数据处理的快速且通用的计算引擎。
速度优势:
- 内存计算:Spark 在内存中执行计算,速度比 Hadoop MapReduce 快 100 倍。如果在磁盘上操作,Spark 的速度也比 MapReduce 快 10 倍。这种高效性来源于 Spark 的有向无环图(DAG)执行引擎,支持循环数据流和高效的内存计算。
易用性:
- 多语言支持:开发者可以使用 Java、Scala 或 Python 编写应用程序,这使得 Spark 具有广泛的适用性。
- 丰富的操作符:Spark 提供了超过 80 种高级操作符,使得开发并行应用程序变得简单且高效。开发者还可以直接从 Scala 和 Python shell 进行交互式开发,进一步提高开发效率。
通过这些特性,Apache Spark 成为现代大数据处理中的重要工具,广泛应用于需要高效并行处理的各种场景中。
相比传统的 Hadoop MapReduce,Spark 不仅在速度上有显著提升,还在灵活性和易用性上为开发者提供了更多的工具和选项。Spark 的内存计算能力使得它在需要频繁迭代和实时数据处理的应用中表现尤为出色。此外,Spark 提供的高层次 API(如 DataFrame 和 Dataset)使得复杂数据处理任务变得更加直观和易于管理,从而大大降低了开发难度。
Word Count in Spark’s Python API
使用 Spark 的 Python API 进行词频统计
在 Spark 中,使用 flatMap
、map
和 reduceByKey
这三个操作,可以轻松实现词频统计任务:
flatMap
操作:flatMap
用于将输入的每一行文本拆分成单词。对于每一行文本,flatMap
会生成一个包含所有单词的列表。因此,如果输入文本有多行,flatMap
会输出一个由所有单词组成的平展列表。
map
操作:map
将每个单词映射为键值对(word, 1)
,其中word
是单词本身,1
表示该单词出现了一次。这个操作会为每个单词生成一个(key, value)
对。
reduceByKey
操作:reduceByKey
用于对具有相同键的值进行汇总。对于每个单词,reduceByKey
将所有出现次数相加,得出每个单词的总出现次数。
这种方法展示了 Spark 的简洁性和高效性,与传统的 Hadoop MapReduce 相比,Spark 通过内存计算和简化的 API 使得代码更简洁,同时也显著提高了执行速度。
进一步扩展:
通过 Spark 的 Python API,可以非常方便地扩展这个例子。例如,你可以通过增加更多的操作,如过滤特定的单词或排序结果,来实现更复杂的数据处理任务。此外,Spark 的惰性求值机制(Lazy Evaluation)确保了只有在真正需要计算结果时才会触发计算,这样可以优化执行计划,提高效率。
flatMap in Spark’s Python API
Spark 的 Python API 中的 flatMap
操作
flatMap
是 Spark 中一个非常强大的操作,它与 map
操作类似,但有一个关键的区别:map
为每个输入元素生成一个输出元素,而 flatMap
可以为每个输入元素生成零个、一个或多个输出元素。
示例1:
假设我们有一个包含范围 [0, 1, 2, 3, 4]
的 RDD,我们可以使用 flatMap
将这个范围内的每个数字扩展成一个包含该数字及其前后数字的列表。例如:
rdd = sc.parallelize(range(5))
result = rdd.flatMap(lambda x: range(x, x+3)).collect()
这将输出 [0, 1, 2, 1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6]
,因为 flatMap
将每个输入元素映射为 [x, x+1, x+2]
这样的三元组。
flatMap
非常适合用于处理需要展开数据的场景,例如文本解析或处理嵌套数据结构。在数据处理的管道中,flatMap
可以有效地将复杂的结构展平,使后续的操作更加简洁和高效。
示例2:
在这张图中,展示了如何使用 Spark 的 Python API 对文本文件进行单词计数:
- 文件内容:
- 输入文件
file.txt
中包含了多行文本,例如 “ah ah er”, “ah”, “if or” 等等。
- 输入文件
- 读取文件:
- 首先使用
sc.textFile("file.txt")
读取文本文件,并将其转换为 RDD,存储在变量W
中。
- 首先使用
- 使用
flatMap
解析文本:W.flatMap(lambda line: line.split()).collect()
:- 这一行代码使用
flatMap
操作将每一行文本按空格拆分为单词列表。 - 结果会生成一个包含所有单词的扁平化列表。
- 这一行代码使用
- 将单词映射为键值对:
W.flatMap(lambda line: line.split()).map(lambda word: (word,1)).collect()
:- 这里将每个单词映射为一个
(word, 1)
的键值对,表示每个单词出现一次。
- 这里将每个单词映射为一个
- 使用
reduceByKey
进行单词计数:W.flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda a,b: a+b).collect()
:- 最后,
reduceByKey
操作将具有相同键(单词)的值(出现次数)相加,得到每个单词在整个文本中的总计数。
- 最后,
Parallel? Let’s sanity-check…
验证 Spark 的并行计算能力
为了验证 Spark 的并行计算能力,可以通过一个简单的例子来进行测试:
- 定义一个耗时的计算任务:
- 假设我们有一个需要消耗大量计算资源的任务,例如计算某个数的阶乘或执行复杂的数学运算。
- 使用
sc.parallelize
将任务分布到多个节点:- 我们可以将任务划分为多个子任务,通过
sc.parallelize
方法将这些子任务分布到多个计算节点上并行执行。
- 我们可以将任务划分为多个子任务,通过
代码示例:
import time
def crunch(x):
time.sleep(1) # 模拟一个耗时操作
return x * x
rdd = sc.parallelize(range(10), numSlices=10)
result = rdd.map(crunch).collect()
在这个例子中,sc.parallelize
将任务分成 10 个部分,分配到 10 个节点上并行执行。结果显示相比于串行执行,计算时间显著减少。
这个例子展示了 Spark 的核心优势之一:并行计算能力。通过将任务分割并行执行,Spark 可以大幅提升计算效率,尤其适用于处理大规模数据集或计算密集型任务。开发者可以利用 Spark 的并行计算框架,在分布式环境中快速处理和分析数据。
结论
并行计算的重要性和 MapReduce 的作用
- 第四个大想法:并行计算:
- 并行计算在大规模数据处理中至关重要。通过将任务分解并行执行,可以显著提高处理速度,适应现代计算对高性能的需求。
- 阿姆达尔定律的限制:
- 即使并行计算可以显著提升性能,性能提升仍受到程序中串行部分的限制。阿姆达尔定律指出,串行部分越大,整体加速效果越有限。因此,在设计并行程序时,尽量减少串行部分是关键。
- MapReduce 作为抽象模型:
- MapReduce 提供了一个强大的抽象模型,适合大规模分布式系统的编程。它隐藏了底层的复杂性,如机器故障和数据传输,使开发者能够专注于核心业务逻辑。
- Spark 的进一步优化:
- Spark 在 MapReduce 的基础上进一步优化,通过内存计算和惰性求值机制,显著提升了数据处理的效率。Spark 的灵活性和高性能使其成为现代大数据处理的重要工具。
总结:
并行计算和分布式处理是现代数据处理的核心技术。理解并利用这些技术,可以帮助开发者在大规模数据分析中取得显著的效率提升。通过使用工具如 MapReduce 和 Spark,可以在处理大数据时保持高效、可靠,并应对复杂的计算挑战。