Introduction
This article describes how to implement a single-machine MapReduce program in Go; the idea comes from a lab in MIT's 6.824 course. It is organized into the following sections:
- MapReduce fundamentals
- Basic usage of MapReduce
- A single-machine MapReduce implementation
MapReduce Fundamentals
MapReduce takes a set of key/value pairs as input and produces a set of key/value pairs as output; users control the processing logic by writing Map and Reduce functions.
The Map function transforms the input into a set of intermediate key/value pairs, and the MapReduce library passes all intermediate results for each key to the Reduce function.
The Reduce function receives a key and its associated set of values, and its job is to aggregate those values into the final result. The values are supplied to Reduce through an iterator, which lets MapReduce handle datasets too large to fit in memory.
The processing flow of a single MapReduce run is shown in the figure below:
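This contract can be sketched in Go. The KeyValue type and the mapF/reduceF signatures below mirror the ones used in the word count example later in this article; the whitespace-based word splitting is a simplified stand-in:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// KeyValue is the unit of data passed between Map and Reduce.
type KeyValue struct {
	Key   string
	Value string
}

// mapF turns one input document into intermediate key/value pairs:
// here, one pair per word occurrence.
func mapF(document, contents string) []KeyValue {
	var kvs []KeyValue
	for _, w := range strings.Fields(contents) {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// reduceF aggregates all values that share one key:
// here, counting the occurrences.
func reduceF(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	kvs := mapF("doc1", "a b a")
	fmt.Println(len(kvs)) // 3 intermediate pairs
	fmt.Println(reduceF("a", []string{"1", "1"})) // "2"
}
```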

- The MapReduce library splits the input files into pieces of typically 16 to 64 MB each (the size is tunable via a parameter) and then starts copies of the program on a cluster of machines.
- One copy is special: the master. The rest are workers, which receive their tasks from the master. There are M map tasks and R reduce tasks in total; the master picks idle workers and assigns each one a map task or a reduce task.
- A worker handling a map task reads the corresponding input split, parses key/value pairs out of the input data, and passes them to the Map function; the intermediate key/value pairs it produces are buffered in memory.
- The buffered intermediate results are periodically written to local disk, split into R regions by the partition function. The disk locations of these R regions are reported to the master, which forwards them to the corresponding reduce workers.
- When a reduce worker receives these locations from the master, it reads the corresponding data from the map workers via RPC. Once it has read all the data, it sorts it by key so that identical keys are grouped for aggregation.
- The reduce worker iterates over the sorted intermediate results; for each distinct key, it passes all of that key's values to the Reduce function, and the output is appended to the result file.
- When all map and reduce tasks have completed, the master wakes up the user program, and execution returns to user code.
After successful execution, the output is spread across R files. Users typically do not need to merge these R files, because they can serve as the input to another MapReduce job, or as the input to some other distributed application.
For a more detailed introduction, see my earlier blog post on MapReduce fundamentals.
MapReduce Core Components
The core components of MapReduce are the Master and the Worker; their responsibilities are as follows.
Master
The Master schedules the Map and Reduce tasks of a single job. The state it maintains includes:
- the status of each worker
- the status of each task
- the locations of the files produced by Map tasks
Worker
Workers come in two kinds, Map and Reduce.
Responsibilities of a Map worker:
- call the user-supplied Map function on its input split
- partition the output into R pieces, one per Reduce task
Responsibilities of a Reduce worker:
- sort the collected data by key
- call the Reduce function on the values of each distinct key
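The partitioning step of a Map worker can be sketched as a hash of the key modulo R, which guarantees that every occurrence of a key lands in the same Reduce task. The ihash helper below is modeled on the one used in the lab code shown later; the FNV-1a choice here is an assumption for illustration:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash maps a key to a 32-bit hash value (FNV-1a here).
func ihash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// partition picks which of the nReduce reduce tasks a key belongs to.
// Because the hash depends only on the key, all map tasks route the
// same key to the same reduce task.
func partition(key string, nReduce int) int {
	return int(ihash(key) % uint32(nReduce))
}

func main() {
	nReduce := 3
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> reduce task %d\n", k, partition(k, nReduce))
	}
}
```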
Basic Usage of MapReduce
With the fundamentals in place, the following word count example shows how a MapReduce program is written:
```go
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	f := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(value, f)
	for _, word := range words {
		kv := mapreduce.KeyValue{word, " "}
		res = append(res, kv)
	}
	return
}

func reduceF(key string, values []string) string {
	s := strconv.Itoa(len(values))
	return s
}

func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else if os.Args[1] == "master" {
		var mr *mapreduce.Master
		if os.Args[2] == "sequential" {
			mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
		} else {
			mr = mapreduce.Distributed("wcseq", os.Args[3:], 3, os.Args[2])
		}
		mr.Wait()
	} else {
		mapreduce.RunWorker(os.Args[2], os.Args[3], mapF, reduceF, 100)
	}
}
```
A MapReduce program consists of three parts:
- the Map function
- the Reduce function
- the code that invokes the MapReduce library
Map Function
The Map function's main job is to emit intermediate key/value pairs.
Reduce Function
The Reduce function operates on all values that share a key, typically aggregating them (counting, summing, and so on); the exact logic depends on the application.
Invoking the MapReduce Library
There are two entry points, Sequential and Distributed. Sequential runs the Map and Reduce tasks serially and is mainly useful for debugging user programs; Distributed is what real user programs use.
A Single-Machine MapReduce Implementation
The single-machine MapReduce implemented in this section differs from the design in Google's paper mainly as follows:
- Input and output both use the local file system; there is no distributed file store like GFS.
- Google's MapReduce relies on GFS's atomic file-rename to guarantee that, even if a Reduce worker crashes, exactly one result file is eventually produced. On a local file system, if the network between a Worker and the Master is cut while the Worker itself keeps running, restarting a replacement Worker could leave two Workers writing the same file. This scenario is out of scope for the worker fault tolerance of the single-machine MapReduce.
This section is split into two parts:
- the Sequential implementation of MapReduce
- the Distributed implementation of MapReduce (with worker fault tolerance)
The Sequential Implementation of MapReduce
The Sequential scheduler is implemented as follows:
```go
func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce,
		func(phase jobPhase) {
			switch phase {
			case mapPhase:
				for i, f := range mr.files {
					doMap(mr.jobName, i, f, mr.nReduce, mapF)
				}
			case reducePhase:
				for i := 0; i < mr.nReduce; i++ {
					doReduce(mr.jobName, i, len(mr.files), reduceF)
				}
			}
		},
		func() {
			mr.stats = []int{len(files) + nreduce}
		})
	return
}
```
The logic is simple: process the Map tasks one by one, in order, and once they are all done, process the Reduce tasks one by one.
Next, let's look at how doMap and doReduce are implemented.
doMap is implemented as follows:
```go
// doMap does the job of a map worker: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	// Read the whole input file into memory.
	file, err := os.Open(inFile)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)
	contents := ""
	for scanner.Scan() {
		contents += scanner.Text() + "\n"
	}

	// Call the user-defined map function to get the intermediate pairs.
	kvs := mapF(inFile, contents)

	// Create one intermediate file, with a JSON encoder, per reduce task.
	// The file for reduce task r is named reduceName(jobName, mapTaskNumber, r).
	// JSON is used so that keys and values containing newlines, quotes, and
	// other awkward characters round-trip correctly.
	reduceFileMap := make(map[string]*os.File)
	jsonFileMap := make(map[string]*json.Encoder)
	for i := 0; i < nReduce; i++ {
		reduceFileName := reduceName(jobName, mapTaskNumber, i)
		file, err := os.Create(reduceFileName)
		if err != nil {
			log.Fatal(err)
		}
		reduceFileMap[reduceFileName] = file
		jsonFileMap[reduceFileName] = json.NewEncoder(file)
		defer reduceFileMap[reduceFileName].Close()
	}

	// Partition each pair by a hash of its key (ihash is defined below
	// doMap) and JSON-encode it into the corresponding intermediate file.
	for _, kv := range kvs {
		fileNum := int(ihash(kv.Key)) % nReduce
		reduceFileName := reduceName(jobName, mapTaskNumber, fileNum)
		enc, ok := jsonFileMap[reduceFileName]
		if !ok {
			log.Fatalf("no encoder for %s", reduceFileName)
		}
		if err := enc.Encode(&kv); err != nil {
			log.Fatal(err)
		}
	}
}
```
The processing steps are:
- read the input file
- call the user-supplied Map function, emitting all key/value pairs
- create as many intermediate files as there are Reduce tasks, then hash each pair's key to decide which file the pair is written to
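The intermediate-file format can be exercised in isolation: the map side JSON-encodes each KeyValue pair, and the reduce side calls Decode repeatedly until io.EOF. A minimal round-trip sketch (in-memory here, rather than on disk):

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// KeyValue matches the pair type used by doMap and doReduce.
type KeyValue struct {
	Key   string
	Value string
}

// roundTrip JSON-encodes the pairs the way a map task writes an
// intermediate file, then decodes them the way a reduce task reads one,
// grouping the values by key.
func roundTrip(pairs []KeyValue) map[string][]string {
	// Encode side: one JSON object per line, as doMap does.
	var buf strings.Builder
	enc := json.NewEncoder(&buf)
	for _, kv := range pairs {
		if err := enc.Encode(&kv); err != nil {
			panic(err)
		}
	}

	// Decode side: call Decode until io.EOF, as doReduce does.
	dec := json.NewDecoder(strings.NewReader(buf.String()))
	kvs := make(map[string][]string)
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
		kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
	}
	return kvs
}

func main() {
	kvs := roundTrip([]KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}})
	fmt.Println(len(kvs["a"]), len(kvs["b"])) // 2 1
}
```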
doReduce is implemented as follows:
```go
// doReduce does the job of a reduce worker: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce
// function (reduceF) for each key, and writes the output to disk.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Collect the values for each key from the nMap intermediate files.
	// The file from map task m is named reduceName(jobName, m, reduceTaskNumber).
	kvs := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		reduceFileName := reduceName(jobName, i, reduceTaskNumber)
		file, err := os.Open(reduceFileName)
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err == io.EOF {
				break
			} else if err != nil {
				log.Fatal(err)
			}
			kvs[kv.Key] = append(kvs[kv.Key], kv.Value)
		}
	}

	// Sort the keys so that the output is in key order.
	var keys []string
	for k := range kvs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Apply reduceF to each key's values and JSON-encode the results into
	// the merge file mergeName(jobName, reduceTaskNumber), which is what
	// the final merger expects.
	mergeFileName := mergeName(jobName, reduceTaskNumber)
	mergeFile, err := os.Create(mergeFileName)
	if err != nil {
		log.Fatal(err)
	}
	defer mergeFile.Close()
	enc := json.NewEncoder(mergeFile)
	for _, key := range keys {
		enc.Encode(KeyValue{key, reduceF(key, kvs[key])})
	}
}
```
A Reduce task proceeds as follows:
- using the agreed-upon naming scheme, locate the intermediate files this Reduce worker must process, then decode them in the agreed-upon format
- once all the key/value pairs are collected, sort them by key
- call the user-supplied reduceF function on all values of each distinct key
- write the results to the output file in the agreed-upon encoding
The Distributed Implementation of MapReduce (with Worker Fault Tolerance)
The main difference between Distributed and Sequential lies in the scheduling function, shown below:
```go
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}

	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	switch phase {
	case mapPhase:
		mr.scheduleMap()
	case reducePhase:
		mr.scheduleReduce()
	}
	fmt.Printf("Schedule: %v phase done\n", phase)
}
```
Scheduling is split into a Map phase and a Reduce phase. Let's look at the scheduleMap part first:
ScheduleMap
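The core of scheduleMap is a dispatch loop: hand each map task to an idle worker over RPC, and if the call fails, re-queue the task so another worker retries it, which is what "worker fault tolerance" amounts to here. The sketch below is a simplified, self-contained illustration of that loop, not the article's exact code; the idle channel stands in for the lab's channel of registered workers, and the call function stands in for the RPC invocation:

```go
package main

import (
	"fmt"
	"sync"
)

// schedule hands ntasks tasks to idle workers and retries failed tasks.
// call reports whether the (simulated) RPC to a worker succeeded.
func schedule(ntasks int, idle chan string, call func(worker string, task int) bool) {
	var wg sync.WaitGroup
	wg.Add(ntasks)
	tasks := make(chan int, ntasks)
	for i := 0; i < ntasks; i++ {
		tasks <- i
	}
	go func() {
		for task := range tasks {
			worker := <-idle // wait for an idle worker
			go func(worker string, task int) {
				if call(worker, task) {
					wg.Done()
					idle <- worker // success: worker is idle again
				} else {
					tasks <- task // failure: re-queue for another worker
				}
			}(worker, task)
		}
	}()
	wg.Wait() // every task has completed exactly once
	close(tasks)
}

func main() {
	idle := make(chan string, 2)
	idle <- "w1"
	idle <- "w2"
	var mu sync.Mutex
	failedOnce := false
	call := func(worker string, task int) bool {
		mu.Lock()
		defer mu.Unlock()
		if worker == "w2" && !failedOnce {
			failedOnce = true
			return false // simulate w2 crashing on its first task
		}
		return true
	}
	schedule(5, idle, call)
	fmt.Println("all 5 map tasks completed")
}
```

A failed worker is deliberately not returned to the idle channel, mirroring the idea that a crashed worker should not receive further tasks; its task simply completes on a surviving worker.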