实验前请安装 Go 环境,编辑器我采用 VS Code
我的实验环境为
Manjaro Linux,因此就不用虚拟机了并且由于官网的不公开协议,我不会上传代码到
github上,博客也不会挂在搜索引擎上
实验要求
使用 Go 实现一个简单的 MapReduce 框架,实现的功能为简单的 Word Count
MapReduce 框架简介
这个简单版本的 MapReduce 省去了原文中的一些步骤(例如切分文件等),所以这里解释我们需要实现的 MapReduce 框架是什么样的。
由于我们是使用单机器多进程来模拟多机器的,所以下面我都以进程来描述原框架中的各个机器
其原理十分简单,从宏观上而言,一个MapReduce 框架由一个 master 进程(这里应该叫 coordinator?但字母太多了,统一用 matser 说吧)和多个 Worker 进程组成。
master扮演着服务器的角色,其追踪着各个Task的状态与阶段状态,为Worker分配Task并处理Worker已完成的任务(如果需要处理的话)Worker扮演着驴的角色,只负责做从master那要来的任务,做完就一直问master要新的任务,并把完成的任务上报给master
框架各个部分的设计
官网上给出了很多的要求,直接看可能会眼花缭乱不知道应该如何下手(说的就是我),所以这里对照要求一步一步的给出设计思路
Task
首先我们明确这个 Worker 处理的最小单位,任何一个 Map / Reduce 任务都可以视为一个 Task , 一个完整的 MapReduce 过程被称为一个 Job (但在这里我们不会考虑 Job 这个概念,因为都是只做一次 MapReduce 过程)
对于一次 Task ,事实上这是 Worker 通过 RPC 从 master 中拿到的关于任务的一切信息,所以我们必须设计一些冗余项来保证信息是完整的。
我们可以分步来进行设计,首先,显然我们设计如下:
type Phase int
const ( Map Phase = iota Reduce Exit Wait)
type Task struct { TaskPhase Phase // task phase for worker InputFile string // input file name}这样,对于 Worker 而言,我们就知道了
- 当前应该执行的是
Map任务还是Reduce任务 - 任务的输入文件
但考虑我们的任务为 word count, 我们的 Map 任务需要给出中间生成的文件,并且中间生成文件的命名为 mr-x-y,其中 x 为这是第几个 Map 任务(也就是这是第几个输入文件),y 为相同键值所在的 Reduce 任务的编号,例如 y = ihash(key) % nReduce 意思为键为 Key 的Kv 应当让索引为 y 的 Reduce 来处理
在 work.go 中已给出了 ihash() 的实现,所以我们还需要的是 nReduce 的值,并且我们还需要一个数组来存储中间生成文件的文件名(因为是本地所以直接存文件名就好) ,因此我们二次设计如下:
type Task struct { TaskPhase Phase // task phase for worker nReduce int // for map task InputFile string // input file name Intermediates []string // map task output}这样我们做完了最初版本的设计,这样的 Task 就可以在 Worker 与 master 中传输了
Master
master 需要有以下功能:
-
在启动时根据指定的输入文件数及
Reduce Task数,生成Map Task及Reduce Task -
响应
Worker的Task申请RPC请求,分配可用的Task给到Worker处理 -
追踪
Task的完成情况,在所有Map Task完成后进入Reduce阶段,开始派发Reduce Task;在所有Reduce Task完成后标记作业已完成并退出
由于我们需要对 Task 的完成情况进行追踪,并且需要给 Worker 任务,那么这里我们可以通过 map 与 chan 来实现这两个功能
首先,我们设计对 Task 任务的追踪:
type MasterTaskStatus int
const ( Idle MasterTaskStatus = iota InProcess Completed)
// MasterTask// Master's data structure , store every status of Map/Reduce tasktype MasterTask struct { TaskStatus MasterTaskStatus // status of task TaskRef *Task // Reference of task MachineId string // machine id}这里对 MasterTask 的设计借鉴了原论文中的设计,包括 TaskStatus 与 MachineId
这样,我们就可以完成对 Task 的追踪,接下来就可以对 Master 的结构进行设计了:
type Coordinator struct { // Your definitions here. mutex sync.Mutex // mutex TaskQueue chan *Task // task queue TaskMeta map[int]*MasterTask // task meta info nReduce int // number of Reduce task MasterPhase Phase // phase of all task InputFiles []string // Input files Intermediates [][]string // map task output}mutex为互斥锁,用于并发控制nReduce用以传送给每一个Task- 消息队列
TaskQueue用以为Worker发送Task,由于chan自带锁,这样就避免了我们自己手写一个带锁的队列( TaskMeta为一个map,其键值为index,实际上就是文件的序号,当然在这里可以当作Task的索引,值为MasterTask,注意这个结构体实际上是包含了Task的MasterPhase记录了当前MapReduce任务已经进行到了什么阶段InputFiles记录了Map任务的输入文件Intermediates为一个文件列表,由于在前面我们提到,Map任务会生成一系列中间文件mr-X-Y,其中Y为Reduce的编号,所以在这里,我们需要把中间文件存入这里,Intermediates[Y] = mr-i-Y其中
我们在这里添加了 TaskMeta 之后,显然我们在 Task 中还需要一个值来记录,我们当前的 Task 在 Master 中所对应的元数据应该是什么,所以我们设计 Task 如下所示:
type Task struct { TaskPhase Phase // task phase for worker nReduce int // for map task InputFile string // input file name TaskNumber int // index of task in MasterMeta Intermediates []string // map task output}(我发誓这是最终版本了)
功能实现
首先最简单的功能,应该是 RPC 中 master 与 worker 之间的任务派发,我们从这一步开始写,但首先,我们需要明确在 master 初始化时需要完成的事情:
func MakeCoordinator(files []string, nReduce int) *Coordinator {
max := func(a int, b int) int { if a > b { return a } return b }
c := Coordinator{ MasterPhase: Map, TaskMeta: make(map[int]*MasterTask), TaskQueue: make(chan *Task, max(len(files), nReduce)), InputFiles: files, Intermediates: make([][]string, nReduce), }
// Your code here.
// init map tasks without machine id (not send yet)
for idx, file := range files { task := Task{ TaskPhase: Map, nReduce: nReduce, InputFile: file, TaskNumber: idx, } c.TaskQueue <- &task c.TaskMeta[idx] = &MasterTask{ TaskStatus: Idle, TaskRef: &task, MachineId: "", } }
c.server()
return &c}由于 RPC 的存在,因此我们需要设定请求参数与返回参数。
在这里,由于我们跟踪 Task 的完成情况,所以这就要求 Worker 需要向 Master 报告自己是否完成了上一个任务,Master 又会向 Worker 发送新的任务,因此这两步我们可以合成一个部分来看待。
于是我们可以设计请求参数与返回参数如下:
type AskArgs struct { MachineId string // identity of workers LastTask Task // last completed task}
type ReplyArgs Task当然最后一行可以不写,直接将 reply 设置为 *Task 类型即可,一个简单的框架如下所示:
func (c *Coordinator) AskForTask(args *AskArgs, reply *Task) error { completedTask := args.LastTask // check worker's report to decide whether commit the map file if worker have a completed work { //do something here } // get a new task c.mutex.Lock() defer c.mutex.Unlock()
select { case task, ok := <-c.TaskQueue: if !ok { if c.MasterPhase == Exit { *reply = Task{TaskPhase: Exit} } else { *reply = Task{TaskPhase: Wait} } return nil } *reply = *task
c.TaskMeta[reply.TaskNumber].DeadLine = time.Now().Add(10 * time.Second) c.TaskMeta[reply.TaskNumber].MachineId = args.MachineId c.TaskMeta[reply.TaskNumber].TaskStatus = InProcess default: if c.MasterPhase == Exit { *reply = Task{TaskPhase: Exit} } else { *reply = Task{TaskPhase: Wait} } return nil }
return nil
}注意这里,我们将 DeadLine 设置为 10s ,这是官网的 Hint 中建议的,至于其用处,在后面其他功能的实现部分会解释
下面解释一下为什么要用 select,在最开始的版本中,我没有使用 select ,是直接 task, ok := <- c.TaskQueue ,但遇到了代码执行完 Map 阶段后卡住不动,debug 后发现是因为在 TaskQueue 这个 chan 中的内容被取完后,进入了阻塞状态,只有此 chan 中有新任务添加才会被唤起。经过 GPT4 的解答后,采用了 select 进行阻塞的处理,如果阻塞,那么我们将 reply 设置为 WAIT 或 EXIT 来告诉 Worker 应该做什么样的反应
而对于提交任务的部分,我们需要考虑几个条件:
- 由于
Worker获取到Task后可能出现宕机和卡死等情况,在这种情况下,Master会将任务给其他Worker处理,发生故障的Worker所做的处理会作废 - 也是由于卡死,所以可能会出现提交的任务阶段滞后的情况,例如已经到了
Reduce阶段,但此时提交上来的任务却还是Map的任务,类似这种我们也应该作废
所以,我们对任务判断应该写为:
c.mutex.Lock()if task := c.TaskMeta[completedTask.TaskNumber]; task.MachineId == args.MachineId && task.TaskStatus == InProcess && completedTask.TaskPhase == c.MasterPhase {
c.TaskMeta[completedTask.TaskNumber].TaskStatus = Completed
if c.MasterPhase == Map { for id, file := range completedTask.Intermediates { c.Intermediates[id] = append(c.Intermediates[id], file) } }}c.mutex.Unlock()如果检测通过,那么我们接受这次任务的提交,并将中间文件存在 Master 上(虽然这里做的处理只是将其路径存在 Master 中)
那么在 Worker 中,我们的结构可以这样:
// Worker// main/mrworker.go calls this function.func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
// Your worker implementation here. // uncomment to send the Example RPC to the coordinator.
machineId := strconv.Itoa(os.Getegid())
lastTask := Task{ TaskNumber: -1, }
flag := true
for flag {
ask := AskArgs{ MachineId: machineId, LastTask: lastTask, }
reply := Task{}
call("Coordinator.AskForTask", &ask, &reply)
currentTask := reply
// log.Printf("%v, %v", currentTask.nReduce, reply.nReduce)
// if in different computer, then need to send json data, master should decode json and store it into local disk switch currentTask.TaskPhase { case Map: mapPhase(¤tTask, mapf) case Reduce: reducePhase(¤tTask, reducef) case Wait: currentTask.TaskNumber = -1 time.Sleep(5 * time.Second) case Exit: flag = false }
lastTask = currentTask
}}接下来,我们来实现 Worker 中的 map 与 reduce,首先是 map 阶段 注意定义 KeyValue 结构体
func writeToLocal(x int, y int, content *[]KeyValue) string { dir, err := os.Getwd() if err != nil { log.Fatal("cannot get current working directory") } tempFile, err := os.CreateTemp(dir, "mr-tmp-*") if err != nil { log.Fatal("cannot create a temp file") } enc := json.NewEncoder(tempFile) for _, kv := range *content { if err := enc.Encode(&kv); err != nil { log.Fatalf("cannot encode %v", kv) } } err = tempFile.Close() if err != nil { log.Fatalf("cannot close file %v", tempFile.Name()) }
filename := fmt.Sprintf("mr-%d-%d", x, y) err = os.Rename(tempFile.Name(), filename) if err != nil { log.Fatalf("cannot rename %v to %v", tempFile.Name(), filename) } return filepath.Join(dir, filename)}
func mapPhase(task *Task, mapf func(string, string) []KeyValue) {
file, err := os.Open(task.InputFile) defer func(file *os.File) { err := file.Close() if err != nil { log.Fatalf("cannot close %v", task.InputFile) } }(file) if err != nil { log.Fatalf("cannot open %v", task.InputFile) } content, err := os.ReadFile(task.InputFile) if err != nil { log.Fatalf("cannot read %v", task.InputFile) }
kva := mapf(task.InputFile, string(content))
buffer := make([][]KeyValue, task.nReduce) for _, v := range kva { slot := ihash(v.Key) % task.nReduce buffer[slot] = append(buffer[slot], v) } output := make([]string, 0) for i := 0; i < task.nReduce; i++ { output = append(output, writeToLocal(task.TaskNumber, i, &buffer[i])) } task.Intermediates = output}大部分都是从 mrsequential.go 中照搬来的代码,所以也没有特别多的解释(
比较值得注意的是,这里我们按照要求,模拟了一个 json 的数据传送
而对于 reduce 任务:
func readFromLocal(files []string) *[]KeyValue {
var kva []KeyValue for _, filename := range files { file, err := os.Open(filename) if err != nil { log.Fatalf("cannot open file %v", filename) } dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) } } return &kva}
func reducePhase(task *Task, reducef func(string, []string) string) { intermediate := *readFromLocal(task.Intermediates) sort.Sort(ByKey(intermediate))
dir, _ := os.Getwd() tempFile, err := os.CreateTemp(dir, "mr-tmp-*") if err != nil { log.Fatalf("cannot create temp file") }
i := 0 for i < len(intermediate) { j := i + 1 for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key { j++ } var values []string for k := i; k < j; k++ { values = append(values, intermediate[k].Value) } output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output. _, err = fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output) if err != nil { log.Fatalf("Error: %v", err) }
i = j }
err = tempFile.Close() if err != nil { log.Fatalf("cannot close temp file %v", tempFile.Name()) } filename := fmt.Sprintf("mr-out-%d", task.TaskNumber) err = os.Rename(tempFile.Name(), filename) if err != nil { log.Fatalf("cannot rename file %v to %v", tempFile.Name(), filename) }}同样也是照抄 mrsequential.go 的代码(
于是在 worker 这边,我们的任务就已全部做完了。
而我们还有其他要求,即 Worker 长时间不与 master 通信了,为了简化任务,设定该超时阈值为 10s 即可。为了支持这一点,我们的实现需要支持到:
master追踪已分配Task的运行情况,在Task超出10s仍未完成时,将该Task重新分配给其他Worker重试- 考虑
Task上一次分配的Worker可能仍在运行,重新分配后会出现两个Worker同时运行同一个Task的情况。要确保只有一个Worker能够完成结果数据的最终写出,以免出现冲突,导致下游观察到重复或缺失的结果数据
第二点会相对复杂些,不过在 Lab 文档中也给出了提示 —— 实际上也是参考了 Google MapReduce 的做法,Worker 在写出数据时可以先写出到临时文件,最终确认没有问题后再将其重命名为正式结果文件,区分开了 Write 和 Commit 的过程。Commit 的过程可以是 Master 来执行,也可以是 Worker 来执行:
- Master Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker,是则进行结果文件 Commit,否则直接忽略
- Worker Commit:Worker 向 Master 汇报 Task 完成,Master 确认该 Task 是否仍属于该 Worker 并响应 Worker,是则 Worker 进行结果文件 Commit,再向 Master 汇报 Commit 完成
但实际上这里,我们并不需要区分的如此明确,因为 Worker 会将文件上传到 master 的磁盘中,所以我们可以直接让 worker 进行重命名,也就是我们上面所做的那样。
而对于第一点,我们采用 goroutine 来解决(实际上有点像自动垃圾回收?
go func() { for { time.Sleep(10 * time.Second) c.mutex.Lock() for _, task := range c.TaskMeta { if task.MachineId != "" && time.Now().After(task.DeadLine) && task.TaskRef.TaskPhase == c.MasterPhase && task.TaskStatus == InProcess { task.MachineId = "" c.TaskQueue <- task.TaskRef task.TaskStatus = Idle } } c.mutex.Unlock() }}()我们需要考虑的点比较多:
- 该任务是否被分配
- 是否超过时限(
DDL) - 此任务当前的阶段是否与服务器的阶段不匹配(也就是是否为过期任务)
- 任务是否为
InProcess
如果都满足,那么我们将此任务重新加回队列中
我们还差最后一步尚未完成,也就是我们的服务器无法进行阶段转移:从 Map 到 Reduce, 从 Reduce 到 Exit
并且我们也没有为 Reduce 阶段进行任务的创建
对于第一个,我们可以采用 goroutine 进行周期性的检测:
func (c *Coordinator) AllDone() bool { for _, task := range c.TaskMeta { if task.TaskStatus != Completed { return false } } return true}go func() { for { c.mutex.Lock() if c.AllDone() { switch c.MasterPhase { case Map: c.MasterPhase = Reduce c.CreateReduceTask() case Reduce: c.MasterPhase = Exit } } c.mutex.Unlock() time.Sleep(time.Second) }}()对于第二点,我们需要遍历中间生成文件,并创建相应数目的 Reduce 任务:
func (c *Coordinator) CreateReduceTask() { c.TaskMeta = make(map[int]*MasterTask) for idx, file := range c.Intermediates { task := Task{ TaskPhase: Reduce, nReduce: c.nReduce, InputFile: "", TaskNumber: idx, Intermediates: file, } c.TaskQueue <- &task c.TaskMeta[idx] = &MasterTask{ TaskStatus: Idle, DeadLine: time.Time{}, TaskRef: &task, MachineId: "", } }}注意,上述的两个 goroutine 都放在 MakeCoordinator 执行即可
至此,我们的 MapReduce 框架就已完成,打开终端输入
bash test-mr.sh进行测试即可
实验结果
