Wednesday, September 30, 2015

MapReduce 算法思想

对MapReduce的过程有一些了解,
Map: 对读到的每一条数据,产生一个<key, value> pair
Reduce: 以key为依据,将这些<key, value> pair汇总,并输出。
每一个Reducer 只能处理 同一个key 的 <key, value > data。

首先我们需要确定用多少个map的Machine, 多少个reduce的 Machine.
这些map的机器分别处理一部分input的数据,并且在local file上产生
《key, value》 pair, 然后有一个sort and shuffle的过程, 将产生的<key, value>
pair 按照key的顺序 shuffle到不同的机器上,并且使得Key的顺序是递增的,
这样做的目的是为了reduce的阶段产生的output file 里面的key, value 是有序的,
方便以后文件的利用。 最后 reduce 阶段, reduce的每个机器对shuffle 阶段产生的
intermedia file 按照Key进行汇总, 并且将结果存到  output file里面。

参照 Terasort的map reduce 思想, 我的理解是, 我们需要先确定 需要多少个amp 的machine, 多少个reduce的machine,  在每个map的machine, 他们在local 进行Merge sort,  然后有一个partition的过程,这个过程跟上面的shuffle过程类似,把每一个map machine 中处理的数据分成R个partition 块,并且保证下一个partition 块比上一个partition块的数据大, 依次递增的关系,每个partition处理好之后,需要把这些Intermedia files 通过一定的方式传给 reduce machine。 最后reduce阶段, 每个reduce的机器对每个partition 块进行汇总 Merge sort 处理,最后输出的处理后的output file 。 其实这个过程中, 每个map 机器做的事情跟reduce机器做的相同。

Map Reduce是一种 multiple machine parallel 处理一个问题的模式, 通过map reduce这种方式可以很高效地并行处理一些数据。 map 阶段其实就是把这个大规模的问题细分成几个小规模的问题, 分别在map 机器上进行小规模常规处理, 然后这些小规模问题的解 通过一些partition 加 shuffle 的处理,使得这些解的数据有一定的顺序,最后得出的最终结果能让用户方便地按照一定的方式去处理。 这个过程产生一些Intermedia 的文件,这些文件要被发送给 reduce task, reduce 阶段会根据key来分别对这些中间文件数据进行汇总处理, 最终就能得到大规模数据处理后的结果。 整个过程中, 这些map machine 和 reduce machine 可以看做是一个个的 slaves, 或者worker, 这些 worker 由一个 master 来进行协调和管理,


Master 的任务是给map worker 和 reduce worker来分配任务,并且当 其中一个 machine 不 work 或者 fail的时候,如果保证整个系统正常的运行。
通常,一个parallel 系统,需要能够处理: parallelization, fault-tolerance, data distribution, load balancing. 
其中, parallelization 通过multiple machine 实现, fault- tolerance通过master-slave来实现,data distribution 通过 平均分配 数据给每一个 machine,可以通过 sampling  +  num of reducer 来计算每个machine 分配几个任务, 

fault tolerance: 如果一个 worker failed, master 如何协调?
master pings every worker periodically. if no response is received from a worker in a certain amount of time, the master mask this worker as failed;   Any map tasks completed by the worker are reset back to their initial idle state and become eligible for scheduling on other workers. Completed map tasks are re-executed on a failure because their output is stored on the local tasks of the failed machine and is therefore inaccessible. Complete reduced tasks  don't need to be re-executed since their output is stored in a global file system. 

IF master dies, a new copy of the master will run. 

Task Graunularity:

We subdivide the map phase into M pieces and the re- duce phase into R pieces, as described above. Ideally, M and R should be much larger than the number of worker machines. 

Map Reduce 模式如果应用在 算法题中呢? 
数据规模大的时候, 如何实现 parallelization ? 

eg:  mapreduce in Dikjastra algorithm :

1. Map: 需要多少个machine, Reduce: 需要多少个machine .
2. Map 阶段怎么定义? 
3. Reduce 阶段怎么定义? 

Map 和 reduce 的 machine数目可以通过 graph 里面有多少的Node 来求。

每个 node离 start的最短距离 = 1 + min{distance(P), P: neighbors of N}

Map:  计算start 到这个Node的距离,
           key: node N, value: D( distance from start to P) S: neighbors of P

Reduce: minimal D , S: neighbors of P

由于graph 需要通过 bfs来进行 explore, 所以 第一步的map只有一个machine在进行,而reduce阶段所有machine 都进行了参与。 随着map阶段 explore的Nodes 越来越多, map上参与的machine也越来越多, reduce 会把map求出的这些distance, 根据Key node, 来求最小值。 不同 map上面处理的node不同, reduce处理的node也不同,这样就实现了parallel 处理大规模node的dikjastra 算法。 



No comments:

Post a Comment