MIT 6.5840 Lab 1: MapReduce (2025)

Posted on Mar 20, 2025

前言

Lab 的页面 6.5840 Lab 1: MapReduce

补充一个小知识,论文中的 master 在这个 Lab 中改名为 coordinator,主要是因为“Black Lives Matter”活动,具体可以看一下这篇文章:为什么 Git 分支开始从“master”变为“main”了?

Lab 页面中有这样一段话:

For this lab and all the others, we might issue updates to the code we provide you. To ensure that you can fetch those updates and easily merge them using git pull, it’s best to leave the code we provide in the original files. You can add to the code we provide as directed in the lab write-ups; just don’t move it. It’s OK to put your own new functions in new files.

在编写代码的过程中我也会尽量遵守这个要求,不修改现有的代码,只进行新增。下面是具体的实现。

Worker

Worker 启动后通过 RPC 向 Coordinator 轮询获取任务并执行,分别实现对应的doMapTaskdoReduceTask函数即可。

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		args := GetTaskArgs{}
		reply := GetTaskReply{}
		ok := call("Coordinator.GetTask", &args, &reply)
		if ok {
			log.Printf("Worker: get task, task type %v, task id %v\n",
				reply.TaskType, reply.TaskID)
			switch reply.TaskType {
			case MAPTASK:
				doMapTask(mapf, reply)
			case REDUCETASK:
				doReduceTask(reducef, reply)
			case WAITTASK:
				time.Sleep(1 * time.Second)
			case EXITTASK:
				return
			default:
				log.Fatalf("Worker: unexpected task type %v\n", reply.TaskType)
			}
		} else {
			log.Println("Worker: get task failed, cannot contact the coordinator")
			return
		}
	}
}

任务类型一共有四种,都是定义的常量:

const (
	MAPTASK = iota
	REDUCETASK
	WAITTASK
	EXITTASK
)

除了 Map 任务和 Reduce 任务以外,另外还有两种类型:

  • 等待任务:当前阶段所有的任务都有 Worker 在执行,没有多余的任务可以分配
  • 退出任务:所有任务都完成后,mrcoordinator.go调用m.Done()会返回true,然后会等待一秒钟时间来停止进程,这段时间如果 Worker 获取任务可以得到一个明确的应该退出的指令

跟等待任务类似的两种方法分别是 Worker 每次完成任务和下一次申请任务间进行等待以及 Coordinator 在申请任务的 RPC 调用中进行等待。

Workers will sometimes need to wait, e.g. reduces can’t start until the last map has finished. One possibility is for workers to periodically ask the coordinator for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the coordinator to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting needn’t prevent the coordinator from processing other RPCs.

除了退出任务,申请任务的 RPC 调用失败时也会直接退出运行,这可能是 Worker 和 Coordinator 之间网络不通,也可能是 Coordinator 已经退出运行了。

When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from call(): if the worker fails to contact the coordinator, it can assume that the coordinator has exited because the job is done, so the worker can terminate too. Depending on your design, you might also find it helpful to have a “please exit” pseudo-task that the coordinator can give to workers.

Map 任务

主要逻辑包括读文件,调用Map函数生成键值对,把每一条键值对按照论文中的公式 $hash(key) \mod R$ 写入 R 个文件中。

func doMapTask(mapf func(string, string) []KeyValue, reply GetTaskReply) {
	file, err := os.Open(reply.MapTaskArg)
	if err != nil {
		log.Fatalf("Worker: cannot open file %v\n", reply.MapTaskArg)
	}
	defer file.Close()

	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("Worker: cannot read file %v\n", reply.MapTaskArg)
	}

	kva := mapf(reply.MapTaskArg, string(content))

	var tempFiles []*os.File
	var encoders []*json.Encoder
	for i := 0; i < reply.NReduce; i++ {
		fd, err := os.CreateTemp(".", "")
		if err != nil {
			log.Fatalln("Worker: cannot create temporary file")
		}
		defer fd.Close()

		tempFiles = append(tempFiles, fd)
		enc := json.NewEncoder(fd)
		encoders = append(encoders, enc)
	}

	for _, kv := range kva {
		index := ihash(kv.Key) % reply.NReduce
		err := encoders[index].Encode(&kv)
		if err != nil {
			log.Fatalf("Worker: cannot parse record %v\n", kv)
		}
	}

	var finalFiles []string
	for i, fd := range tempFiles {
		newFileName := fmt.Sprintf("mr-%d-%d", reply.TaskID, i)
		err = os.Rename(fd.Name(), newFileName)
		if err != nil {
			log.Fatalf("Worker: cannot rename file %v\n", fd.Name())
		}
		finalFiles = append(finalFiles, newFileName)
	}

	args := ReportTaskArgs{
		TaskType:      MAPTASK,
		TaskID:        reply.TaskID,
		MapTaskResult: finalFiles,
	}
	reportReply := ReportTaskReply{}
	ok := call("Coordinator.ReportTask", &args, &reportReply)
	if ok {
		log.Println("Worker: report map task done")
	} else {
		log.Println("Worker: report map task failed, cannot contact the coordinator")
	}
}

根据 Lab 页面中的提示,中间文件的命名可以是mr-X-Y,其中X代表第 X 个 Map 任务,Y代表第 Y 个 Reduce 任务。

A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

中间文件的内容可以在 Map 阶段用 json 进行编码写入,并在 Reduce 阶段用 json 解码读取。

The worker’s map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go’s encoding/json package. To write key/value pairs in JSON format to an open file:

enc := json.NewEncoder(file)
for _, kv := ... {
    err := enc.Encode(&kv)

and to read such a file back:

dec := json.NewDecoder(file)
for {
    var kv KeyValue
    if err := dec.Decode(&kv); err != nil {
        break
    }
    kva = append(kva, kv)
}

Reduce 任务

由于文件都是本地文件,所以采用直接读文件的方式,Map 任务直接写入当前目录,Reduce 任务也直接从当前目录进行读取,更好的方案是 Map worker 提供 RPC 方法来进行文件的读取。

另外,为了简便直接把所有的文件都读到内存然后根据 key 排序,生产级别的 MapReduce 实现肯定是用内存+外存进行归并排序,然后逐个 key 喂给 Reduce 函数的,因为一台普通的 PC 基本上不可能把一个 Reduce 任务的所有文件都放在自己内存中。

func doReduceTask(reducef func(string, []string) string, reply GetTaskReply) {
	intermediate := []KeyValue{}
	for _, filename := range reply.ReduceTaskArg {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("Worker: cannot open %v\n", filename)
		}
		defer file.Close()

		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
	}

	sort.Sort(ByKey(intermediate))

	fd, err := os.CreateTemp(".", "")
	if err != nil {
		log.Fatalln("Worker: cannot create temporary file")
	}
	defer fd.Close()

	//
	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-0.
	//
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(fd, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

    oname := fmt.Sprintf("mr-out-%d", reply.TaskID)
	err = os.Rename(fd.Name(), oname)
	if err != nil {
		log.Fatalf("Worker: cannot rename file %v\n", fd.Name())
	}

	args := ReportTaskArgs{
		TaskType:         REDUCETASK,
		TaskID:           reply.TaskID,
		ReduceTaskResult: oname,
	}
	reportReply := ReportTaskReply{}
	ok := call("Coordinator.ReportTask", &args, &reportReply)
	if ok {
		log.Println("Worker: report reduce task done")
	} else {
		log.Println("Worker: report reduce task failed, cannot contact the coordinator")
	}
}

在 Map 任务和 Reduce 任务生成文件的过程中,需要保证原子性,防止发生崩溃后出现文件部分写入的情况。具体做法是先创建并将内容写入一个临时文件,最后利用系统调用将临时文件重命名。

To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile (or os.CreateTemp if you are running Go 1.17 or later) to create a temporary file and os.Rename to atomically rename it.

Coordinator

类型定义

首先定义任务结构体,包含任务类型、任务编号、开始时间、Map 或者 Reduce 类型任务的参数。

type Task struct {
	TaskType      int
	TaskID        int
	StartTime     time.Time
	MapTaskArg    string
	ReduceTaskArg []string
}

其中TaskID字段用于 Map 阶段生成中间文件以及 Reduce 阶段生成最终文件的命名;StartTime方便判断任务是否超时;MapTaskArgReduceTaskArg的字符串类型保存的是文件名,因为都是本地文件读取,如果 Map worker 提供 RPC 方法来进行文件的读取,那么ReduceTaskArg需要增加一个 RPC 的连接信息。

有了任务结构体,我们就可以定义获取任务和上报任务结果的 RPC 调用中用到的参数:

type GetTaskArgs struct {}

type GetTaskReply struct {
	NReduce int
	Task
}

type ReportTaskArgs struct {
	TaskType         int
	TaskID           int
	MapTaskResult    []string
	ReduceTaskResult string
}

type ReportTaskReply struct {}

接下来是最重要的Coordinator结构体,负责记录任务数量、任务状态、任务结果等信息。很多人会选择把所有的任务都保存在一个 slice 中,在Task结构体中增加一个任务状态字段用来标识。这样的好处是占用内存空间很少,坏处是每次分配任务都需要遍历。我选择把不同类型和不同状态的任务放在单独的 map 中,这样一共需要六个 map(两种类型、三种状态)。

至于状态的管理,基于锁的方法以及基于 channel 的方法都可以实现,并且各有优劣。基于锁的方法就是在结构体中设置一个或多个锁字段,并且在所有会对结构体中的状态进行操作的地方都用锁来保护;基于 channel 的方法的本质是状态的管理由一个单独的 goroutine 负责,其他 goroutine 把对状态的操作通过 channel 发送给这个单独的 goroutine。我选择基于 channel 的方法,一共需要三个 channel,一个用于 RPC 申请任务,一个用于 RPC 上报任务结果,最后一个用于结束 Coordinator 的运行。下面是定义的Coordinator结构体:

type Coordinator struct {
	// Your definitions here.
	nReduce int
	phase   int

	mapTaskIdle          map[int]*Task
	mapTaskInprogress    map[int]*Task
	mapTaskCompleted     map[int]*Task
	reduceTaskIdle       map[int]*Task
	reduceTaskInprogress map[int]*Task
	reduceTaskCompleted  map[int]*Task

	getTaskCh    chan *GetTaskReply
	reportTaskCh chan *ReportTaskArgs
	exitCh       chan struct{}
}

数据流图

下面我们来梳理一下系统的各个部分和数据流。Coordinator 运行后会开启 RPC 服务,同时后台启动一个 scheduler goroutine,负责发送任务到 getTashCh,以及从 reportTashCh 接收任务结果。Worker 启动后执行申请任务的 RPC 调用,RPC 库会发起一个到 Coordinator 的连接,并把要运行的函数的参数通过连接发动给Coordinator,Coordinator 的 RPC 服务会单独启动一个 goroutine 来运行指定的函数,从 getTaskCh 接收任务,并把结果通过连接发给 Worker 的 RPC,Worker 处理完任务,执行上报任务的 RPC 调用,Coordinator 单独启动一个 goroutine 来运行 ReportTask 函数,将任务结果发送给 reportTaskCh。

┌───────────┬──────────────────────────────────────────────────────────────────┐
Coordinator                                                                  
├───────────┘                        ┌─────────────────┐                       
                                    Coordinator state                       
                                    └────────▲────────┘                       
                                                                             
                                                                             
                                                                             
  ┌────────────────┐                ┌────────┴┬───────┐                       
   main goroutine                 goroutine                              
  ├────────────────┤     ┌────┐     ├─────────┘                              
                  ◄─────┤chan◄─────┤                                        
  Done() func call     └────┘         scheduler                           
                                                                          
  └────────────────┘                └────┬────────▲───┘                       
                                                                            
                                                                            
                                      ┌──▼─┐   ┌──┴─┐                         
                                      chan   chan                         
                                      └──┬─┘   └──▲─┘                         
                                                                            
                                                                            
                        ┌─────────┬──────▼─┐   ┌──┴──────┬───────────┐        
                        goroutine           goroutine                   
                        ├─────────┘           ├─────────┘                   
                                                                          
                        GetTask RPC server   ReportTask RPC server        
                                                                          
                        └──────────▲────┬──┘   └───────────▲────┬────┘        
└───────────────────────────────────┼────┼──────────────────┼────┼─────────────┘
                                                                            
                                   args reply              args reply           
                                                                            
                      ┌──────┬──────┼────┼──────────────────┼────┼────────┐     
                   ┌──┤Worker                                             
                ┌──┤Wo├──────┘                                             
                Wo├──┤ ┌─────────┬─┼────┼──────────────────┼────┼─────┐       
                ├──┤   goroutine                                       
                     ├─────────┘                                       
                     │┌──────────┴────▼──┐   ┌───────────┴────▼────┐│       
                     ││GetTask RPC client   ReportTask RPC client││       
                     │└──────────────────┘   └─────────────────────┘│       
                                                                          
                     └──────────────────────────────────────────────┘       
                                                                            
                    └────────────────────────────────────────────────┬──┘     
                  └────────────────────────────────────────────────┬──┘        
                └───────────────────────────────────────────────────┘           

初始化

先创建一个空的Coordinator结构体,然后设置 reduce 任务的数量(用于 map 任务生成对应数量的中间文件),设置当前的阶段为执行 map 任务阶段,所有的阶段通过常量定义:

const (
	MAPPHASE = iota
	REDUCEPHASE
	EXITPHASE
)

接着创建好六个 map 和三个 channel,并生成所有的 map 任务和 reduce 任务,放到对应的 map 中。结构体初始化完毕后就可以把 scheduler goroutine 跑起来了。最后,启动 RPC 服务,并返回初始化完毕的结构体指针。完整代码如下:

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}

	// Your code here.
	c.nReduce = nReduce
	c.phase = MAPPHASE

	c.mapTaskIdle = make(map[int]*Task)
	c.mapTaskInprogress = make(map[int]*Task)
	c.mapTaskCompleted = make(map[int]*Task)
	c.reduceTaskIdle = make(map[int]*Task)
	c.reduceTaskInprogress = make(map[int]*Task)
	c.reduceTaskCompleted = make(map[int]*Task)

	c.getTaskCh = make(chan *GetTaskReply)
	c.reportTaskCh = make(chan *ReportTaskArgs)
	c.exitCh = make(chan struct{})

	for k, file := range files {
		c.mapTaskIdle[k] = &Task{
			TaskType:   MAPTASK,
			TaskID:     k,
			MapTaskArg: file,
		}
	}

	for i := 0; i < nReduce; i++ {
		c.reduceTaskIdle[i] = &Task{
			TaskType:      REDUCETASK,
			TaskID:        i,
			ReduceTaskArg: make([]string, 0),
		}
	}

	go c.schedule()
	c.server()
	return &c
}

RPC 方法

获取任务的 RPC 方法只需要从getTaskCh接收任务,并设置reply参数的对应字段;上报任务结果的 RPC 方法只需要把任务结果发给reportTaskCh就行,都非常简单:

// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	msg := <-c.getTaskCh
	reply.NReduce = msg.NReduce
	reply.Task = msg.Task
	return nil
}

func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
	c.reportTaskCh <- args
	return nil
}

调度器

上面提到的管理状态的单独的 goroutine 就是这里的调度器了,主要负责把待执行的任务通过 channel 发送给 RPC,以及从 channel 中接收来自 RPC 的任务结果。

func (c *Coordinator) schedule() {
	var task *GetTaskReply

	for {
		if task == nil {
			task = &GetTaskReply{NReduce: c.nReduce}
			switch c.phase {
			case MAPPHASE:
				c.checkTimeout(MAPTASK)
				if len(c.mapTaskIdle) == 0 {
					task.TaskType = WAITTASK
				} else {
					var k int
					var v *Task
					for k, v = range c.mapTaskIdle {
						v.StartTime = time.Now()
						task.Task = *v
						break
					}
					delete(c.mapTaskIdle, k)
					c.mapTaskInprogress[k] = v
				}
			case REDUCEPHASE:
				c.checkTimeout(REDUCETASK)
				if len(c.reduceTaskIdle) == 0 {
					task.TaskType = WAITTASK
				} else {
					var k int
					var v *Task
					for k, v = range c.reduceTaskIdle {
						v.StartTime = time.Now()
						task.Task = *v
						break
					}
					delete(c.reduceTaskIdle, k)
					c.reduceTaskInprogress[k] = v
				}
			case EXITPHASE:
				task.TaskType = EXITTASK
			}
		}

		select {
		case c.getTaskCh <- task:
			task = nil
		case msg := <-c.reportTaskCh:
			switch msg.TaskType {
			case MAPTASK:
				c.checkTimeout(MAPTASK)
				if _, ok := c.mapTaskInprogress[msg.TaskID]; ok {
					c.mapTaskCompleted[msg.TaskID] = c.mapTaskInprogress[msg.TaskID]
					delete(c.mapTaskInprogress, msg.TaskID)
					if len(c.mapTaskIdle) == 0 && len(c.mapTaskInprogress) == 0 {
						c.phase = REDUCEPHASE
					}

					for i, file := range msg.MapTaskResult {
						c.reduceTaskIdle[i].ReduceTaskArg = append(c.reduceTaskIdle[i].ReduceTaskArg, file)
					}
				}
			case REDUCETASK:
				c.checkTimeout(REDUCETASK)
				if _, ok := c.reduceTaskInprogress[msg.TaskID]; ok {
					c.reduceTaskCompleted[msg.TaskID] = c.reduceTaskInprogress[msg.TaskID]
					delete(c.reduceTaskInprogress, msg.TaskID)
					if len(c.reduceTaskIdle) == 0 && len(c.reduceTaskInprogress) == 0 {
						c.phase = EXITPHASE
						c.exitCh <- struct{}{}
					}
				}
			}
		}
	}
}

在最后一个 reduce 任务也完成之后,往exitCh发送一个通知,这个通知会被mrcoordinator.go调用Done()的时候接收到,终止整个 Coordinator 的运行。

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
	ret := false

	// Your code here.
	select {
	case <-c.exitCh:
		ret = true
	default:
	}

	return ret
}

超时检测

为了简化调度器的逻辑,目前的超时检测采用了一种 lazy 的设计,也就是在分配任务和接收任务结果的时候才进行超时检测。通过比较当前时间和任务的开始执行时间,把超过 10 秒的任务状态都从执行中改为待执行。

func (c *Coordinator) checkTimeout(t int) {
	now := time.Now()
	timeOut := []int{}
	switch t {
	case MAPTASK:
		for k, v := range c.mapTaskInprogress {
			if now.Sub(v.StartTime) >= 10*time.Second {
				timeOut = append(timeOut, k)
			}
		}
		for _, k := range timeOut {
			c.mapTaskIdle[k] = c.mapTaskInprogress[k]
			delete(c.mapTaskInprogress, k)
		}
	case REDUCETASK:
		for k, v := range c.reduceTaskInprogress {
			if now.Sub(v.StartTime) >= 10*time.Second {
				timeOut = append(timeOut, k)
			}
		}
		for _, k := range timeOut {
			c.reduceTaskIdle[k] = c.reduceTaskInprogress[k]
			delete(c.reduceTaskInprogress, k)
		}
	}
}

上述方案一般情况下没什么问题。但是假设你要实现一个实时监控服务,考虑这样一种极端情况,只有一个 Worker 在运行,申请完任务以后花了很久时间才运行完,在超过 10 秒一直到上报任务结果的这段时间内本应看到的任务状态是因为超时变为 Idle 状态,在这种情况下却依然是Inprogress。另外,检测超时的操作是通过遍历任务列表来实现的,如果任务数量超级多的话,可能会出现比较严重的性能问题。

可以尝试的一种方案是每次分配任务后都用一个定时器跟踪,10 秒后定时器通知调度器,大致的代码如下,感兴趣的朋友可以自己实现。

// 取消 StartTime 字段
type Task struct {
	TaskType      int
	TaskID        int
	MapTaskArg    string
	ReduceTaskArg []string
}

type Coordinator struct {
	...
	timeoutCh chan *Task
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	...
	c.timeoutCh = make(chan *Task)
	...
}

func (c *Coordinator) checkTimeout(t *Task) {
	time.Sleep(10 * time.Second)
	c.timeoutCh <- t
}

func (c *Coordinator) schedule() {
	var task *GetTaskReply
	for {
		... // 任务不再设置 StartTime 字段
		select {
		case c.getTaskCh <- task:
			switch task.TaskType {
			case MAPTASK, REDUCETASK:
				// 只在这一个地方调用
				go c.checkTimeout(&task.Task)
			}
			task = nil
		case msg := <-c.reportTaskCh:
			...
		case t := <-c.timeoutCh:
			switch t.TaskType {
			case MAPTASK:
				if _, ok := c.mapTaskInprogress[t.TaskID]; ok {
					c.mapTaskIdle[t.TaskID] = c.mapTaskInprogress[t.TaskID]
					delete(c.mapTaskInprogress, t.TaskID)
				}
			case REDUCETASK:
				if _, ok := c.reduceTaskInprogress[t.TaskID]; ok {
					c.reduceTaskIdle[t.TaskID] = c.reduceTaskInprogress[t.TaskID]
					delete(c.reduceTaskInprogress, t.TaskID)
				}
			}
		}
	}
}