Skip to content

Latest commit

 

History

History
1039 lines (609 loc) · 114 KB

ch10.md

File metadata and controls

1039 lines (609 loc) · 114 KB

10. 批处理

带有太强个人色彩的系统无法成功。当第一版健壮的设计完成时,不同的人们以自己的方式来测试时,真正的考验才开始。

——高德纳


[TOC]

在本书的前两部分中,我们讨论了很多关于请求(requests)和查询(queries)以及相应的响应(response)或结果(results)。在许多现代数据系统中都假设采用这种数据处理方式:你要求某些东西,或者发送指令,一段时间后(希望)系统会给你一个答案。数据库,缓存,搜索索引,Web服务器以及其他许多系统都以这种方式工作。

在这类**在线(online)**系统中,无论是浏览器请求页面还是调用远程API的服务,我们通常都假设请求是由人类用户触发的,且用户正在等待响应。他们不必等太久,所以我们非常重视这些系统的响应时间(参阅“描述性能”)。

Web和越来越多的基于HTTP/REST的API使交互的请求/响应风格变得如此普遍,以至于很容易将其视为理所当然。但我们应该记住,这不是构建系统的唯一方式,其他方法也有其优点。我们来区分三种不同类型的系统:

服务(在线系统)

服务等待客户的请求或指令到达。当收到一个,服务试图尽快处理它,并发回一个响应。响应时间通常是服务性能的主要衡量指标,可用性通常非常重要(如果客户端无法访问服务,用户可能会收到错误消息)。

批处理系统(离线系统)

一个批处理系统需要大量的输入数据,运行一个工作来处理它,并产生一些输出数据。工作往往需要一段时间(从几分钟到几天),所以通常不会有用户等待工作完成。相反,批量作业通常会定期运行(例如,每天一次)。批处理作业的主要性能衡量标准通常是吞吐量(通过特定大小的输入数据集所需的时间)。我们将在本章中讨论批处理。

流处理系统(近实时系统)

流处理是在线和离线/批处理之间的一个地方(所以有时候被称为近实时或近线处理)。像批处理系统一样,流处理器消耗输入并产生输出(而不是响应请求)。但是,流式作业在事件发生后不久就会对事件进行操作,而批处理作业则使用固定的一组输入数据进行操作。这种差异使流处理系统比等效的批处理系统具有更低的延迟。由于流处理基于批处理,我们将在第11章讨论它。

正如我们将在本章中看到的那样,批量处理是构建可靠,可扩展和可维护应用程序的重要组成部分。例如,2004年发布的批处理算法Map-Reduce(可能过热地)被称为“使得Google具有如此大规模可扩展性的算法”【2】。随后在各种开源数据系统中实现,包括Hadoop,CouchDB和MongoDB。

与多年前为数据仓库开发的并行处理系统【3,4】相比,MapReduce是一个相当低级别的编程模型,但它对于在商用硬件上的处理规模上迈出了重要的一步。。虽然MapReduce的重要性正在下降【5】,但它仍然值得理解,因为它提供了一副批处理为什么,以及如何有用的清晰画面。

实际上,批处理是一种非常古老的计算形式。早在可编程数字计算机诞生之前,打孔卡制表机(例如1890年美国人口普查【6】中使用的霍尔里斯机)实现了半机械化的批处理形式,以计算来自大量输入的汇总统计量。 Map-Reduce与1940年代和1950年代广泛用于商业数据处理的机电IBM卡片分类机器有着惊人的相似之处【7】。像往常一样,历史总是在不断重复自己。

在本章中,我们将看看MapReduce和其他一些批处理算法和框架,并探讨它们在现代数据系统中的使用方式。但首先,我们将从如何使用标准Unix工具进行数据处理开始。即使你已经熟悉了它们,也值得重温一下Unix哲学,因为从Unix的想法和经验教训能转移到大规模,异构的分布式数据系统。

使用Unix工具的批处理

我们从一个简单的例子开始。假设你有一台Web服务器,每次提供请求时都会在日志文件中附加一行。例如,使用nginx默认访问日志格式,日志的一行可能如下所示:

216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1" 
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"

(实际上这只是一行,为了可读性拆分为多行)。这一行中有很多信息。为了解释它,你需要看一看日志格式的定义,如下所示::

 $remote_addr - $remote_user [$time_local] "$request"
 $status $body_bytes_sent "$http_referer" "$http_user_agent"

因此,日志的这一行表明,在2015-02-27 17:55:11 UTC,服务器从客户端IP地址216.58.210.78接收到文件/css/typography.css的请求。用户没有被认证,所以$remote_user被设置为连字符(- )。响应状态是200(即请求成功),响应的大小是3377字节。网页浏览器是Chrome 40,并且它加载了该文件,因为它是在URL http://martin.kleppmann.com/的页面中引用的。

分析简单日志

各种工具可以把这些日志文件,并产生漂亮的报告有关你的网站流量,但出于练习的目的,让我们使用基本的Unix工具自己写一个。 例如,假设你想在你的网站上找到五个最受欢迎的网页。 可以在Unix shell中这样做:1

cat /var/log/nginx/access.log | #1
	awk '{print $7}' | #2
	sort             | #3
	uniq -c          | #4
	sort -r -n       | #5
	head -n 5          #6
  1. 读取日志文件
  2. 将每一行按空格分割成不同的字段,每行只输出第七个这样的字段,恰好是请求的URL。在我们的示例行中,这个请求URL是/css/typography.css
  3. 按字母顺序排列请求的URL列表。如果某个URL被请求过n次,那么排序后,该文件将包含连续n次重复的URL。
  4. uniq命令通过检查两条相邻的行是否相同来过滤掉其输入中的重复行。 -c选项告诉它也输出一个计数器:对于每个不同的URL,它会报告输入中出现该URL的次数。
  5. 第二种排序按每行起始处的数字(-n)排序,这是请求URL的次数。然后以反向(-r)顺序返回结果,即首先以最大的数字返回结果。
  6. 最后,头只输出输入的前五行(-n 5),并丢弃其余的。该系列命令的输出如下所示:
    4189 /favicon.ico
    3631 /2013/05/24/improving-security-of-ssh-private-keys.html
    2124 /2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
    1369 /
     915 /css/typography.css

尽管如果你不熟悉Unix工具,上面的命令行可能看起来有点模糊,但是它非常强大。它将在几秒钟内处理千兆字节的日志文件,你可以轻松修改分析以适应你的需求。例如,如果要从报告中省略CSS文件,请将awk参数更改为'$7 !~ /\.css$/ {print $7}'等等。

本书中没有空间来详细探索Unix工具,但是非常值得学习。令人惊讶的是,使用awk,sed,grep,sort,uniq和xargs的组合,可以在几分钟内完成许多数据分析,并且它们的表现令人惊讶地很好【8】。

命令链与自定义程序

除了Unix命令链,你还可以写一个简单的程序来做同样的事情。例如在Ruby中,它可能看起来像这样:

counts = Hash.new(0)         # 1
File.open('/var/log/nginx/access.log') do |file| 
    file.each do |line|
        url = line.split【6】  # 2
        counts[url] += 1     # 3
    end
end

top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4
top5.each{|count, url| puts "#{count} #{url}" }                   # 5
  1. counts是一个哈希表,保存了我们浏览每个URL次数的计数器,计数器默认为零。
  2. 对日志的每一行,从第七个空格分隔的字段提取URL(这里的数组索引是6,因为Ruby的数组索引从零开始)。
  3. 增加日志当前行中URL的计数器。
  4. 按计数器值(降序)对散列表内容进行排序,并取前五位。
  5. 打印出前五个条目。

这个程序并不像Unix管道那样简洁,但是它的可读性很强,你喜欢哪一个属于口味的问题。但两者除了表面上的差异之外,执行流程也有很大差异,如果你在大文件上运行此分析,则会变得明显。

排序与内存中的聚合

Ruby脚本保存一个URL的内存哈希表,其中每个URL映射到它已经被看到的次数。 Unix流水线的例子没有这样一个哈希表,而是依赖于对一个URL列表进行排序,在这个URL列表中,同一个URL的多个发生只是简单的重复。

哪种方法更好?这取决于你有多少个不同的网址。对于大多数中小型网站,你可能可以适应所有不同的URL,并且可以为每个网址(例如1GB内存)提供一个计数器。在此示例中,作业的工作集(作业需要随机访问的内存量)仅取决于不同URL的数量:如果单个URL有一百万个日志条目,则散列中所需的空间表仍然只有一个URL加上计数器的大小。如果这个工作集足够小,那么内存散列表工作正常,甚至在笔记本电脑上也是如此。

另一方面,如果作业的工作集大于可用内存,则排序方法的优点是可以高效地使用磁盘。这与我们在第74页的“SSTables和LSM树”中讨论过的原理是一样的:数据块可以在内存中排序并作为段文件写入磁盘,然后多个排序的段可以合并为一个更大的排序文件。 归并排序具有在磁盘上运行良好的顺序访问模式。 (请记住,优化为顺序I/O是第3章中反复出现的主题。这里再次出现相同的模式。)

GNU Coreutils(Linux)中的排序实用程序通过溢出到磁盘自动处理大于内存的数据集,并自动并行排序跨多个CPU核心【9】。这意味着我们之前看到的简单的Unix命令链很容易扩展到大数据集,而不会耗尽内存。瓶颈可能是从磁盘读取输入文件的速度。

Unix哲学

我们可以非常容易地使用前一个例子中的一系列命令来分析日志文件,这并非巧合:事实上,这实际上是Unix的关键设计思想之一,且它今天仍然令人惊讶地关联。让我们更深入地研究一下,以便从Unix中借鉴一些想法【10】。

Unix管道的发明者道格·麦克罗伊(Doug McIlroy)在1964年首先描述了这种情况【11】:“当需要以另一种方式处理数据时,我们应该有一些连接程序的方法,比如[a] 。这也是I/O的方式。“管道类比困难了,连接程序和管道的想法成为了现在被称为Unix哲学的一部分 —— 一套在开发者中流行的设计原则。 Unix哲学在1978年描述如下【12,13】:

  1. 让每个程序都做好一件事。要做一件新的工作,写一个新程序,而不是通过添加“功能”让老程序复杂化。
  2. 期待每个程序的输出成为另一个程序的输入。不要将无关信息混入输出。避免使用严格的列数据或二进制输入格式。不要坚持交互式输入。
  3. 设计和构建软件,甚至是操作系统,要尽早尝试,最好在几周内完成。不要犹豫,扔掉笨拙的部分,重建它们。
  4. 优先使用工具,而不是不熟练的帮助来减轻编程任务,即使必须曲线救国编写工具,并期望在用完后扔掉大部分。

这种方法 —— 自动化,快速原型设计,迭代式迭代,实验友好,将大型项目分解成可管理的块 —— 听起来非常像今天的敏捷开发和DevOps运动。奇怪的是,四十年来变化不大。

排序工具是一个很好的例子。它可以说是一个比大多数编程语言在其标准库(不会溢出到磁盘并且不使用多线程,即使是有益的)中更好的排序实现。然而,单独使用sort 几乎没什么用。它只能与其他Unix工具(如uniq)结合使用。

像bash这样的Unix shell可以让我们轻松地将这些小程序组合成令人惊讶的强大数据处理作业。尽管这些程序中有很多是由不同人群编写的,但它们可以灵活地结合在一起。 Unix如何实现这种可组合性?

统一的接口

如果你希望一个程序的输出成为另一个程序的输入,那意味着这些程序必须使用相同的数据格式 —— 换句话说,一个兼容的接口。如果你希望能够将任何程序的输出连接到任何程序的输入,那意味着所有程序必须使用相同的I/O接口。

在Unix中,这种接口是一个文件(file)(更准确地说,是一个文件描述符(file descriptor))。一个文件只是一串有序的字节序列。因为这是一个非常简单的接口,所以可以使用相同的接口来表示许多不同的东西:文件系统上的真实文件,到另一个进程(Unix套接字,stdin,stdout)的通信通道,设备驱动程序(比如/dev/audio/dev/lp0),表示TCP连接的套接字等等。很容易将这些视为理所当然的事情,但实际上这是非常出色的设计:这些非常不同的事物可以共享一个统一的接口,所以它们可以很容易地连接在一起2

按照惯例,许多(但不是全部)Unix程序将这个字节序列视为ASCII文本。我们的日志分析示例使用了这个事实:awk,sort,uniq和head都将它们的输入文件视为由\n(换行符,ASCII 0x0A)字符分隔的记录列表。 \n的选择是任意的 —— 可以说,ASCII记录分隔符0x1E本来就是一个更好的选择,因为它是为了这个目的而设计的【14】,但是无论如何,所有这些程序都使用相同的记录分隔符允许它们互操作。

每个记录(即一行输入)的解析更加模糊。 Unix工具通常通过空白或制表符将行分割成字段,但也使用CSV(逗号分隔),管道分隔和其他编码。即使像xargs这样一个相当简单的工具也有六个命令行选项,用于指定如何解析输入。

ASCII文本的统一接口主要工作,但它不是很漂亮:我们的日志分析示例使用{print $7}来提取网址,这样可读性不是很好。在理想的世界中,这可能是{print $request_url}或类似的东西。我们稍后会回到这个想法。

尽管几十年后还不够完美,但统一的Unix接口仍然是非常出色的设计。与Unix工具一样,软件的交互操作和编写并不是很多,你不能通过自定义分析工具轻松地将电子邮件帐户的内容和在线购物历史记录传送到电子表格中,并将结果发布到社交网络或维基。今天,像Unix工具一样流畅地运行程序是一个例外,而不是规范。

即使是具有**相同数据模型(same data model)**的数据库,将数据从一种导到另一种也并不容易。缺乏整合导致了数据的巴尔干化3

逻辑和布线相分离

Unix工具的另一个特点是使用标准输入(stdin)和标准输出(stdout)。如果你运行一个程序,而不指定任何其他的东西,标准输入来自键盘,标准输出指向屏幕。但是,你也可以从文件输入和/或将输出重定向到文件。管道允许你将一个进程的标准输出附加到另一个进程的标准输入(具有小内存缓冲区,而不需要将整个中间数据流写入磁盘)。

程序仍然可以直接读取和写入文件,但如果程序不担心特定的文件路径,只使用标准输入和标准输出,则Unix方法效果最好。这允许shell用户以任何他们想要的方式连接输入和输出;该程序不知道或不关心输入来自哪里以及输出到哪里。 (人们可以说这是一种松耦合(loose coupling)晚期绑定(late binding)【15】或控制反转(inversion of control)【16】)。将输入/输出布线与程序逻辑分开,可以将小工具组合成更大的系统。

你甚至可以编写自己的程序,并将它们与操作系统提供的工具组合在一起。你的程序只需要从标准输入读取输入,并将输出写入标准输出,并且可以参与数据处理流水线。在日志分析示例中,你可以编写一个工具将用户代理字符串转换为更灵敏的浏览器标识符,或者将IP地址转换为国家代码的工具,并将其插入管道。排序程序并不关心它是否与操作系统的另一部分或者你写的程序通信。

但是,使用stdin和stdout可以做什么是有限的。需要多个输入或输出的程序是可能的但棘手的。如果程序直接打开文件进行读取和写入,或者将另一个程序作为子进程启动,或者打开网络连接,则无法将程序的输出传输到网络连接中【17,18】4 。 I/O由程序本身连接。它仍然可以配置(例如通过命令行选项),但是减少了在Shell中连接输入和输出的灵活性。

透明度和实验

使Unix工具如此成功的部分原因是它们使得查看正在发生的事情变得非常容易:

  • Unix命令的输入文件通常被视为不可变的。这意味着你可以随意运行命令,尝试各种命令行选项,而不会损坏输入文件。

  • 你可以在任何时候结束管道,将管道输出到less,然后查看它是否具有预期的形式。这种检查能力对调试非常有用。

  • 你可以将一个流水线阶段的输出写入文件,并将该文件用作下一阶段的输入。这使你可以重新启动后面的阶段,而无需重新运行整个管道。

因此,与关系数据库的查询优化器相比,即使Unix工具非常简单,工具简单,但仍然非常有用,特别是对于实验而言。

然而,Unix工具的最大局限在于它们只能在一台机器上运行 —— 而Hadoop这样的工具即为此而生。

MapReduce和分布式文件系统

MapReduce有点像Unix工具,但分布在数千台机器上。像Unix工具一样,这是一个相当直接的,蛮力的,但却是令人惊讶的有效工具。一个MapReduce作业可以和一个Unix进程相媲美:它需要一个或多个输入,并产生一个或多个输出。

和大多数Unix工具一样,运行MapReduce作业通常不会修改输入,除了生成输出外没有任何副作用。输出文件以连续的方式写入一次(一旦写入文件,不会修改任何现有的文件部分)。

虽然Unix工具使用stdin和stdout作为输入和输出,但MapReduce作业在分布式文件系统上读写文件。在Hadoop的Map-Reduce实现中,该文件系统被称为HDFS(Hadoop分布式文件系统),一个开源的重新实现Google文件系统(GFS)【19】。

除HDFS外,还有各种其他分布式文件系统,如GlusterFS和Quantcast File System(QFS)【20】。诸如Amazon S3,Azure Blob存储和OpenStack Swift 【21】等对象存储服务在很多方面都是相似的5。在本章中,我们将主要使用HDFS作为示例,但是这些原则适用于任何分布式文件系统。

与网络连接存储(NAS)和存储区域网络(SAN)架构的共享磁盘方法相比,HDFS基于无共享原则(参见第二部分的介绍)。共享磁盘存储由集中式存储设备实现,通常使用定制硬件和专用网络基础设施(如光纤通道)。另一方面,无共享方法不需要特殊的硬件,只需要通过传统数据中心网络连接的计算机。

HDFS包含在每台机器上运行的守护进程,暴露一个允许其他节点访问存储在该机器上的文件的网络服务(假设数据中心中的每台通用计算机都附带有一些磁盘)。名为NameNode的中央服务器会跟踪哪个文件块存储在哪台机器上。因此,HDFS在概念上创建一个可以使用运行守护进程的所有机器的磁盘上的空间的大文件系统。

为了容忍机器和磁盘故障,文件块被复制到多台机器上。复制可能意味着多个机器上的相同数据的多个副本,如第5章中所述,或者像Reed-Solomon代码这样的擦除编码方案,它允许以比完全复制更低的存储开销恢复丢失的数据【20,22】。这些技术与RAID相似,可以在连接到同一台机器的多个磁盘上提供冗余;区别在于在分布式文件系统中,文件访问和复制是在传统的数据中心网络上完成的,没有特殊的硬件。

HDFS已经很好地扩展了:在撰写本文时,最大的HDFS部署运行在成千上万台机器上,总存储容量达数百PB【23】。如此大的规模已经变得可行,因为使用商品硬件和开源软件的HDFS上的数据存储和访问成本远低于专用存储设备上的同等容量【24】。

MapReduce作业执行

MapReduce是一个编程框架,你可以使用它编写代码来处理HDFS等分布式文件系统中的大型数据集。理解它的最简单方法是参考“简单日志分析”中的Web服务器日志分析示例。MapReduce中的数据处理模式与此示例非常相似:

  1. 读取一组输入文件,并将其分解成记录。在Web服务器日志示例中,每条记录都是日志中的一行(即\n是记录分隔符)。
  2. 调用Mapper函数从每个输入记录中提取一个键和值。在前面的例子中,mapper函数是awk'{print $7}':它提取URL($7)作为关键字,并将值保留为空。
  3. 按键排序所有的键值对。在日志示例中,这由第一个排序命令完成。
  4. 调用reducer函数遍历排序后的键值对。如果同一个键出现多次,排序使它们在列表中相邻,所以很容易组合这些值而不必在内存中保留很多状态。在前面的例子中,reducer是由uniq -c命令实现的,该命令使用相同的键来统计相邻记录的数量。

这四个步骤可以由一个MapReduce作业执行。步骤2(Map)和4(Reduce)是你编写自定义数据处理代码的地方。步骤1(将文件分解成记录)由输入格式解析器处理。步骤3中的排序步骤隐含在MapReduce中 —— 你不必编写它,因为Mapper的输出始终在送往reducer之前进行排序。

要创建MapReduce作业,你需要实现两个回调函数,mapper和reducer,其行为如下(参阅“MapReduce查询”):

Mapper

每个输入记录都会调用一次Mapper,其工作是从输入记录中提取键和值。对于每个输入,它可以生成任意数量的键值对(包括None)。它不会保留从一个输入记录到下一个记录的任何状态,因此每个记录都是独立处理的。

Reducer MapReduce框架采用由Mapper生成的键值对,收集属于同一个键的所有值,并使用迭代器调用reducer以调用该值集合。 Reducer可以产生输出记录(例如相同URL的出现次数)。

在Web服务器日志示例中,我们在第5步中有第二个排序命令,它按请求数对URL进行排序。在MapReduce中,如果你需要第二个排序阶段,则可以通过编写第二个MapReduce作业并将第一个作业的输出用作第二个作业的输入来实现它。这样看来,Mapper的作用是将数据放入一个适合排序的表单中,并且Reducer的作用是处理已排序的数据。

分布式执行MapReduce

Unix命令管道的主要区别在于,MapReduce可以在多台机器上并行执行计算,而无需编写代码来明确处理并行性。Mapper和Reducer一次只能处理一条记录;他们不需要知道他们的输入来自哪里或者输出什么地方,所以框架可以处理机器之间移动数据的复杂性。

在分布式计算中可以使用标准的Unix工具作为Mapper和Reducer【25】,但更常见的是,它们被实现为传统编程语言的函数。在Hadoop MapReduce中,Mapper和Reducer都是实现特定接口的Java类。在MongoDB和CouchDB中,Mapper和Reducer都是JavaScript函数(参阅“MapReduce查询”)。

图10-1显示了Hadoop MapReduce作业中的数据流。其并行化基于分区(参见第6章):作业的输入通常是HDFS中的一个目录,输入目录中的每个文件或文件块都被认为是一个单独的分区,可以单独处理map任务(图10-1中的m1,m2和m3标记)。

每个输入文件的大小通常是数百兆字节。 MapReduce调度器(图中未显示)试图在其中一台存储输入文件副本的机器上运行每个Mapper,只要该机器有足够的备用RAM和CPU资源来运行映射任务【26】。这个原则被称为将数据放在数据附近【27】:它节省了通过网络复制输入文件,减少网络负载和增加局部性。

图10-1 具有三个Mapper和三个Reducer的MapReduce任务

在大多数情况下,应该在映射任务中运行的应用程序代码在分配运行它的任务的计算机上还不存在,所以MapReduce框架首先复制代码(例如Java程序中的JAR文件)到适当的机器。然后启动Map任务并开始读取输入文件,一次将一条记录传递给Mapper回调。Mapper的输出由键值对组成。

计算的减少方面也被分割。虽然Map任务的数量由输入文件块的数量决定,但Reducer的任务的数量是由作业作者配置的(它可以不同于Map任务的数量)。为了确保具有相同键的所有键值对在相同的Reducer处结束,框架使用键的散列值来确定哪个减少的任务应该接收到特定的键值对(参见“按键散列分区”))。

键值对必须进行排序,但数据集可能太大,无法在单台机器上使用常规排序算法进行排序。相反,分类是分阶段进行的。首先,每个映射任务都基于键的散列,通过Reducer分割其输出。这些分区中的每一个都被写入映射程序本地磁盘上的已排序文件,使用的技术与我们在“SSTables与LSM树”中讨论的类似。

只要Mapper完成读取输入文件并写入其排序后的输出文件,MapReduce调度器就会通知减速器他们可以从该Mapper开始获取输出文件。减法器连接到每个Mapper,并为其分区下载排序后的键值对的文件。通过简化,分类和将数据分区从Mapper复制到Reducer的过程被称为混洗(shuffle)【26】(一个令人困惑的术语 —— 不像洗牌一样,在MapReduce中没有随机性)。

reduce任务从Mapper获取文件并将它们合并在一起,并保存排序顺序。因此,如果不同的Mapper使用相同的键生成记录,则它们将在合并的Reducer输入中相邻。

使用一个键和一个迭代器调用reducer,迭代器使用相同的键(在某些情况下可能不是全部适合内存)逐步扫描所有记录。 Reducer可以使用任意逻辑来处理这些记录,并且可以生成任意数量的输出记录。这些输出记录写入到分布式文件系统上的文件(通常是运行reducer的机器的本地磁盘上的一个副本,其他机器上的副本)。

MapReduce工作流

单个MapReduce作业可以解决的问题范围有限。参阅日志分析示例,一个MapReduce作业可以确定每个URL的页面浏览次数,但不是最常用的URL,因为这需要第二轮排序。

因此,将MapReduce作业链接到工作流中是非常常见的,例如,一个作业的输出成为下一个作业的输入。 Hadoop Map-Reduce框架对工作流程没有特别的支持,所以这个链接是通过目录名隐含完成的:第一个作业必须被配置为将其输出写入HDFS中的指定目录,第二个作业必须是配置为读取与其输入相同的目录名称。从MapReduce框架的角度来看,他们是两个独立的工作。

因此,被链接的MapReduce作业不如Unix命令的流水线(它直接将一个进程的输出作为输入传递给另一个进程,只使用一个小的内存缓冲区),更像是一系列命令,其中每个命令的输出写入临时文件,下一个命令从临时文件中读取。这种设计有利有弊,我们将在“物化中间状态”中讨论。

当作业成功完成时,批处理作业的输出仅被视为有效(MapReduce丢弃失败作业的部分输出)。因此,工作流程中的一项工作只有在先前的工作 —— 即生产其投入方向的工作 —— 成功完成时才能开始。处理这些作业之间的依赖关系执行,为Hadoop开发了各种工作流调度器,包括Oozie,Azkaban,Luigi,Airflow和Pinball 【28】。

这些调度程序还具有管理功能,在维护大量批处理作业时非常有用。在构建推荐系统【29】时,由50到100个MapReduce作业组成的工作流是常见的,而在大型组织中,许多不同的团队可能运行不同的作业来读取彼此的输出。工具支持对于管理这样复杂的数据流非常重要。

Hadoop的各种高级工具(如Pig 【30】,Hive 【31】,Cascading 【32】,Crunch 【33】和FlumeJava 【34】)也设置了多个MapReduce阶段的工作流程 。

Reduce端连接与分组

我们在第2章中讨论了数据模型和查询语言的联接,但是我们还没有深入探讨连接是如何实现的。现在是我们再次拿起那个线程的时候了。

在许多数据集中,通常一条记录与另一条记录有关联:关系模型中的外键,文档模型中的文档引用或图模型中的边。只要有一些代码需要访问该关联两边的记录(包含引用的记录和被引用的记录),连接就是必需的。正如第2章所讨论的,非规范化可以减少对连接的需求,但通常不会将其完全移除6

在数据库中,如果执行只涉及少量记录的查询,数据库通常会使用索引来快速定位感兴趣的记录(参阅第3章)。如果查询涉及连接,则可能需要多个索引查找。然而,MapReduce没有索引的概念 —— 至少不是通常意义上的。

当MapReduce作业被赋予一组文件作为输入时,它读取所有这些文件的全部内容;一个数据库会调用这个操作一个全表扫描。如果你只想读取少量的记录,则与索引查找相比,全表扫描的成本非常高昂。但是,在分析查询中(参阅“事务处理或分析?”),通常需要计算大量记录的聚合。在这种情况下,扫描整个输入可能是相当合理的事情,特别是如果可以在多台机器上并行处理。

当我们在批处理的背景下讨论连接时,我们的意思是解决数据集内某个关联的所有事件。 例如,我们假设一个工作是同时为所有用户处理数据,而不仅仅是为一个特定用户查找数据(这可以通过索引更有效地完成)。

示例:分析用户活动事件

图10-2给出了一个批处理作业中加入典型的例子。 在左侧是事件日志,描述登录用户在网站上做的事情(称为活动事件或点击流数据),右侧是用户数据库。 你可以将此示例看作是星型模式的一部分(参阅“星型和雪花型:分析的模式”):事件日志是事实表,用户数据库是其中一个纬度。

图10-2 用户行为日志与用户档案的连接

分析任务可能需要将用户活动与用户简档信息相关联:例如,如果简档包含用户的年龄或出生日期,则系统可以确定哪些年龄组最受欢迎。但是,活动事件仅包含用户标识,而不包含完整的用户配置文件信息。在每一个活动事件中嵌入这个简介信息很可能是非常浪费的。因此,活动事件需要加入用户配置文件数据库。

这个连接的最简单实现将逐个遍历活动事件,并为每个遇到的用户ID查询用户数据库(在远程服务器上)。这是可能的,但是它很可能会遭受非常差的性能:处理吞吐量将受到数据库服务器的往返时间的限制,本地缓存的有效性将很大程度上取决于数据的分布,并行运行大量查询可能会轻易压倒数据库【35】。

为了在批处理过程中实现良好的吞吐量,计算必须(尽可能)在一台机器上进行。通过网络为你要处理的每个记录进行随机访问请求太慢。而且,查询远程数据库意味着批处理作业变得不确定,因为远程数据库中的数据可能会改变。

因此,更好的方法是获取用户数据库的副本(例如,使用ETL进程从数据库备份中提取数据,参阅“数据仓库”),并将其放入与日志相同的分布式文件系统用户活动事件。然后,你可以将用户数据库存储在HDFS中的一组文件中,并将用户活动记录在另一组文件中,并且可以使用MapReduce将所有相关记录集中到同一地点并高效地处理它们。

排序合并连接

回想一下,Mapper的目的是从每个输入记录中提取一个键和值。在图10-2的情况下,这个键就是用户ID:一组Mapper会覆盖活动事件(提取用户ID作为键和活动事件作为值),而另一组Mapper将会检查用户数据库(提取用户ID作为键和用户的出生日期作为值)。这个过程如图10-3所示。

图10-3 Reduce端在user ID上进行归并排序连接,如果输入数据集分片成多个文件,则每个都会被多个Mapper并行处理

当MapReduce框架通过key对mapper输出进行分区,然后对键值对进行排序时,效果是所有活动事件和用户ID相同的用户记录在reducer输入中彼此相邻。 Map-Reduce作业甚至可以安排记录进行排序,使减速器始终如一 首先从用户数据库中查看记录,然后按照时间戳顺序查看活动事件 —— 这种技术被称为次级排序【26】。

然后reducer可以很容易地执行实际的加入逻辑:每个用户ID调用一次reducer函数,并且由于二次排序,第一个值应该是来自用户数据库的出生日期记录。 Reducer将出生日期存储在局部变量中,然后使用相同的用户ID遍历活动事件,输出已观看网址和观看者年龄对。随后的Map-Reduce作业可以计算每个URL的查看者年龄分布,并按年龄组进行聚类。

由于reducer一次处理一个特定用户ID的所有记录,因此只需要一次将一个用户记录保存在内存中,而不需要通过网络发出任何请求。这个算法被称为排序合并连接,因为Mapper输出是按键排序的,然后Reducer将来自连接两边的排序的记录列表合并在一起。

把相关数据放在一起

在排序合并连接中,Mapper和排序过程确保将执行特定用户标识的连接操作的所有必需数据放在一起:一次调用reducer。预先排列了所有需要的数据,reducer可以是一个相当简单,单线程的代码,可以通过高吞吐量和低内存开销通过记录。

查看这种体系结构的一种方法是Mapper将“消息”发送给reducer。当一个Mapper发出一个键值对时,这个键的作用就像值应该传递到的目标地址。即使键只是一个任意的字符串(不是像IP地址和端口号那样的实际的网络地址),它的行为就像一个地址:所有具有相同键的键对将被传送到相同的目标(一次Reduce的调用)。

使用MapReduce编程模型将计算的物理网络通信方面(从正确的计算机获取数据)从应用程序逻辑中分离出来(处理完数据后)。这种分离与数据库的典型使用形成了鲜明的对比,从数据库中获取数据的请求经常发生在应用程序代码的深处【36】。由于MapReduce能够处理所有的网络通信,因此它也避免了应用程序代码担心部分故障,例如另一个节点的崩溃:MapReduce在不影响应用程序逻辑的情况下,透明地重试失败的任务。

GROUP BY

除了连接之外,“将相关数据引入同一地点”模式的另一个常见用法是通过某个键(如SQL中的GROUP BY子句)对记录进行分组。所有用相同的键记录一个组,并且下一步往往是在每个组内进行某种聚合,例如:

  • 计算每个组中记录的数量(例如,在统计页面视图的示例中,你将在SQL中表示为COUNT(*)聚合)
  • 在SQL中的一个特定字段(SUM(fieldname))中添加值
  • 根据某些排名函数选择前k个记录

使用MapReduce实现这种分组操作的最简单方法是设置Mapper,以便它们生成的键值对使用所需的分组键。然后,分区和排序过程将所有记录与同一个Reducer中的相同键集合在一起。因此,在MapReduce上实现时,分组和连接看起来非常相似。

分组的另一个常见用途是整理特定用户会话的所有活动事件,以便找出用户采取的一系列操作(称为会话化【37】)。例如,可以使用这种分析来确定显示网站新版本的用户是否比那些显示旧版本(A/B测试)的用户更有可能进行购买,或计算某个营销活动是值得的。

如果你有多个Web服务器处理用户请求,则特定用户的活动事件很可能分散在各个不同的服务器的日志文件中。你可以通过使用会话cookie,用户ID或类似的标识符作为分组键来实现会话,并将特定用户的所有活动事件放在一起,同时将不同用户的事件分配到不同的分区。

处理倾斜

如果存在与单个键相关的大量数据,则“将具有相同键的所有记录带到相同位置”的模式被破坏。例如,在社交网络中,大多数用户可能会连接到几百人,但少数名人可能有数百万的追随者。这种不成比例的活动数据库记录被称为关键对象【38】或热键。

在单个Reducer中收集与名人相关的所有活动(例如回复他们发布的内容)可能导致严重的倾斜(也称为热点)—— 也就是说,一个减速器必须处理比其他更多的记录(参见“负载倾斜与消除热点“)。由于MapReduce作业只有在其所有Mapper和Reducer都完成时才完成,所有后续作业必须等待最慢的Reducer才能启动。

如果加入输入有热点键,则可以使用一些算法进行补偿。例如,Pig中的倾斜连接方法首先运行一个抽样作业来确定哪些键是热的【39】。执行实际加入时,Mapper发送任何随机选择一个与几个减速器之一相关的热键的记录(与传统的MapReduce相比,它选择一个基于键散列确定性的减速器)。对于加入的其他输入,与热键相关的记录需要被复制到所有处理该键的Reducer【40】。

这种技术将处理热键的工作分散到多个reducer上,这样可以使其更好地并行化,而不必将其他join连接复制到多个reducer。 Crunch中的分片连接方法是相似的,但需要显式指定热键而不是使用采样作业。这种技术也非常类似于我们在“负载倾斜与消除热点”中讨论的技术,使用随机化来缓解分区数据库中的热点。

Hive的偏斜连接优化采取了另一种方法。它需要在表格元数据中明确指定热键,并将与这些键相关的记录与其余文件分开存放。在该表上执行连接时,它将使用Map边连接(参阅下一节)获取热键。

使用热键对记录进行分组并汇总记录时,可以分两个阶段进行分组。第一个MapReduce阶段将记录发送到随机Reducer,以便每个Reducer对热键的记录子集执行分组,并为每个键输出更紧凑的聚合值。第二个Map-Reduce作业然后将来自所有第一阶段减速器的值合并为每个键的单个值。

Map端连接

上一节描述的连接算法在reducer中执行实际的连接逻辑,因此被称为reduce-side连接。Mapper扮演着输入数据的角色:从每个输入记录中提取键和值,将键值对分配给reducer分区,并按键排序。

减少方法的优点是不需要对输入数据做任何假设:无论其属性和结构如何,Mapper都可以准备数据以准备加入。然而,不利的一面是,所有这些排序,复制到Reducer以及合并减速器输入可能是非常昂贵的。取决于可用的内存缓冲区,当数据通过MapReduce 【37】阶段时,数据可能被写入磁盘几次。

另一方面,如果你可以对输入数据进行某些假设,则可以通过使用所谓的map端连接来加快连接速度。这种方法使用了一个Reduce的MapReduce作业,其中没有减速器,也没有排序。相反,每个Mapper只需从分布式文件系统读取一个输入文件块,然后将一个输出文件写入文件系统即可。

广播散列连接

执行Map端连接最简单的方法适用于大数据集与小数据集连接的情况。特别是,小数据集需要足够小,以便可以将其全部加载到每个Mapper的内存中。

例如,假设在图10-2的情况下,用户数据库足够小以适应内存。在这种情况下,当Mapper启动时,它可以首先将用户数据库从分布式文件系统读取到内存中的哈希表中。完成此操作后,Map程序可以扫描用户活动事件,并简单地查找散列表中每个事件的用户标识7

仍然可以有几个映射任务:一个用于连接的大输入的每个文件块(在图10-2的例子中,活动事件是大输入)。这些Mapper中的每一个都将小输入全部加载到内存中。

这种简单而有效的算法被称为广播散列连接:广播一词反映了这样一个事实,即大输入的分区的每个Mapper都读取整个小输入(所以小输入有效地“广播”到大的输入),单词hash反映了它使用一个哈希表。 Pig(名为“replicated join”),Hive(“MapJoin”),Cascading和Crunch支持此连接方法。它也用于数据仓库查询引擎,如Impala 【41】。

而不是将小连接输入加载到内存散列表中,另一种方法是将小连接输入存储在本地磁盘上的只读索引中【42】。该索引中经常使用的部分将保留在操作系统的页面缓存中,因此这种方法可以提供与内存中哈希表几乎一样快的随机访问查找,但实际上并不需要数据集适合内存。

分区散列连接

如果以相同方式对映射端连接的输入进行分区,则散列连接方法可以独立应用于每个分区。在图10-2的情况下,你可以根据用户标识的最后一位十进制数字来安排活动事件和用户数据库的每一个(因此每边有10个分区)。例如,Mapper3首先将所有具有以3结尾的ID的用户加载到散列表中,然后扫描ID为3的每个用户的所有活动事件。

如果分区正确完成,你可以确定所有你可能要加入的记录都位于相同编号的分区中,因此每个Mapper只能从每个输入数据集中读取一个分区就足够了。这具有的优点是每个Mapper都可以将较少量的数据加载到其哈希表中。

这种方法只适用于两个连接的输入具有相同数量的分区,记录根据相同的键和相同的散列函数分配给分区。如果输入是由之前执行过这个分组的MapReduce作业生成的,那么这可能是一个合理的假设。

分区散列连接在Hive 【37】中称为bucketed映射连接。Map端合并连接

如果输入数据集不仅以相同的方式进行分区,而且还基于相同的键进行排序,则应用另一种Map端联接的变体。在这种情况下,输入是否足够小以适应内存并不重要,因为Mapper可以执行通常由reducer执行的相同合并操作:按递增键递增读取两个输入文件,以及匹配相同的键记录。

如果Map端合并连接是可能的,则可能意味着先前的MapReduce作业首先将输入数据集引入到这个分区和排序的表单中。原则上,这个加入可以在之前工作的Reduce阶段进行。但是,在单独的仅用于Map的作业中执行合并连接仍然是适当的,例如,除了此特定连接之外,还需要分区和排序数据集以用于其他目的。

MapReduce与Map端连接的工作流程

当下游作业使用MapReduce连接的输出时,map-side或reduce-side连接的选择会影响输出的结构。 reduce-side连接的输出按连接键进行分区和排序,而map-side连接的输出按照与大输入相同的方式进行分区和排序(因为对每个文件块启动一个map任务无论是使用分区连接还是广播连接,连接的大输入)。

如前所述,Map边连接也对输入数据集的大小,排序和分区做出了更多的假设。在优化连接策略时,了解分布式文件系统中数据集的物理布局变得非常重要:仅仅知道编码格式和数据存储目录的名称是不够的;你还必须知道数据分区和排序的分区数量和键。

在Hadoop生态系统中,这种关于数据集分区的元数据经常在HCatalog和Hive Metastore中维护【37】。

工作流的输出

我们已经谈了很多关于实现MapReduce工作流程的各种算法,但是我们忽略了一个重要的问题:一旦完成,所有处理的结果是什么?我们为什么要把所有这些工作放在首位?

在数据库查询的情况下,我们根据分析目的来区分事务处理(OLTP)目的(参阅“事务处理或分析?”)。我们看到,OLTP查询通常使用索引按键查找少量记录,以便将其呈现给用户(例如,在网页上)。另一方面,分析查询通常会扫描大量记录,执行分组和汇总,输出通常具有报告的形式:显示某个指标随时间变化的图表,或前10个项目根据一些排名,或一些数量分解成子类别。这种报告的消费者通常是需要做出商业决策的分析师或经理。

批处理在哪里适合?这不是交易处理,也不是分析。与分析更接近,因为批处理过程通常扫描输入数据集的大部分。但是,MapReduce作业的工作流程与用于分析目的的SQL查询不同(参阅“比较Hadoop与分布式数据库”)。批处理过程的输出通常不是报告,而是一些其他类型的结构。

建立搜索索引

Google最初使用的MapReduce是为其搜索引擎建立索引,这个索引是作为5到10个MapReduce作业的工作流实现的【1】。虽然Google为了这个目的后来不再使用MapReduce 【43】,但是如果从建立搜索索引的角度来看,它可以帮助理解MapReduce。 (即使在今天,Hadoop MapReduce仍然是构建Lucene / Solr索引的好方法。)

我们在“全文搜索和模糊索引”中简要地看到了Lucene这样的全文搜索索引是如何工作的:它是一个文件(关键词字典),你可以在其中高效地查找特定关键字并找到包含该关键字的所有文档ID列表(发布列表)。这是一个非常简单的搜索索引视图 —— 实际上,它需要各种附加数据,以便根据相关性对搜索结果进行排名,纠正拼写错误,解析同义词等等,但这一原则是成立的。

如果需要对一组固定文档执行全文搜索,则批处理是构建索引的一种非常有效的方法:Mapper根据需要对文档集进行分区,每个reducer构建其分区的索引,并将索引文件写入分布式文件系统。构建这样的文档分区索引(参阅“分区和二级索引”)并行处理非常好。

由于按关键字查询搜索索引是只读操作,因此这些索引文件一旦创建就是不可变的。

如果索引的文档集合发生更改,则可以选择定期重新运行整个索引工作流程,并在完成后用新的索引文件批量替换以前的索引文件。如果只有少量的文档发生了变化,这种方法可能会带来很高的计算成本,但是它的优点是索引过程很容易推理:文档,索引。

或者,可以逐渐建立索引。如第3章所述,如果要添加,删除或更新索引中的文档,Lucene会写出新的段文件,并异步合并和压缩背景中的段文件。我们将在第11章中看到更多这样的增量处理。

键值存储作为批处理输出

搜索索引只是批处理工作流程可能输出的一个示例。批量处理的另一个常见用途是构建机器学习系统,如分类器(例如,垃圾邮件过滤器,异常检测,图像识别)和推荐系统(例如,你可能认识的人,你可能感兴趣的产品或相关搜索)。

这些批处理作业的输出通常是某种数据库:例如,可以通过用户ID查询以获取该用户的建议朋友的数据库,或者可以通过产品ID查询的数据库以获取相关产品【45】。

这些数据库需要从处理用户请求的Web应用程序中查询,这些请求通常与Hadoop基础架构分离。那么批处理过程的输出如何返回到Web应用程序可以查询的数据库?

最明显的选择可能是直接在Mapper或Reducer中使用客户端库作为你最喜欢的数据库,并从批处理作业直接写入数据库服务器,一次写入一条记录。这将起作用(假设你的防火墙规则允许从你的Hadoop环境直接访问你的生产数据库),但由于以下几个原因,这是一个坏主意:

  • 正如前面讨论的连接一样,为每个记录提出一个网络请求比批处理任务的正常吞吐量要慢几个数量级。即使客户端库支持批处理,性能也可能很差。
  • MapReduce作业经常并行运行许多任务。如果所有Mapper或Reducer都同时写入相同的输出数据库,并且批处理过程期望的速率,那么该数据库可能很容易被压倒,并且其查询性能可能受到影响。这可能会导致系统其他部分的操作问题【35】。
  • 通常情况下,MapReduce为作业输出提供了一个干净的“全有或全无”的保证:如果作业成功,则结果就是只执行一次任务的输出,即使某些任务失败并且必须重试。如果整个作业失败,则不会生成输出。然而,从作业内部写入外部系统会产生外部可见的副作用,这种副作用是不能被隐藏的。因此,你不得不担心部分完成的作业对其他系统可见的结果,以及Hadoop任务尝试和推测性执行的复杂性。

更好的解决方案是在批处理作业中创建一个全新的数据库,并将其作为文件写入分布式文件系统中作业的输出目录,就像上一节的搜索索引一样。这些数据文件一旦写入就是不可变的,可以批量加载到处理只读查询的服务器中。各种键值存储支持在MapReduce作业中构建数据库文件,包括Voldemort 【46】,Terrapin 【47】,ElephantDB 【48】和HBase批量加载【49】。

构建这些数据库文件是MapReduce的一个很好的使用方法:使用Mapper提取一个键,然后使用该键进行排序已经成为构建索引所需的大量工作。由于大多数这些键值存储是只读的(文件只能由批处理作业一次写入,而且是不可变的),所以数据结构非常简单。例如,它们不需要WAL(参阅“使B树可靠”)。

将数据加载到Voldemort时,服务器将继续向旧数据文件提供请求,同时将新数据文件从分布式文件系统复制到服务器的本地磁盘。一旦复制完成,服务器会自动切换到查询新文件。如果在这个过程中出现任何问题,它可以很容易地再次切换回旧的文件,因为它们仍然存在,并且是不变的【46】。

批量过程输出的哲学

本章前面讨论过的Unix哲学(“Unix哲学”)鼓励通过对数据流的非常明确的实验来进行实验:程序读取输入并写入输出。在这个过程中,输入保持不变,任何以前的输出都被新输出完全替换,并且没有其他副作用。这意味着你可以随心所欲地重新运行一个命令,调整或调试它,而不会扰乱系统的状态。

MapReduce作业的输出处理遵循相同的原理。通过将输入视为不可变且避免副作用(如写入外部数据库),批处理作业不仅实现了良好的性能,而且更容易维护:

  • 如果在代码中引入了一个错误,并且输出错误或损坏了,则可以简单地回滚到代码的先前版本,然后重新运行该作业,输出将再次正确。或者,甚至更简单,你可以将旧的输出保存在不同的目录中,然后切换回原来的目录。具有读写事务的数据库没有这个属性:如果你部署了错误的代码,将错误的数据写入数据库,那么回滚代码将无法修复数据库中的数据。 (能够从错误代码中恢复的思想被称为人类容错【50】。)
  • 由于易于回滚,功能开发可以比错误意味着不可挽回的损害的环境更快地进行。这种使不可逆性最小化的原则有利于敏捷软件的开发【51】。
  • 如果映射或减少任务失败,MapReduce框架将自动重新调度并在同一个输入上再次运行它。如果失败是由于代码中的一个错误造成的,那么它会一直崩溃,并最终导致作业在几次尝试之后失败。但是如果故障是由于暂时的问题引起的,那么故障是可以容忍的。这种自动重试只是安全的,因为输入是不可变的,而失败任务的输出被MapReduce框架丢弃。
  • 同一组文件可用作各种不同作业的输入,其中包括计算度量标准的计算作业,并评估作业的输出是否具有预期的特性(例如,将其与前一次运行的输出进行比较并测量差异) 。
  • 与Unix工具类似,MapReduce作业将逻辑与布线(配置输入和输出目录)分开,这就提供了关注点的分离,并且可以重用代码:一个团队可以专注于实现一件好事的工作其他团队可以决定何时何地运行这项工作。

在这些领域,对Unix运行良好的设计原则似乎也适用于Hadoop,但Unix和Hadoop在某些方面也有所不同。例如,因为大多数Unix工具都假定没有类型的文本文件,所以他们必须做大量的输入解析(本章开头的日志分析示例使用{print $7}来提取URL)。在Hadoop上,通过使用更多结构化的文件格式,可以消除一些低价值的语法转换:Avro(参阅“Avro”)和Parquet(参阅第95页上的“列存储”)经常使用,因为它们提供高效的基于模式的编码,并允许随着时间的推移模式的演变(见第4章)。

比较Hadoop和分布式数据库

正如我们所看到的,Hadoop有点像Unix的分布式版本,其中HDFS是文件系统,而MapReduce是Unix进程的古怪实现(这恰好总是在映射阶段和缩小阶段之间运行排序实用程序)。我们看到了如何在这些基元之上实现各种连接和分组操作。

当MapReduce论文【1】发表时,它在某种意义上说并不新鲜。我们在前几节中讨论的所有处理和并行连接算法已经在十多年前的所谓的**大规模并行处理(MPP, massively parallel processing)**数据库中实现了【3,40】。例如,Gamma数据库机器,Teradata和Tandem NonStop SQL是这方面的先驱【52】。

最大的区别是MPP数据库集中于在一组机器上并行执行分析SQL查询,而MapReduce和分布式文件系统【19】的组合则更像是一个可以运行任意程序的通用操作系统。

存储的多样性

数据库要求你根据特定的模型(例如关系或文档)来构造数据,而分布式文件系统中的文件只是字节序列,可以使用任何数据模型和编码来编写。它们可能是数据库记录的集合,但同样可以是文本,图像,视频,传感器读数,稀疏矩阵,特征向量,基因组序列或任何其他类型的数据。

说白了,Hadoop开放了将数据不加区分地转储到HDFS的可能性,之后才想出如何进一步处理它【53】。相比之下,在将数据导入数据库专有存储格式之前,MPP数据库通常需要对数据和查询模式进行仔细的前期建模。

从纯粹的角度来看,这种仔细的建模和导入似乎是可取的,因为这意味着数据库的用户有更好的质量数据来处理。然而,在实践中,似乎只是简单地使数据可用 —— 即使它是一个古怪的,难以使用的原始格式 —— 通常比尝试决定理想的数据模型更有价值[54 ]。

这个想法与数据仓库类似(参阅“数据仓库”):将大型组织的各个部分的数据集中在一起是很有价值的,因为它可以跨以前不同的数据集进行联接。 MPP数据库所要求的谨慎的模式设计减慢了集中式数据收集速度;以原始形式收集数据,以后担心模式设计,使数据收集速度加快(有时被称为“数据湖(data lake)”或“企业数据中心(enterprise data hub)”【55】)。

不加区别的数据倾销改变了解释数据的负担:不是强迫数据集的生产者将其转化为标准化的格式,而是数据的解释成为消费者的问题(读时模式方法【56】;参阅“文档模型中的架构灵活性”)。如果生产者和消费者是不同优先级的不同团队,这可能是一个优势。甚至可能不存在一个理想的数据模型,而是对适合不同目的的数据有不同的看法。以原始形式简单地转储数据可以进行多次这样的转换。这种方法被称为寿司原则:“原始数据更好”【57】。

因此,Hadoop经常被用于实现ETL过程(参阅“数据仓库”):事务处理系统中的数据以某种原始形式转储到分布式文件系统中,然后编写MapReduce作业来清理数据,将其转换为关系表单,并将其导入MPP数据仓库以进行分析。数据建模仍然在发生,但它是在一个单独的步骤中,从数据收集中分离出来的。这种解耦是可能的,因为分布式文件系统支持以任何格式编码的数据。

加工模型的多样性

MPP数据库是单一的,紧密集成的软件,负责磁盘上的存储布局,查询计划,调度和执行。由于这些组件都可以针对数据库的特定需求进行调整和优化,因此整个系统可以在其设计的查询类型上取得非常好的性能。而且,SQL查询语言允许表达式查询和重要语义,而无需编写代码,使业务分析师(例如Tableau)使用的图形工具可访问该语言。

另一方面,并非所有类型的处理都可以合理地表达为SQL查询。例如,如果要构建机器学习和推荐系统,或者使用相关性排名模型的全文搜索索引,或者执行图像分析,则很可能需要更一般的数据处理模型。这些类型的处理通常对特定的应用程序非常具体(例如机器学习的特征工程,机器翻译的自然语言模型,欺诈预测的风险评估函数),因此它们不可避免地需要编写代码,而不仅仅是查询。

MapReduce使工程师能够轻松地在大型数据集上运行自己的代码。如果你有HDFS和MapReduce,那么你可以在它上面建立一个SQL查询执行引擎,事实上这正是Hive项目所做的【31】。但是,你也可以编写许多其他形式的批处理,这些批处理不适合用SQL查询表示。

随后,人们发现MapReduce对于某些类型的处理来说太过于限制,执行得太差,因此其他各种处理模型都是在Hadoop之上开发的(我们将在“后MapReduce时代”中看到其中的一些)。有两种处理模型,SQL和MapReduce,还不够,需要更多不同的模型!而且由于Hadoop平台的开放性,实施一整套方法是可行的,而这在整体MPP数据库的范围内是不可能的【58】。

至关重要的是,这些不同的处理模型都可以在一个共享的机器上运行,所有这些机器都可以访问分布式文件系统上的相同文件。在Hadoop方法中,不需要将数据导入到几个不同的专用系统中进行不同类型的处理:系统足够灵活,可以支持同一个群集内不同的工作负载。不需要移动数据使得从数据中获得价值变得容易得多,并且使用新的处理模型更容易进行实验。

Hadoop生态系统包括随机访问的OLTP数据库,如HBase(参阅“SSTables和LSM树”)和MPA样式的分析数据库,如Impala 【41】。 HBase和Impala都不使用MapReduce,但都使用HDFS进行存储。它们是访问和处理数据的非常不同的方法,但是它们可以共存并被集成到同一个系统中。

为频繁的故障而设计

在比较MapReduce和MPP数据库时,设计方法的另外两个不同点是:处理故障和使用内存和磁盘。与在线系统相比,批处理对故障不太敏感,因为如果失败,用户不会立即影响用户,并且可以再次运行。

如果一个节点在执行查询时崩溃,大多数MPP数据库会中止整个查询,并让用户重新提交查询或自动重新运行它【3】。由于查询通常最多运行几秒钟或几分钟,所以这种处理错误的方法是可以接受的,因为重试的代价不是太大。 MPP数据库还倾向于在内存中保留尽可能多的数据(例如,使用散列连接)以避免从磁盘读取的成本。

另一方面,MapReduce可以容忍映射或减少任务的失败,而不会影响作业的整体,通过以单个任务的粒度重试工作。它也非常渴望将数据写入磁盘,一方面是为了容错,另一方面是假设数据集太大而不能适应内存。

MapReduce方法更适用于较大的作业:处理如此之多的数据并运行很长时间的作业,以至于在此过程中可能至少遇到一个任务故障。在这种情况下,由于单个任务失败而重新运行整个工作将是浪费的。即使以单个任务的粒度进行恢复引入了使得无故障处理更慢的开销,但如果任务失败率足够高,仍然可以进行合理的权衡。

但是这些假设有多现实呢?在大多数集群中,机器故障确实发生,但是它们不是很频繁 —— 可能很少,大多数工作都不会经验,因为机器故障。为了容错,真的值得引起重大的开销吗?

要了解MapReduce节省使用内存和任务级恢复的原因,查看最初设计MapReduce的环境是很有帮助的。 Google拥有混合使用的数据中心,在线生产服务和离线批处理作业在同一台机器上运行。每个任务都有一个使用容器执行的资源分配(CPU核心,RAM,磁盘空间等)。每个任务也具有优先级,如果优先级较高的任务需要更多的资源,则可以终止(抢占)同一台机器上较低优先级的任务以释放资源。优先级还决定了计算资源的定价:团队必须为他们使用的资源付费,而优先级更高的流程花费更多【59】。

这种架构允许非生产(低优先级)计算资源被过度使用,因为系统知道如果必要的话它可以回收资源。与分离生产和非生产任务的系统相比,过度使用资源可以更好地利用机器和提高效率。但是,由于MapReduce作业以低优先级运行,因此它们随时都有被抢占的风险,因为优先级较高的进程需要其资源。批量工作有效地“拿起桌子下面的碎片”,利用高优先级进程已经采取的任何计算资源。

在谷歌,运行一个小时的MapReduce任务有大约5%被终止的风险,为更高优先级的进程腾出空间。由于硬件问题,机器重新启动或其他原因,这个速率比故障率高出一个数量级【59】。按照这种抢先率,如果一个作业有100个任务,每个任务运行10分钟,那么至少有一个任务在完成之前将被终止的风险大于50%。

这就是为什么MapReduce能够容忍频繁意外的任务终止的原因:这不是因为硬件特别不可靠,这是因为任意终止进程的自由可以在计算集群中更好地利用资源。

在开源的集群调度器中,抢占的使用较少。 YARN的CapacityScheduler支持抢占以平衡不同队列的资源分配【58】,但在编写本文时,YARN,Mesos或Kubernetes不支持通用优先级抢占【60】。在任务不经常被终止的环境中,MapReduce的设计决策没有多少意义。在下一节中,我们将看看MapReduce的一些替代方案,这些替代方案做出了不同的设计决定。

后MapReduce时代

虽然MapReduce在二十世纪二十年代后期变得非常流行并受到大量的炒作,但它只是分布式系统的许多可能的编程模型之一。根据数据量,数据结构和处理类型,其他工具可能更适合表达计算。

尽管如此,我们在讨论MapReduce的这一章花了很多时间,因为它是一个有用的学习工具,因为它是分布式文件系统的一个相当清晰和简单的抽象。也就是说,能够理解它在做什么,而不是在易于使用的意义上是简单的。恰恰相反:使用原始的MapReduce API来实现复杂的处理工作实际上是非常困难和费力的 —— 例如,你需要从头开始实现任何连接算法【37】。

针对直接使用MapReduce的困难,在MapReduce上创建了各种更高级的编程模型(Pig,Hive,Cascading,Crunch)作为抽象。如果你了解MapReduce的工作原理,那么它们相当容易学习,而且它们的高级构造使许多常见的批处理任务更容易实现。

但是,MapReduce执行模型本身也存在一些问题,这些问题并没有通过增加另一个抽象层次来解决,而且在某些类型的处理中表现得很差。一方面,MapReduce非常强大:你可以使用它来处理频繁任务终止的不可靠多租户系统上几乎任意大量的数据,并且仍然可以完成工作(虽然速度很慢)。另一方面,对于某些类型的处理来说,其他工具有时也会更快。

在本章的其余部分中,我们将介绍一些批处理方法。在第十一章我们将转向流处理,这可以看作是加速批处理的另一种方法。

内部状态表示

如前所述,每个MapReduce作业都独立于其他任何作业。作业与世界其他地方的主要联系点是分布式文件系统上的输入和输出目录。如果希望一个作业的输出成为第二个作业的输入,则需要将第二个作业的输入目录配置为与第一个作业的输出目录相同,并且外部工作流调度程序必须仅在第一份工作已经完成。

如果第一个作业的输出是要在组织内广泛发布的数据集,则此设置是合理的。在这种情况下,你需要能够通过名称来引用它,并将其用作多个不同作业(包括由其他团队开发的作业)的输入。将数据发布到分布式文件系统中的众所周知的位置允许松耦合,这样作业就不需要知道是谁在输入输出或消耗其输出(参阅“分离逻辑和布线”在本页395)。

但是,在很多情况下,你知道一个工作的输出只能用作另一个工作的输入,这个工作由同一个团队维护。在这种情况下,分布式文件系统上的文件只是简单的中间状态:一种将数据从一个作业传递到下一个作业的方式。在用于构建由50或100个MapReduce作业【29】组成的推荐系统的复杂工作流程中,存在很多这样的中间状态。

将这个中间状态写入文件的过程称为物化。 (我们在“聚合:数据立方体和物化视图”中已经在物化视图的背景下遇到了这个术语。它意味着要急于计算某个操作的结果并写出来,而不是计算需要时按要求。)

相反,本章开头的日志分析示例使用Unix管道将一个命令的输出与另一个命令的输出连接起来。管道并没有完全实现中间状态,而是只使用一个小的内存缓冲区,将输出逐渐流向输入。

MapReduce的完全实现中间状态的方法与Unix管道相比存在不足:

  • MapReduce作业只有在前面的作业(生成其输入)中的所有任务都完成时才能启动,而由Unix管道连接的进程同时启动,输出一旦生成就会被使用。不同机器上的偏差或不同的负荷意味着一份工作往往会有一些比其他人更快完成的离散任务。必须等到所有前面的工作完成才能减慢整个工作流程的执行。
  • Mapper通常是多余的:它们只读取刚刚由reducer写入的相同文件,并为下一个分区和排序阶段做好准备。在许多情况下,Mapper代码可能是以前的reducer的一部分:如果reducer输出被分区和排序的方式与mapper输出相同,那么reducers可以直接链接在一起,而不与mapper阶段交错。
  • 将中间状态存储在分布式文件系统中意味着这些文件被复制到多个节点,这对于这样的临时数据通常是过度的。

数据流引擎

了解决MapReduce的这些问题,开发了几种用于分布式批量计算的新的执行引擎,其中最着名的是Spark 【61,62】,Tez 【63,64】和Flink 【65,66】。他们设计的方式有很多不同之处,但他们有一个共同点:他们把整个工作流作为一项工作来处理,而不是把它分解成独立的子作业。

由于它们通过几个处理阶段明确地建模数据流,所以这些系统被称为数据流引擎。像MapReduce一样,它们通过反复调用用户定义的函数来在单个线程上一次处理一条记录。他们通过对输入进行分区来并行工作,并将一个功能的输出复制到网络上,成为另一个功能的输入。

与MapReduce不同,这些功能不需要交替映射和Reduce的严格角色,而是可以以更灵活的方式进行组合。我们称之为这些函数操作符,数据流引擎提供了几个不同的选项来连接一个操作符的输出到另一个的输入:

  • 一个选项是通过键对记录进行重新分区和排序,就像在MapReduce的混洗阶段一样(参阅“分布式执行MapReduce”)。此功能可以像在MapReduce中一样启用排序合并连接和分组。
  • 另一种可能是采取几个输入,并以相同的方式进行分区,但跳过排序。这节省了分区散列连接的工作,其中记录的分区是重要的,但顺序是不相关的,因为构建散列表随机化了顺序。
  • 对于广播散列连接,可以将一个运算符的相同输出发送到连接运算符的所有分区。

这种处理引擎的风格基于像Dryad 【67】和Nephele 【68】这样的研究系统,与MapReduce模型相比,它提供了几个优点:

  • 排序等昂贵的工作只需要在实际需要的地方执行,而不是在每个Map和Reduce阶段之间默认发生。
  • 没有不必要的Map任务,因为Mapper所做的工作通常可以合并到前面的reduce操作器中(因为Mapper不会更改数据集的分区)。
  • 由于工作流程中的所有连接和数据依赖性都是明确声明的,因此调度程序会概述哪些数据是必需的,因此可以进行本地优化。例如,它可以尝试将占用某些数据的任务放在与生成它的任务相同的机器上,以便可以通过共享内存缓冲区交换数据,而不必通过网络复制数据。
  • 通常将操作员之间的中间状态保存在内存中或写入本地磁盘就足够了,这比写入HDFS需要更少的I/O(必须将其复制到多个计算机并写入到每个代理的磁盘上)。 MapReduce已经将这种优化用于Mapper的输出,但是数据流引擎将该思想推广到了所有的中间状态。
  • 操作员可以在输入准备就绪后立即开始执行;在下一个开始之前不需要等待整个前一阶段的完成。
  • 与MapReduce(为每个任务启动一个新的JVM)相比,现有的Java虚拟机(JVM)进程可以重用来运行新操作,从而减少启动开销。

你可以使用数据流引擎来执行与MapReduce工作流相同的计算,并且由于此处所述的优化,通常执行速度会明显更快。既然操作符是map和reduce的泛化,相同的处理代码可以在任一执行引擎上运行:Pig,Hive或Cascading中实现的工作流可以通过简单的配置更改从MapReduce切换到Tez或Spark,而无需修改代码【64】。

Tez是一个相当薄的库,它依赖于YARN shuffle服务来实现节点间数据的实际复制【58】,而Spark和Flink则是包含自己的网络通信层,调度器和面向用户的API的大型框架。我们将在短期内讨论这些高级API。

容错

完全实现中间状态到分布式文件系统的一个优点是它是持久的,这使得MapReduce中的容错相当容易:如果一个任务失败,它可以在另一台机器上重新启动,并从文件系统重新读取相同的输入。

Spark,Flink和Tez避免将中间状态写入HDFS,因此他们采取了不同的方法来容忍错误:如果一台机器发生故障,并且该机器上的中间状态丢失,则会从其他仍然可用的数据重新计算在可能的情况下是在先的中间阶段,或者是通常在HDFS上的原始输入数据)。

为了实现这个重新计算,框架必须跟踪一个给定的数据是如何计算的 —— 使用哪个输入分区,以及哪个操作符被应用到它。 Spark使用弹性分布式数据集(RDD)抽象来追踪数据的祖先【61】,而Flink检查点操作符状态,允许其恢复运行在执行过程中遇到错误的操作符【66】。

在重新计算数据时,重要的是要知道计算是否是确定性的:也就是说,给定相同的输入数据,操作员是否始终生成相同的输出?如果一些丢失的数据已经发送给下游运营商,这个问题就很重要。如果运营商重新启动,重新计算的数据与原有的丢失数据不一致,下游运营商很难解决新旧数据之间的矛盾。对于不确定性运营商来说,解决方案通常是杀死下游运营商,然后再运行新数据。

为了避免这种级联故障,最好让操作员具有确定性。但是请注意,非确定性行为很容易发生意外蔓延:例如,许多编程语言在迭代哈希表的元素时不能保证任何特定顺序,许多概率和统计算法明确依赖于使用随机数,以及任何用途系统时钟或外部数据源是不确定的。为了可靠地从故障中恢复,例如通过使用固定种子产生伪随机数,需要消除这种不确定性的原因。

通过重新计算数据从故障中恢复并不总是正确的答案:如果中间数据比源数据小得多,或者如果计算量非常大,那么将中间数据转化为文件可能比将其重新计算更便宜。

关于物化的讨论

回到Unix的类比,我们看到MapReduce就像是将每个命令的输出写入临时文件,而数据流引擎看起来更像是Unix管道。尤其是Flink是围绕流水线执行的思想而建立的:也就是说,将运算符的输出递增地传递给其他操作符,并且在开始处理之前不等待输入完成。

排序操作不可避免地需要消耗其整个输入,然后才能生成任何输出,因为最后一个输入记录可能是具有最低键的输入记录,因此需要作为第一个输出记录。任何需要分类的操作员都需要至少暂时地累积状态。但是工作流程的许多其他部分可以以流水线方式执行。

当作业完成时,它的输出需要持续到某个地方,以便用户可以找到并使用它—— 很可能它会再次写入分布式文件系统。因此,在使用数据流引擎时,HDFS上的物化数据集通常仍是作业的输入和最终输出。和MapReduce一样,输入是不可变的,输出被完全替换。对MapReduce的改进是,你可以节省自己将所有中间状态写入文件系统。

图与迭代处理

在“图数据模型”中,我们讨论了使用图形来建模数据,并使用图形查询语言来遍历图形中的边和顶点。第2章的讨论集中在OLTP风格的使用上:快速执行查询来查找少量符合特定条件的顶点。

在批处理环境中查看图表也很有趣,其目标是在整个图表上执行某种离线处理或分析。这种需求经常出现在机器学习应用程序(如推荐引擎)或排序系统中。例如,最着名的图形分析算法之一是PageRank 【69】,它试图根据其他网页链接的网页来估计网页的流行度。它被用作确定网络搜索引擎呈现结果的顺序的公式的一部分。

像Spark,Flink和Tez这样的数据流引擎(参见“中间状态的物化”)通常将操作符作为**有向无环图(DAG)**排列在作业中。这与图形处理不一样:在数据流引擎中,从一个操作符到另一个操作符的数据流被构造成一个图,而数据本身通常由关系式元组构成。在图形处理中,数据本身具有图形的形式。另一个不幸的命名混乱!

许多图算法是通过一次遍历一个边来表示的,将一个顶点与相邻的顶点连接起来以便传播一些信息,并且重复直到满足一些条件为止——例如,直到没有更多的边要跟随,或者直到一些度量收敛。我们在图2-6中看到一个例子,它通过重复地跟踪指示哪个位置在哪个其他位置(这种算法被称为传递闭包)的边缘,列出了包含在数据库中的北美所有位置。

可以在分布式文件系统(包含顶点和边的列表的文件)中存储图形,但是这种“重复直到完成”的想法不能用普通的MapReduce来表示,因为它只执行一次数据传递。这种算法因此经常以迭代方式实现:

  1. 外部调度程序运行批处理来计算算法的一个步骤。
  2. 当批处理过程完成时,调度器检查它是否完成(基于完成条件 - 例如,没有更多的边要跟随,或者与上次迭代相比的变化低于某个阈值)。
  3. 如果尚未完成,则调度程序返回到步骤1并运行另一轮批处理。

这种方法是有效的,但是用MapReduce实现它往往是非常低效的,因为MapReduce没有考虑算法的迭代性质:它总是读取整个输入数据集并产生一个全新的输出数据集,即使只有一小部分该图与上次迭代相比已经改变。 Pregel处理模型

作为批处理图形的优化,计算的批量同步并行(BSP)模型【70】已经流行起来。其中,它由Apache Giraph 【37】,Spark的GraphX API和Flink的Gelly API 【71】实现。它也被称为Pregel模型,正如Google的Pregel论文推广这种处理图的方法【72】。

回想一下在MapReduce中,Mapper在概念上“发送消息”给reducer的特定调用,因为框架将所有的mapper输出集中在一起。 Pregel背后有一个类似的想法:一个顶点可以“发送消息”到另一个顶点,通常这些消息沿着图的边被发送。

在每次迭代中,为每个顶点调用一个函数,将所有发送给它的消息传递给它 —— 就像调用reducer一样。与MapReduce的不同之处在于,在Pregel模型中,顶点从一次迭代到下一次迭代记忆它的状态,所以这个函数只需要处理新的传入消息。如果在图的某个部分没有发送消息,则不需要做任何工作。

这与演员模型有些相似(参阅“分布式的Actor框架”),除非顶点状态和顶点之间的消息具有容错性和耐久性,并且通信以固定的方式进行,否则将每个顶点视为主角轮次:在每一次迭代中,框架传递在前一次迭代中发送的所有消息。演员通常没有这样的时间保证。

容错

顶点只能通过消息传递进行通信(而不是直接相互查询)的事实有助于提高Pregel作业的性能,因为消息可以成批处理,而且等待通信的次数也减少了。唯一的等待是在迭代之间:由于Pregel模型保证所有在一次迭代中发送的消息都在下一次迭代中传递,所以先前的迭代必须完全完成,并且所有的消息必须在网络上复制,然后下一个开始。 即使底层网络可能丢失,重复或任意延迟消息(参阅“不可靠的网络”),Pregel实施可保证在接下来的迭代中消息在其目标顶点处理一次。像MapReduce一样,该框架透明地从故障中恢复,以简化Pregel顶层算法的编程模型。

这种容错是通过在迭代结束时定期检查所有顶点的状态来实现的,即将其全部状态写入持久存储。如果某个节点发生故障并且其内存中状态丢失,则最简单的解决方法是将整个图计算回滚到上一个检查点,然后重新启动计算。如果算法是确定性的并且记录了消息,那么也可以选择性地只恢复丢失的分区(就像我们之前讨论过的数据流引擎)【72】。

并行执行

顶点不需要知道它正在执行哪个物理机器;当它发送消息到其他顶点时,它只是将它们发送到一个顶点ID。分配图的框架,即确定哪个顶点运行在哪个机器上,以及如何通过网络路由消息,以便它们结束在正确的位置。

由于编程模型一次仅处理一个顶点(有时称为“像顶点一样思考”),所以框架可以以任意方式划分图形。理想情况下,如果它们需要进行大量的通信,那么它将被分割,以使顶点在同一台机器上共置。然而,寻找这样一个优化的分割在实践中是困难的,图形经常被任意分配的顶点ID分割,而不会尝试将相关的顶点分组在一起。

因此,图算法通常会有很多跨机器通信,而中间状态(节点之间发送的消息)往往比原始图大。通过网络发送消息的开销会显着减慢分布式图算法的速度。

出于这个原因,如果你的图可以放在一台计算机的内存中,那么单机(甚至可能是单线程)算法很可能会超越分布式批处理【73,74】。即使图形大于内存,也可以放在单个计算机的磁盘上,使用GraphChi等框架进行单机处理是一个可行的选择【75】。如果图形太大而不适合单个机器,像Pregel这样的分布式方法是不可避免的。有效的并行化图算法是一个正在进行的领域。

高级API和语言

自MapReduce第一次流行以来,分布式批处理的执行引擎已经成熟。到目前为止,基础设施已经足够强大,能够存储和处理超过10,000台机器群集上的数PB的数据。由于在这种规模下物理操作批处理过程的问题已经或多或少得到了解决,所以已经转向其他领域:改进编程模型,提高处理效率,扩大这些技术可以解决的问题集。

如前所述,Hive,Pig,Cascading和Crunch等高级语言和API由于手工编写MapReduce作业而变得非常流行。随着Tez的出现,这些高级语言还有额外的好处,可以移动到新的数据流执行引擎,而无需重写作业代码。 Spark和Flink也包括他们自己的高级数据流API,经常从FlumeJava中获得灵感【34】。

这些数据流API通常使用关系式构建块来表达一个计算:连接数据集以获取某个字段的值;按键分组元组;过滤一些条件;并通过计数,求和或其他函数来聚合元组。在内部,这些操作是使用本章前面讨论过的各种连接和分组算法来实现的。

除了需要较少代码的明显优势之外,这些高级接口还允许交互式使用,在这种交互式使用中,你可以将分析代码逐步编写到shell中并经常运行,以观察它正在做什么。这种发展风格在探索数据集和试验处理方法时非常有用。这也让人联想到Unix哲学,我们在第394页的“Unix哲学”中讨论过这个问题。

而且,这些高级接口不仅使人类使用系统的效率更高,而且提高了机器级别的工作执行效率。

向声明式查询语言的转变

与拼写执行连接的代码相比,指定连接为关系运算符的优点是,框架可以分析连接输入的属性,并自动决定哪个上述连接算法最适合手头的任务。 Hive,Spark和Flink都有基于代价的查询优化器,可以做到这一点,甚至可以改变连接顺序,使中间状态的数量最小化【66,77,78,79】。

连接算法的选择可以对批处理作业的性能产生很大的影响,不必理解和记住本章中讨论的各种连接算法。如果以声明的方式指定连接,则这是可能的:应用程序简单地说明哪些连接是必需的,查询优化器决定如何最好地执行连接。我们以前在第42页的“数据的查询语言”中遇到了这个想法。

但是,在其他方面,MapReduce及其数据流后继与SQL的完全声明性查询模型有很大不同。 MapReduce是围绕函数回调的思想构建的:对于每个记录或者一组记录,调用一个用户定义的函数(mapper或reducer),并且该函数可以自由地调用任意代码来决定输出什么。这种方法的优点是可以绘制在现有库的大型生态系统上进行分析,自然语言分析,图像分析以及运行数字或统计算法等。

轻松运行任意代码的自由是从MPP数据库(参见“比较Hadoop和分布式数据库”一节)中分离出来的MapReduce传统批处理系统。虽然数据库具有编写用户定义函数的功能,但是它们通常使用起来很麻烦,而且与大多数编程语言中广泛使用的程序包管理器和依赖管理系统(例如Maven for Java,npm for Java-Script,和Ruby的Ruby的Ruby)。

但是,数据流引擎已经发现,除了连接之外,在合并更多的声明性特征方面也是有优势的。例如,如果一个回调函数只包含一个简单的过滤条件,或者只是从一条记录中选择了一些字段,那么在调用每条记录的函数时会有相当大的CPU开销。如果以声明方式表示这样简单的过滤和映射操作,那么查询优化器可以利用面向列的存储布局(参阅“列存储”),并从磁盘只读取所需的列。 Hive,Spark DataFrames和Impala也使用向量化执行(参阅“内存带宽和向量处理”):在对CPU缓存很友好的内部循环中迭代数据,并避免函数调用。

Spark生成JVM字节码【79】,Impala使用LLVM为这些内部循环生成本机代码【41】。

通过将声明性方面与高级API结合起来,并使查询优化器可以在执行期间利用这些优化方法,批处理框架看起来更像MPP数据库(并且可以实现可比较的性能)。同时,通过具有运行任意代码和以任意格式读取数据的可扩展性,它们保持了灵活性的优势。

专业化的不同领域

尽管能够运行任意代码的可扩展性是有用的,但是在标准处理模式不断重复发生的情况下也有许多常见的情况,所以值得重用通用构建块的实现。传统上,MPP数据库满足了商业智能分析师和业务报告的需求,但这只是许多使用批处理的领域之一。

另一个越来越重要的领域是统计和数值算法,它们是机器学习应用(如分类和推荐系统)所需要的。可重复使用的实现正在出现:例如,Mahout在MapReduce,Spark和Flink之上实现了用于机器学习的各种算法,而MADlib在关系型MPP数据库(Apache HAWQ)中实现了类似的功能【54】。

空间算法也是有用的,例如最近邻搜索(kNN)【80】,它在一些多维空间中搜索与给定物品接近的物品 - 这是一种类似的搜索。近似搜索对于基因组分析算法也很重要,它们需要找到相似但不相同的字符串【81】。

批处理引擎正被用于分布式执行来自日益广泛的领域的算法。随着批处理系统获得内置功能和高级声明性操作符,并且随着MPP数据库变得更加可编程和灵活,两者开始看起来更相似:最终,它们都只是存储和处理数据的系统。

本章小结

在本章中,我们探讨了批处理的主题。我们首先查看了诸如awk,grep和sort之类的Unix工具,然后我们看到了这些工具的设计理念是如何运用到MapReduce和更新的数据流引擎中的。这些设计原则中的一些是输入是不可变的,输出是为了成为另一个(还未知的)程序的输入,而复杂的问题是通过编写“做一件好事”的小工具来解决的。

在Unix世界中,允许一个程序与另一个程序组合的统一界面是文件和管道;在MapReduce中,该接口是一个分布式文件系统。我们看到数据流引擎添加了自己的管道式数据传输机制,以避免将中间状态转化为分布式文件系统,但作业的初始输入和最终输出通常仍然是HDFS。

分布式批处理框架需要解决的两个主要问题是:

分区

在MapReduce中,Mapper根据输入文件块进行分区。Mapper的输出被重新分区,分类并合并到可配置数量的reducer分区中。这个过程的目的是把所有的相关数据 —— 例如,所有的记录都放在同一个地方。

后MapReduce数据流引擎尽量避免排序,除非它是必需的,但它们采取了大致类似的分区方法。

容错

MapReduce经常写入磁盘,这使得从单个失败的任务中轻松地恢复,而无需重新启动整个作业,但在无故障的情况下减慢了执行速度。数据流引擎对中间状态执行的实现较少,并且保留在内存中,这意味着如果节点发生故障,则需要推荐更多的数据。确定性运算符减少了需要重新计算的数据量。

我们讨论了几种MapReduce的连接算法,其中大多数也是在MPP数据库和数据流引擎中使用的。他们还提供了分区算法如何工作的一个很好的例子:

排序合并连接

每个正在连接的输入都通过一个提取连接键的Mapper。通过分区,排序和合并,具有相同键的所有记录最终都会进入reducer的相同调用。这个函数可以输出连接的记录。

广播散列连接

两个连接输入之一是小的,所以它没有分区,它可以被完全加载到一个哈希表。因此,你可以为大连接输入的每个分区启动一个Mapper,将小输入的散列表加载到每个Mapper中,然后一次扫描大输入一条记录,查询每条记录的散列表。

分区散列连接

如果两个连接输入以相同的方式分区(使用相同的键,相同的散列函数和相同数量的分区),则可以独立地为每个分区使用散列表方法。

分布式批处理引擎有一个有意限制的编程模型:回调函数(比如Mapper和Reducer)被认为是无状态的,除了指定的输出外,没有外部可见的副作用。这个限制允许框架隐藏抽象背后的一些硬分布式系统问题:面对崩溃和网络问题,任务可以安全地重试,任何失败任务的输出都被丢弃。如果某个分区的多个任务成功,则只有其中一个实际上使其输出可见。

得益于这个框架,你在批处理作业中的代码无需担心实现容错机制:框架可以保证作业的最终输出与没有发生错误的情况相同,也许不得不重新尝试各种任务。这些可靠的语义要比在线服务处理用户请求时经常使用的要多得多,而且在处理请求的副作用时写入数据库。

批量处理工作的显着特点是它读取一些输入数据并产生一些输出数据,而不修改输入—— 换句话说,输出是从输入衍生出的。重要的是,输入数据是有界的:它有一个已知的,固定的大小(例如,它包含一些时间点的日志文件或数据库内容的快照)。因为它是有界的,一个工作知道什么时候它完成了整个输入的读取,所以一个工作最终完成。

在下一章中,我们将转向流处理,其中的输入是未知的 —— 也就是说,你还有一份工作,但是它的输入是永无止境的数据流。在这种情况下,工作永远不会完成,因为在任何时候都可能有更多的工作进来。我们将看到流和批处理在某些方面是相似的。但是关于无尽数据流的假设,也对我们构建系统的方式产生了很大的改变。

参考文献

  1. Jeffrey Dean and Sanjay Ghemawat: “MapReduce: Simplified Data Processing on Large Clusters,” at 6th USENIX Symposium on Operating System Design and Implementation (OSDI), December 2004.

  2. Joel Spolsky: “The Perils of JavaSchools,” joelonsoftware.com, December 25, 2005.

  3. Shivnath Babu and Herodotos Herodotou: “Massively Parallel Databases and MapReduce Systems,” Foundations and Trends in Databases, volume 5, number 1, pages 1–104, November 2013. doi:10.1561/1900000036

  4. David J. DeWitt and Michael Stonebraker: “MapReduce: A Major Step Backwards,” originally published at databasecolumn.vertica.com, January 17, 2008.

  5. Henry Robinson: “The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google,” the-paper-trail.org, June 25, 2014.

  6. The Hollerith Machine,” United States Census Bureau, census.gov.

  7. IBM 82, 83, and 84 Sorters Reference Manual,” Edition A24-1034-1, International Business Machines Corporation, July 1962.

  8. Adam Drake: “Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster,” aadrake.com, January 25, 2014.

  9. GNU Coreutils 8.23 Documentation,” Free Software Foundation, Inc., 2014.

  10. Martin Kleppmann: “Kafka, Samza, and the Unix Philosophy of Distributed Data,” martin.kleppmann.com, August 5, 2015.

  11. Doug McIlroy: Internal Bell Labs memo, October 1964. Cited in: Dennis M. Richie: “Advice from Doug McIlroy,” cm.bell-labs.com.

  12. M. D. McIlroy, E. N. Pinson, and B. A. Tague: “UNIX Time-Sharing System: Foreword,” The Bell System Technical Journal, volume 57, number 6, pages 1899–1904, July 1978.

  13. Eric S. Raymond: The Art of UNIX Programming. Addison-Wesley, 2003. ISBN: 978-0-13-142901-7

  14. Ronald Duncan: “Text File Formats – ASCII Delimited Text – Not CSV or TAB Delimited Text,” ronaldduncan.wordpress.com, October 31, 2009.

  15. Alan Kay: “Is 'Software Engineering' an Oxymoron?,” tinlizzie.org.

  16. Martin Fowler: “InversionOfControl,” martinfowler.com, June 26, 2005.

  17. Daniel J. Bernstein: “Two File Descriptors for Sockets,” cr.yp.to.

  18. Rob Pike and Dennis M. Ritchie: “The Styx Architecture for Distributed Systems,” Bell Labs Technical Journal, volume 4, number 2, pages 146–152, April 1999.

  19. Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung: “The Google File System,” at 19th ACM Symposium on Operating Systems Principles (SOSP), October 2003. doi:10.1145/945445.945450

  20. Michael Ovsiannikov, Silvius Rus, Damian Reeves, et al.: “The Quantcast File System,” Proceedings of the VLDB Endowment, volume 6, number 11, pages 1092–1101, August 2013. doi:10.14778/2536222.2536234

  21. OpenStack Swift 2.6.1 Developer Documentation,” OpenStack Foundation, docs.openstack.org, March 2016.

  22. Zhe Zhang, Andrew Wang, Kai Zheng, et al.: “Introduction to HDFS Erasure Coding in Apache Hadoop,” blog.cloudera.com, September 23, 2015.

  23. Peter Cnudde: “Hadoop Turns 10,” yahoohadoop.tumblr.com, February 5, 2016.

  24. Eric Baldeschwieler: “Thinking About the HDFS vs. Other Storage Technologies,” hortonworks.com, July 25, 2012.

  25. Brendan Gregg: “Manta: Unix Meets Map Reduce,” dtrace.org, June 25, 2013.

  26. Tom White: Hadoop: The Definitive Guide, 4th edition. O'Reilly Media, 2015. ISBN: 978-1-491-90163-2

  27. Jim N. Gray: “Distributed Computing Economics,” Microsoft Research Tech Report MSR-TR-2003-24, March 2003.

  28. Márton Trencséni: “Luigi vs Airflow vs Pinball,” bytepawn.com, February 6, 2016.

  29. Roshan Sumbaly, Jay Kreps, and Sam Shah: “The 'Big Data' Ecosystem at LinkedIn,” at ACM International Conference on Management of Data (SIGMOD), July 2013. doi:10.1145/2463676.2463707

  30. Alan F. Gates, Olga Natkovich, Shubham Chopra, et al.: “Building a High-Level Dataflow System on Top of Map-Reduce: The Pig Experience,” at 35th International Conference on Very Large Data Bases (VLDB), August 2009.

  31. Ashish Thusoo, Joydeep Sen Sarma, Namit Jain, et al.: “Hive – A Petabyte Scale Data Warehouse Using Hadoop,” at 26th IEEE International Conference on Data Engineering (ICDE), March 2010. doi:10.1109/ICDE.2010.5447738

  32. Cascading 3.0 User Guide,” Concurrent, Inc., docs.cascading.org, January 2016.

  33. Apache Crunch User Guide,” Apache Software Foundation, crunch.apache.org.

  34. Craig Chambers, Ashish Raniwala, Frances Perry, et al.: “FlumeJava: Easy, Efficient Data-Parallel Pipelines,” at 31st ACM SIGPLAN Conference on Programming Language Design and Implementation (PLDI), June 2010. doi:10.1145/1806596.1806638

  35. Jay Kreps: “Why Local State is a Fundamental Primitive in Stream Processing,” oreilly.com, July 31, 2014.

  36. Martin Kleppmann: “Rethinking Caching in Web Apps,” martin.kleppmann.com, October 1, 2012.

  37. Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira: Hadoop Application Architectures. O'Reilly Media, 2015. ISBN: 978-1-491-90004-8

  38. Philippe Ajoux, Nathan Bronson, Sanjeev Kumar, et al.: “Challenges to Adopting Stronger Consistency at Scale,” at 15th USENIX Workshop on Hot Topics in Operating Systems (HotOS), May 2015.

  39. Sriranjan Manjunath: “Skewed Join,” wiki.apache.org, 2009.

  40. David J. DeWitt, Jeffrey F. Naughton, Donovan A. Schneider, and S. Seshadri: “Practical Skew Handling in Parallel Joins,” at 18th International Conference on Very Large Data Bases (VLDB), August 1992.

  41. Marcel Kornacker, Alexander Behm, Victor Bittorf, et al.: “Impala: A Modern, Open-Source SQL Engine for Hadoop,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

  42. Matthieu Monsch: “Open-Sourcing PalDB, a Lightweight Companion for Storing Side Data,” engineering.linkedin.com, October 26, 2015.

  43. Daniel Peng and Frank Dabek: “Large-Scale Incremental Processing Using Distributed Transactions and Notifications,” at 9th USENIX conference on Operating Systems Design and Implementation (OSDI), October 2010.

  44. "Cloudera Search User Guide," Cloudera, Inc., September 2015.

  45. Lili Wu, Sam Shah, Sean Choi, et al.: “The Browsemaps: Collaborative Filtering at LinkedIn,” at 6th Workshop on Recommender Systems and the Social Web (RSWeb), October 2014.

  46. Roshan Sumbaly, Jay Kreps, Lei Gao, et al.: “Serving Large-Scale Batch Computed Data with Project Voldemort,” at 10th USENIX Conference on File and Storage Technologies (FAST), February 2012.

  47. Varun Sharma: “Open-Sourcing Terrapin: A Serving System for Batch Generated Data,” engineering.pinterest.com, September 14, 2015.

  48. Nathan Marz: “ElephantDB,” slideshare.net, May 30, 2011.

  49. Jean-Daniel (JD) Cryans: “How-to: Use HBase Bulk Loading, and Why,” blog.cloudera.com, September 27, 2013.

  50. Nathan Marz: “How to Beat the CAP Theorem,” nathanmarz.com, October 13, 2011.

  51. Molly Bartlett Dishman and Martin Fowler: “Agile Architecture,” at O'Reilly Software Architecture Conference, March 2015.

  52. David J. DeWitt and Jim N. Gray: “Parallel Database Systems: The Future of High Performance Database Systems,” Communications of the ACM, volume 35, number 6, pages 85–98, June 1992. doi:10.1145/129888.129894

  53. Jay Kreps: “But the multi-tenancy thing is actually really really hard,” tweetstorm, twitter.com, October 31, 2014.

  54. Jeffrey Cohen, Brian Dolan, Mark Dunlap, et al.: “MAD Skills: New Analysis Practices for Big Data,” Proceedings of the VLDB Endowment, volume 2, number 2, pages 1481–1492, August 2009. doi:10.14778/1687553.1687576

  55. Ignacio Terrizzano, Peter Schwarz, Mary Roth, and John E. Colino: “Data Wrangling: The Challenging Journey from the Wild to the Lake,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

  56. Paige Roberts: “To Schema on Read or to Schema on Write, That Is the Hadoop Data Lake Question,” adaptivesystemsinc.com, July 2, 2015.

  57. Bobby Johnson and Joseph Adler: “The Sushi Principle: Raw Data Is Better,” at Strata+Hadoop World, February 2015.

  58. Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, et al.: “Apache Hadoop YARN: Yet Another Resource Negotiator,” at 4th ACM Symposium on Cloud Computing (SoCC), October 2013. doi:10.1145/2523616.2523633

  59. Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, et al.: “Large-Scale Cluster Management at Google with Borg,” at 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741964

  60. Malte Schwarzkopf: “The Evolution of Cluster Scheduler Architectures,” firmament.io, March 9, 2016.

  61. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, et al.: “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing,” at 9th USENIX Symposium on Networked Systems Design and Implementation (NSDI), April 2012.

  62. Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia: Learning Spark. O'Reilly Media, 2015. ISBN: 978-1-449-35904-1

  63. Bikas Saha and Hitesh Shah: “Apache Tez: Accelerating Hadoop Query Processing,” at Hadoop Summit, June 2014.

  64. Bikas Saha, Hitesh Shah, Siddharth Seth, et al.: “Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications,” at ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742790

  65. Kostas Tzoumas: “Apache Flink: API, Runtime, and Project Roadmap,” slideshare.net, January 14, 2015.

  66. Alexander Alexandrov, Rico Bergmann, Stephan Ewen, et al.: “The Stratosphere Platform for Big Data Analytics,” The VLDB Journal, volume 23, number 6, pages 939–964, May 2014. doi:10.1007/s00778-014-0357-y

  67. Michael Isard, Mihai Budiu, Yuan Yu, et al.: “Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks,” at European Conference on Computer Systems (EuroSys), March 2007. doi:10.1145/1272996.1273005

  68. Daniel Warneke and Odej Kao: “Nephele: Efficient Parallel Data Processing in the Cloud,” at 2nd Workshop on Many-Task Computing on Grids and Supercomputers (MTAGS), November 2009. doi:10.1145/1646468.1646476

  69. Lawrence Page, Sergey Brin, Rajeev Motwani, and Terry Winograd: “The PageRank

  70. Leslie G. Valiant: “A Bridging Model for Parallel Computation,” Communications of the ACM, volume 33, number 8, pages 103–111, August 1990. doi:10.1145/79173.79181

  71. Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl: “Spinning Fast Iterative Data Flows,” Proceedings of the VLDB Endowment, volume 5, number 11, pages 1268-1279, July 2012. doi:10.14778/2350229.2350245

  72. Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, et al.: “Pregel: A System for Large-Scale Graph Processing,” at ACM International Conference on Management of Data (SIGMOD), June 2010. doi:10.1145/1807167.1807184

  73. Frank McSherry, Michael Isard, and Derek G. Murray: “Scalability! But at What COST?,” at 15th USENIX Workshop on Hot Topics in Operating Systems (HotOS), May 2015.

  74. Ionel Gog, Malte Schwarzkopf, Natacha Crooks, et al.: “Musketeer: All for One, One for All in Data Processing Systems,” at 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741968

  75. Aapo Kyrola, Guy Blelloch, and Carlos Guestrin: “GraphChi: Large-Scale Graph Computation on Just a PC,” at 10th USENIX Symposium on Operating Systems Design and Implementation (OSDI), October 2012.

  76. Andrew Lenharth, Donald Nguyen, and Keshav Pingali: “Parallel Graph Analytics,” Communications of the ACM, volume 59, number 5, pages 78–87, May 2016. doi:10.1145/2901919

  77. Fabian Hüske: “Peeking into Apache Flink's Engine Room,” flink.apache.org, March 13, 2015.

  78. Mostafa Mokhtar: “Hive 0.14 Cost Based Optimizer (CBO) Technical Overview,” hortonworks.com, March 2, 2015.

  79. Michael Armbrust, Reynold S Xin, Cheng Lian, et al.: “Spark SQL: Relational Data Processing in Spark,” at ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742797

  80. Daniel Blazevski: “Planting Quadtrees for Apache Flink,” insightdataengineering.com, March 25, 2016.

  81. Tom White: “Genome Analysis Toolkit: Now Using Apache Spark for Data Processing,” blog.cloudera.com, April 6, 2016.


上一章 目录 下一章
第三部分:派生数据 设计数据密集型应用 第十章:流处理

Footnotes

  1. 有些人喜欢抬杠,认为cat这里并没有必要,因为输入文件可以直接作为awk的参数。 但这种写法让流水线更显眼。

  2. 统一接口的另一个例子是URL和HTTP,这是Web的基础。 一个URL标识一个网站上的一个特定的东西(资源),你可以链接到任何其他网站的任何网址。 具有网络浏览器的用户因此可以通过跟随链接在网站之间无缝跳转,即使服务器可能由完全不相关的组织操作。 这个原则今天似乎很明显,但它是使网络取得今天成功的关键。 之前的系统并不是那么统一:例如,在公告板系统(BBS)时代,每个系统都有自己的电话号码和波特率配置。 从一个BBS到另一个BBS的引用必须以电话号码和调制解调器设置的形式; 用户将不得不挂断,拨打其他BBS,然后手动找到他们正在寻找的信息。 这是不可能的直接链接到另一个BBS内的一些内容。

  3. ****巴尔干化(Balkanization)**是一个常带有贬义的地缘政治学术语,其定义为:一个国家或政区分裂成多个互相敌对的国家或政区的过程。

  4. 除了使用一个单独的工具,如netcatcurl。 Unix开始试图将所有东西都表示为文件,但是BSD套接字API偏离了这个惯例【17】。研究用操作系统Plan 9和Inferno在使用文件方面更加一致:它们将TCP连接表示为/net/tcp中的文件【18】。

  5. 一个不同之处在于,对于HDFS,可以将计算任务安排在存储特定文件副本的计算机上运行,而对象存储通常将存储和计算分开。如果网络带宽是一个瓶颈,从本地磁盘读取有性能优势。但是请注意,如果使用删除编码,局部优势将会丢失,因为来自多台机器的数据必须进行合并以重建原始文件【20】。

  6. 我们在本书中讨论的连接通常是等值连接,即最常见的连接类型,其中记录与其他记录在特定字段(例如ID)中具有相同的值相关联。有些数据库支持更一般的连接类型,例如使用小于运算符而不是等号运算符,但是我们没有空间来覆盖它们。

  7. 这个例子假定散列表中的每个键只有一个条目,这对用户数据库(用户ID唯一标识一个用户)可能是正确的。通常,哈希表可能需要包含具有相同键的多个条目,并且连接运算符将输出关键字的所有匹配。