FlumeJava论文分享

大纲

  • 提出FlumeJava的背景
  • FlumeJava是什么?解决什么问题?已有哪些应用?
  • FlumeJava特性介绍:
    • 核心数据集抽象
    • 编程接口
    • 优化器
  • FlumeJava中未实现的优化策略

提出FlumeJava的背景

  • MapReduce的出现大大降低了大规模数据并行处理的门槛,但实际应用场景中需要的不止是单一的map+reduce,而是MapReduce Pipeline
  • 开发,调试,管理一个MapReduce Pipeline很困难:
    • 需要编写多个map和reduce程序
    • 需要编写额外代码将这些map,reduce组织起来
    • 自行管理这些map,reduce产出的中间数据
    • 新加入的开发者接手代码后,只有分散的map,reduce程序,而不是一个整体的计算处理逻辑,开发难度大
    • 计算逻辑发生变化后,要修改多个map,reduce程序,维护成本大
  • 需要一套数据并行处理pipeline,屏蔽维护pipeline的细节,使用户专注在计算逻辑上

FlumeJava是什么?解决什么问题?已有哪些应用?

  • 在开发者看来,FlumeJava是一个Java库。定义了一系列的不可变的并行数据集抽象;同时为这些数据集抽象定义了适量的函数方法,可对这些数据集进行并行处理
  • FlumeJava为数据并行处理提供了统一抽象,屏蔽了如何描述不同类型的并行数据集处理流程中的细节
  • 将计算逻辑转化为多个MapReduce组成的pipeline,清除中间数据产出,重试计算,cache等工作,都由FlumeJava管理。
  • 使用FlumeJava开发的应用,性能接近于有经验的工程师经过手动优化后的MapReduce程序
  • 截止到2010年3月,在Google内部拥有175个开发者。相当数量的产品使用FlumeJava开发(无具体数据)

FlumeJava特性概述

  • 核心数据集抽象
    • PCollection
    • PTable
    • PObject
  • 编程接口
    • 底层接口:parallelDo & DoFn,groupByKey,combineValues,flatten
    • 高级衍生接口:count,join,top ……
  • 优化器

核心数据集抽象

PCollection<T>

  • 用于描述不可变的数据集
  • 支持有序(sequence)和无序(collection)数据集合
  • 可由内存中的Java Collection对象构造得到,也可以从外部存储中读取数据进行构造
  • 支持序列化:recordsOf();开发者可从外部存储中读取任意格式(文本or二进制)的数据,然后通过recordsOf接口将数据转化为Java对象
  • 不支持PCollection<PCollection<T>>

PTable<K,V>

  • PTable<K,V>从实现上是PCollection<Pair<K,V>>的子类,用于描述无序的不可变的Key-Value Pair数据集,Key允许重复
  • PTable是一种特殊的PCollection的,应用在PCollection<T>上的方法也可以应用在PTable<K,V>上

PObject<T>

  • 用于承载单个Java对象
  • 在pipeline run结束后,可通过getValue()方法,将PObject内装载的Java对象取出
  • 可通过PColleciton上的asSequentialCollection()方法将一个PCollection<T>转化为PObject<Collection<T>>,再通过getValue()将Collection<T>取出。
1
2
PTable<String,Integer> wordCounts = ...;
PObject<Collection<Pair<String,Integer>>> result = wordCounts.asSequentialCollection();
  • 可通过PCollection上的combine方法,将PCollection<T>聚合为PObject<T>
1
2
3
PCollection<Boolean> haveConverged = ...;

PObject<Boolean> allHaveConverged = haveConverged.combine(AND_BOOLS);

编程接口

底层接口

FlumeJava定义了几个底层接口,用来操作它所定义的数据集抽象(PCollection,PTable,PObject)。

由这几个底层接口,能完成所有对数据集抽象的操作。

这些底层接口在真正进行数据处理时,都是并行执行的。

另外,还支持一些高级衍生接口,其本质是对底层接口的封装,通过组合不同的底层接口来完成功能。

parallelDo & DoFn<T,S>

输入一个PCollection<T>,将其中的每个T类型对象转化为S类型对象,输出一个PCollection<S>

  • 开发者向parallelDo注册回调函数DoFn<T,S>,在DoFn<T,S>中实现业务逻辑
  • parallelDo遍历PCollection<T>中的每个元素,对每个元素执行一遍DoFn<T,S>
  • DoFn<T,S>是一个仿函数对象,将一个T类型Java对象转化为S类型Java对象
  • 可在parallelDo中,决定产出的PCollection的组织结构:
    • collectionOf():产出的PCollection内的元素是无序集合
    • sequenceOf():产出的PCollection内的元素是有序集合
    • tableOf():产出的PCollection内的元素要求是Key-Value pair,产出一个PTable
  • 一个parallelDo操作最后会转化为一个map或一个reduce节点
1
2
3
4
5
6
7
PCollection<String> words = lines.parallelDo(new DoFn<String,String>() {
void process(String line, EmitFn<String> emitFn) {
for (String word : splitIntoWords(line)) {
emitFn.emit(word);
}
}
}, collectionOf(strings()));

groupByKey

将multi-key的PTable<K,V>转化为uni-key的PTable<K,Collection<V>>

  • groupByKey会被FlumeJava转化为MapReduce的Shuffle阶段
1
2
3
PTable<URL,DocInfo> backlinks = ...;

PTable<URL,Collection<DocInfo>> referringDocInfos = backlinks.groupByKey();

combineValues

将PTable<K,Collection<V>>转化为PTable<K,V>

  • 开发者向combineValues注册combining function,该回调函数将Collection<T>聚合为单个T类型对象
1
2
PTable<String,Collection<Integer>> groupedWordsWithOnes = ...;
PTable<String,Integer> wordCounts = groupedWordsWithOnes.combineValues(SUM_INTS);

flatten

将一系列的PCollection<T>数据全部展开,最后组成一个PCollection<T>

  • 在实现上,不会将所有PCollection<T>内的元素复制出来,只是创建了一个新的PCollection<T>,并记录下flatten操作,在pipeline run时才触发计算
  • 论文中没有给出flatten()的代码示例,可能的接口风格如:
1
2
3
Collection<PCollection<T>> pcs = ...;

PCollection<T> pc = flatten(pcs);

通过单词计数的例子,使用所有底层接口(flatten除外)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 从外部存储中读取文本文件,并解析为一个包含单词的PCollection
PCollection<String> words = ...;


// 通过parallelDo,为每个单词附上数量1
// 由PCollection<String>转化为PCollection<Pair<String,Integer>>
// 再通过tableOf,将PCollection<Pair<String,Integer>>转化为PTable<String,Integer>
PTable<String,Integer> wordsWithOnes = words.parallelDo(
new DoFn<String, Pair<String,Integer>>() {
void process(String word, EmitFn<Pair<String,Integer>> emitFn) {
emitFn.emit(Pair.of(word, 1));
}
}, tableOf(strings(), ints()));


// 通过groupByKey,将单词进行分组
// 将PTable<String,Integer>转化为PTable<String,Collection<Integer>>
PTable<String,Collection<Integer>> groupedWordsWithOnes = wordsWithOnes.groupByKey();


// 通过combineValue对单词进行计数
// 将PTable<String,Collection<Integer>>聚合为PTable<String,Integer> PTable<String,Integer> wordCounts = groupedWordsWithOnes.combineValues(SUM_INTS);

高级衍生接口

高级接口内部是通过调用底层接口来实现功能的,开发者也可以自己基于底层接口进行封装。FlumeJava提供了一些常用的高级接口。

count

输入PCollection<T>,返回PTable<T,Integer>

  • parallelDo + groupByKey + combineValues
  • parallelDo:PCollection<T> —> PTable<T,Integer>
  • groupByKey:PTable<T,Integer> —> PTable<T,Collection<Integer>>
  • combineValues: PTable<T,Collection<Integer>> —> PTable<T,Integer>

join

输入PTable<K,V1>, PTable<K,V2>,输出PTable<K,Tuple2<Collection<V1>, Collection<V2>>>

  • parallelDo + flatten + groupByKey + parallelDo
  • parallelDo: PTable<K, Vi> —> Collection<PTable<K, TaggedUnion2<V1,V2>>> (存疑)
  • flatten: Collection<PTable<K, TaggedUnion2<V1,V2>>> —> PTable<K, TaggedUnion2<V1,V2>>
  • groupByKey: PTable<K, TaggedUnion2<V1,V2>> —> PTable<K,Collection<TaggedUnion2<V1,V2>>>
  • parallelDo(): PTable<K,Collection<TaggedUnion2<V1,V2>>> —> PTable<K,Tuple2<Collection<V1>, Collection<V2>>>

优化器

ParallelDo producer-consumer fusion

若parallelDo f的输入是parallelDo g的输入,则f和g可以合并为一个parallelDo,避免产出中间结果文件

ParallelDo sibling fusion

若多个parallelDo,接受同一个PCollection作为输入,那么这些parallelDo可以合并为一个parallelDo,合并后的parallelDo是一个多输出的算子(原本的parallelDo各自有自己的输出)

ParallelDo producer-consumer fusion + ParallelDo sibling fusion

因此这两种优化,可以统一为一种优化。若多个parallelDo有一个共同的上游parallelDo,则这个上游parallelDo以及所有下游parallelDo可以合并到一起

The MapShuffleCombineReduce (MSCR) Operation

FlumeJava中定义的核心操作,可将开发者的计算逻辑,划分为MapReduce任务

  • 有M个input channels,每个代表 一个map
  • 有R个output channels,每个代表 一次shuffle+combine+一个reduce
  • input channels:
    • 每个input channels,接受一个PCollection<T>作为输入,并产出R路输出,每路输出的类型均为PTable<K,V>
    • input channels可以任意emit那R路输出
  • output channels:
    • 由于每个input channels都产出R路输出,所有每个output channels都会接受M个输入
    • output channels使用flatten将M个PTable<K,V>展开为一个PTable<K,V>
    • 随后可能对展开后的PTable执行两种不同的操作:
      • grouping: 执行groupByKey+combineValues,产出R路输出,分别传递给R个reduce
      • pass-through: 直接将数据写出到外部存储
      • pass-through操作,用于实现,将MapReduce中map的产出,直接写到存储并作为最终计算的结果

下面是一张典型的MSCR操作示意图,其中第1,2路执行了grouping操作,第3路执行了pass-through操作:

MSCR Fusion

  • 一组相关联的groupByKey调用,将产生一个MSCR操作
  • 怎么算是相关联的groupByKey的调用?
    • 如果多个groupByKey操作,接受了同一个parallelDo的输出作为输入,则这些groupByKey操作是相关联的
    • 关联性传染,若GBK1和GBK2相关联,GBK2和GBK3相关联,则认为GBK1,GBK2,GBK3相关联
    • 这些相关联的groupByKey将合并到MSCR操作中
    • 作为共同上游的parallelDo也将合并到MSCR操作中,作为一个input channel
  • MSCR中的groupByKey的上游parallelDo,也将合并到MSCR中,作为一个input channel
  • 每个groupByKey之后若调用了combineValue,该combineValue也合并到MSCR中
  • groupByKey或者groupByKey+combineValue的输出被作为某些parallelDo的输入,则这些parallelDo也将合并到MSCR中

下面这张图展示了MSCR操作是怎么被构造出来的

Sink Flattens

flatten操作下沉(消除),创造ParallelDo Fusion的机会

  • 若flatten后面紧接一个下游parallelDo,可以将该flatten下沉到parallelDo之后
  • 下沉后,可以将flatten的上游parallelDo和下游parallelDo合并
  • flatten有多个上游parallelDo,要对应地复制下游parallelDo


Lift CombineValues operations

若combineValues操作后面紧跟一个groupByKey操作,则该combineValues可视为一个普通的parallelDo,可参与parallelDo Fusion

Insert fusion blocks

当两个groupByKey操作之间,有一条parallelDo调用链,这两个groupByKey最终会被划分到两个MSCR Fusion中。

两个GBK之间的所有parallelDo,将面临着如何划分的问题:是应该划分到上游的MSCR中作为output channel,还是划分到下游的MSCR中作为input channel?

FlumeJava的做法是:

  • 在paralleDo链中,找到输出数据量最小的那个parallelDo,以此为边界
  • 上游的parallelDo划分到上游MSCR中作为output channel,下游parallelDo划分到下游MSCR中作为input channel

FlumeJava中未实现的优化策略

  • 为DoFn添加hint参数,用于预测DoFn内部的行为:
    • 例如,开发者可以预估DoFn的输出数据量,FlumeJava根据预估的输出量,决定该算子在本地还是MapReduce集群上执行。
  • 分析开发者编写的业务函数,优化后重新生成新的代码,并与FlumeJava代码内联到一起进行整体优化
  • 去除重复和无用的算子
  • 结合底层MapReduce计算引擎,实现动态监控,在一定的时间间隔内汇报输出数据量,实时调整并行处理的MapReduce worker数量和占用的CPU,IO资源
坚持原创分享,您的支持将鼓励我继续创作!