MapReduce论文分享

大纲

  • MapReduce设计方案

    • 整体架构与执行流程
    • master设计
    • 如何容错
      • worker故障
      • master失败
    • 存储,调度与带宽
    • 如何切分任务
    • 预测执行
  • MapReduce特性和扩展功能

    • 分区函数
    • reduce顺序处理
    • combiner函数
    • 输入输出类型支持
    • 自动忽略坏数据
    • local引擎支持
    • job状态监控
    • counter

MapReduce设计方案

整体架构

关键角色

  • master:分配map和reduce任务;收集worker上报的数据;向worker下发信息和指令。
  • worker:执行map和reduce任务;向master汇报状态,上报数据;
    • 执行map任务的称为map worker
    • 执行reduce任务的称为reduce worker

执行流程

  • 将输入文件切分为M个数据分片,每个分片大小一般从16MB~64MB,用户可配置
  • Master将创建M个map任务,R个reduce任务,并将这M+R个任务,分配给空闲worker执行,每个空闲worker一次执行一个任务
  • map worker从输入数据分片中读入数据,解析成key-value pair(其中key是文件偏移量,value是一条文本行数据),并将key-value pair传递给用户注册的map函数。map函数对输出key-value pair处理完后,要求产出中间key-value pair
  • 每个map worker产出的中间key-value pair会被缓存在内存中,并通过分区函数分到R个分区中,缓存一定量后写入map worker执行时的本地磁盘,并记录下存储路径,回传给master。每个map任务的输出都会被分到R个分区中,整个集群总共产出M*R个中间输出文件
  • master将中间输出文件所在的机器+路径发送给reduce worker,所有map worker产出的所有R[i]输出文件将发送给R[i] reduce worker
  • reduce worker通过RPC从对应机器上读取中间输出文件,读取完所有数据后,按key进行排序,再将所有相同key的数据聚合在一起,形成\<key, iterator(value)>的结构。若内存不足以完成排序,则借助外部存储。
  • reudce worker将\<key, iterator(value)>传递到用户注册的reduce函数。reduce函数处理结果追加到所属分区的输出文件。一个reduce worker产出一个最终输出文件,保存在全局分布式文件系统上

master设计

论文中没有给出master的设计方案,更没有给出工程实现细节,只是针对master的功能,简述其需要维护的数据结构

  • 保存每个map和reduce任务的状态(空闲,执行中,完成)
  • 保存worker机器的标识
  • 保存worker进程上传的中间输出文件的位置

如何容错

由于map-reduce程序运行在大规模分布式集群中,发生错误是常态,需要考虑如何容错。论文中以work故障和master失败两方面介绍

worker故障

如何判断worker的运行状态?

master周期性ping每个worker,并期望在一个约定时间内收到work返回的信息。若没有如期收到Worker回复,将相应的worker标记为失效

worker失效后怎么办?

所有由该worker完成的和正在执行中的map任务,都会被重新置为空闲状态,等待master重新调度到其他worker重新执行。worker失败后,导致了map任务产出的中间输出文件无法访问,所以即使是已完成的map任务也需要重新执行;而已完成的reduce任务,最终输出文件是保存在全局分布式文件系统上,因此不需要重新执行。

master失败

论文中提到的解决方案

提出了以checkpoint的方案来解决master失败的问题,周期性将master维护的数据结构和状态写入磁盘。master失败后,从最后一个checkpoint开始启动新的master进程,恢复master状态

实际工程实现

但其工程实现上只有一个master进程,master失败后会终止整个MapReduce作业。

存储,调度与带宽

输入文件保存在GFS中,读取时会涉及到网络传输,而带宽资源匮乏,在调度任务时要着重考虑带宽问题。GFS把每个文件按64MB一个Block分隔,每个Block一般以3副本的形式保存在集群不同机器中。

如何调度map任务,策略是什么?

  • 原则:副本文件离map任务越近越好
  • map任务从GFS读取文件,master会尽量将一个map任务调度到包含相关副本的机器上执行,否则就调度到附加的机器上执行。

如何切分任务

整个MapReduce作业由M个map和R个reduce任务组成,如何决定M和R的值,是影响整个作业性能的关键点。论文中从集群的动态负载能力故障恢复速度资源占用等方面进行阐述。

  • M和R的值应该比worker的机器数量多得多,使得每个work执行大量不同的任务,以此提高集群的动态负载能力。
  • 任务被切分得更细,使得一个任务被重试的代价更小。
  • 任务被切分得更细,当一个worker故障,其处理完及正在处理的map任务,能分布到更多的worker中重新执行,加快了故障恢复的速度
  • master需要维护M+R个任务的状态,每个任务状态占1 byte即可(1 byte = 8 bits,可以表达256种状态),因为内存占用并不可观。

设置M和R值的建议

  • R值一般是用户根据业务需求进行设置。建议设置为worker机器数量的较小的倍数
  • 设置M值时,应考虑总输入数据量,使得每个map任务能读入16M到64M的数据为佳
  • 一个常用的设置比例:M=200000,R=5000,2000台worker机器

预测执行

另一个影响MapReduce作业总执行时间的关键因素就是长尾任务,指那些最后阶段,要花很长时间才能完成的map和reduce任务。导致长尾任务出现的原因有很多,例如:

  • 机器故障,导致从分布式文件系统读取文件时速度下降
  • 调度不均导致多个任务被调度到一台机器上执行,CPU,内存,本地磁盘,网络带宽等计算资源被抢占
  • 代码bug
  • ……

论文中就怎么解决长尾任务的问题,提出了一种解决方案:

  • 当整个MapReduce作业接近完成时,master会针对正在执行中的任务(in-process tasks),启动备用任务(backup executions)。备用任务可看作是执行中任务的副本。
  • 原任务或备用任务中的任意一个执行完成,都认为该任务已经完成。
  • 该解决方案只会多占用少于10%的计算资源。

MapReduce的特性和扩展功能

具备基本功能后,MapReduce框架还提供了不少扩展功能。论文中介绍了这些扩展功能的原理以及是如何提升MapReduce框架性能的。

分区函数

分区函数作用在map和reduce之间,即map任务产出的中间key-value pair如何分配到R个reduce任务中。MapReduce框架提供的默认分区函数为(hash(key) mod R),同时提供了编程接口,允许用户注册自定义的分区函数

reduce顺序处理

reduce任务在处理map任务产出的中间key-value pair时,保证按照key值增量顺序处理,这样也保证了每个reduce任务产出的最终输出文件也是按key值增量保存

combiner函数

当reduce函数满足结合律和交换律时,可以在map任务中对输出数据先进行一次预聚合,以减少map任务产出的中间输出。

  • combiner函数和reduce函数一般是相同的
  • MapReduce框架负责将combiner函数的输出写入中间结果文件,将reduce函数的输出写入最后输出文件

输入输出类型支持

论文提及了MapReduce框架支持多种数据输入输出格式,框架自身提供了一些预定义类型,同时允许用户注册自定义类型。本节中对一些预定义类型进行了介绍:

  • 文本模式下,输入文件的每一行作为一个key-value pair,key是文件偏移量,value是该行内容
  • 对key排序后的key-value pair序列

同时提供了扩展思路,读取数据不一定从文本文件中读入,可以实现自定义的reader,从数据库或内存数据结构中读取。

跳过损坏数据

脏数据或者用户程序bug可能导致map或reduce函数在处理某些key-value pair时crash。MapReduce框架提供了一种机制,在确定了某些记录多次引起crash后,允许跳过这些损坏记录,在论文给出实现方案:

  • worker设置了信号处理函数捕获segmentation violation和bus error
  • 在worker处理记录导致crash时,在信号处理函数中通过UDP包向master汇报引起crash的记录序号
  • 当记录失败次数达到规定的阈值时,master标记该记录需要被跳过,在下次调度任务时,通知worker忽略该记录。

local引擎支持

为了方便用户调试程序,提供了本地执行版本

job状态信息

master内嵌HTTP服务器,用于向用户展示作业执行状态,如执行进度,输入字节数,中间产出字节数,输出字节数,计算资源占用率等

counter

counter用于统计一个作业中不同事件发生的次数。counter的值,附加在worker回复给master的ping包中,周期性汇报给master,master将执行成功的map和reduce任务的counter值进行累加。

master启动的备用任务,worker故障后被重新调度的map任务,都会向master汇报counter值。master在设计上要考虑避免重复累加这些counter

坚持原创分享,您的支持将鼓励我继续创作!