MIT 6.5840 Lab 1: MapReduce (2025)
前言
Lab 的页面 6.5840 Lab 1: MapReduce
补充一个小知识,论文中的 master 在这个 Lab 中改名为 coordinator,主要是因为“Black Lives Matter”活动,具体可以看一下这篇文章:为什么 Git 分支开始从“master”变为“main”了?
Lab 页面中有这样一段话:
For this lab and all the others, we might issue updates to the code we provide you. To ensure that you can fetch those updates and easily merge them using
git pull, it’s best to leave the code we provide in the original files. You can add to the code we provide as directed in the lab write-ups; just don’t move it. It’s OK to put your own new functions in new files.
在编写代码的过程中我也会尽量遵守这个要求,不修改现有的代码,只进行新增。下面是具体的实现。
Worker
Worker 启动后通过 RPC 向 Coordinator 轮询获取任务并执行,分别实现对应的doMapTask和doReduceTask函数即可。
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
args := GetTaskArgs{}
reply := GetTaskReply{}
ok := call("Coordinator.GetTask", &args, &reply)
if ok {
log.Printf("Worker: get task, task type %v, task id %v\n",
reply.TaskType, reply.TaskID)
switch reply.TaskType {
case MAPTASK:
doMapTask(mapf, reply)
case REDUCETASK:
doReduceTask(reducef, reply)
case WAITTASK:
time.Sleep(1 * time.Second)
case EXITTASK:
return
default:
log.Fatalf("Worker: unexpected task type %v\n", reply.TaskType)
}
} else {
log.Println("Worker: get task failed, cannot contact the coordinator")
return
}
}
}
任务类型一共有四种,都是定义的常量:
const (
MAPTASK = iota
REDUCETASK
WAITTASK
EXITTASK
)
除了 Map 任务和 Reduce 任务以外,另外还有两种类型:
- 等待任务:当前阶段所有的任务都有 Worker 在执行,没有多余的任务可以分配
- 退出任务:所有任务都完成后,
mrcoordinator.go调用m.Done()会返回true,然后会等待一秒钟时间来停止进程,这段时间如果 Worker 获取任务可以得到一个明确的应该退出的指令
跟等待任务类似的两种方法分别是 Worker 每次完成任务和下一次申请任务间进行等待以及 Coordinator 在申请任务的 RPC 调用中进行等待。
Workers will sometimes need to wait, e.g. reduces can’t start until the last map has finished. One possibility is for workers to periodically ask the coordinator for work, sleeping with
time.Sleep()between each request. Another possibility is for the relevant RPC handler in the coordinator to have a loop that waits, either withtime.Sleep()orsync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting needn’t prevent the coordinator from processing other RPCs.
除了退出任务,申请任务的 RPC 调用失败时也会直接退出运行,这可能是 Worker 和 Coordinator 之间网络不通,也可能是 Coordinator 已经退出运行了。
When the job is completely finished, the worker processes should exit. A simple way to implement this is to use the return value from
call(): if the worker fails to contact the coordinator, it can assume that the coordinator has exited because the job is done, so the worker can terminate too. Depending on your design, you might also find it helpful to have a “please exit” pseudo-task that the coordinator can give to workers.
Map 任务
主要逻辑包括读文件,调用Map函数生成键值对,把每一条键值对按照论文中的公式 $hash(key) \mod R$ 写入 R 个文件中。
func doMapTask(mapf func(string, string) []KeyValue, reply GetTaskReply) {
file, err := os.Open(reply.MapTaskArg)
if err != nil {
log.Fatalf("Worker: cannot open file %v\n", reply.MapTaskArg)
}
defer file.Close()
content, err := io.ReadAll(file)
if err != nil {
log.Fatalf("Worker: cannot read file %v\n", reply.MapTaskArg)
}
kva := mapf(reply.MapTaskArg, string(content))
var tempFiles []*os.File
var encoders []*json.Encoder
for i := 0; i < reply.NReduce; i++ {
fd, err := os.CreateTemp(".", "")
if err != nil {
log.Fatalln("Worker: cannot create temporary file")
}
defer fd.Close()
tempFiles = append(tempFiles, fd)
enc := json.NewEncoder(fd)
encoders = append(encoders, enc)
}
for _, kv := range kva {
index := ihash(kv.Key) % reply.NReduce
err := encoders[index].Encode(&kv)
if err != nil {
log.Fatalf("Worker: cannot parse record %v\n", kv)
}
}
var finalFiles []string
for i, fd := range tempFiles {
newFileName := fmt.Sprintf("mr-%d-%d", reply.TaskID, i)
err = os.Rename(fd.Name(), newFileName)
if err != nil {
log.Fatalf("Worker: cannot rename file %v\n", fd.Name())
}
finalFiles = append(finalFiles, newFileName)
}
args := ReportTaskArgs{
TaskType: MAPTASK,
TaskID: reply.TaskID,
MapTaskResult: finalFiles,
}
reportReply := ReportTaskReply{}
ok := call("Coordinator.ReportTask", &args, &reportReply)
if ok {
log.Println("Worker: report map task done")
} else {
log.Println("Worker: report map task failed, cannot contact the coordinator")
}
}
根据 Lab 页面中的提示,中间文件的命名可以是mr-X-Y,其中X代表第 X 个 Map 任务,Y代表第 Y 个 Reduce 任务。
A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.
中间文件的内容可以在 Map 阶段用 json 进行编码写入,并在 Reduce 阶段用 json 解码读取。
The worker’s map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go’s
encoding/jsonpackage. To write key/value pairs in JSON format to an open file:enc := json.NewEncoder(file) for _, kv := ... { err := enc.Encode(&kv)and to read such a file back:
dec := json.NewDecoder(file) for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
Reduce 任务
由于文件都是本地文件,所以采用直接读文件的方式,Map 任务直接写入当前目录,Reduce 任务也直接从当前目录进行读取,更好的方案是 Map worker 提供 RPC 方法来进行文件的读取。
另外,为了简便直接把所有的文件都读到内存然后根据 key 排序,生产级别的 MapReduce 实现肯定是用内存+外存进行归并排序,然后逐个 key 喂给 Reduce 函数的,因为一台普通的 PC 基本上不可能把一个 Reduce 任务的所有文件都放在自己内存中。
func doReduceTask(reducef func(string, []string) string, reply GetTaskReply) {
intermediate := []KeyValue{}
for _, filename := range reply.ReduceTaskArg {
file, err := os.Open(filename)
if err != nil {
log.Fatalf("Worker: cannot open %v\n", filename)
}
defer file.Close()
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
intermediate = append(intermediate, kv)
}
}
sort.Sort(ByKey(intermediate))
fd, err := os.CreateTemp(".", "")
if err != nil {
log.Fatalln("Worker: cannot create temporary file")
}
defer fd.Close()
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(fd, "%v %v\n", intermediate[i].Key, output)
i = j
}
oname := fmt.Sprintf("mr-out-%d", reply.TaskID)
err = os.Rename(fd.Name(), oname)
if err != nil {
log.Fatalf("Worker: cannot rename file %v\n", fd.Name())
}
args := ReportTaskArgs{
TaskType: REDUCETASK,
TaskID: reply.TaskID,
ReduceTaskResult: oname,
}
reportReply := ReportTaskReply{}
ok := call("Coordinator.ReportTask", &args, &reportReply)
if ok {
log.Println("Worker: report reduce task done")
} else {
log.Println("Worker: report reduce task failed, cannot contact the coordinator")
}
}
在 Map 任务和 Reduce 任务生成文件的过程中,需要保证原子性,防止发生崩溃后出现文件部分写入的情况。具体做法是先创建并将内容写入一个临时文件,最后利用系统调用将临时文件重命名。
To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use
ioutil.TempFile(oros.CreateTempif you are running Go 1.17 or later) to create a temporary file andos.Renameto atomically rename it.
Coordinator
类型定义
首先定义任务结构体,包含任务类型、任务编号、开始时间、Map 或者 Reduce 类型任务的参数。
type Task struct {
TaskType int
TaskID int
StartTime time.Time
MapTaskArg string
ReduceTaskArg []string
}
其中TaskID字段用于 Map 阶段生成中间文件以及 Reduce 阶段生成最终文件的命名;StartTime方便判断任务是否超时;MapTaskArg和ReduceTaskArg的字符串类型保存的是文件名,因为都是本地文件读取,如果 Map worker 提供 RPC 方法来进行文件的读取,那么ReduceTaskArg需要增加一个 RPC 的连接信息。
有了任务结构体,我们就可以定义获取任务和上报任务结果的 RPC 调用中用到的参数:
type GetTaskArgs struct {}
type GetTaskReply struct {
NReduce int
Task
}
type ReportTaskArgs struct {
TaskType int
TaskID int
MapTaskResult []string
ReduceTaskResult string
}
type ReportTaskReply struct {}
接下来是最重要的Coordinator结构体,负责记录任务数量、任务状态、任务结果等信息。很多人会选择把所有的任务都保存在一个 slice 中,在Task结构体中增加一个任务状态字段用来标识。这样的好处是占用内存空间很少,坏处是每次分配任务都需要遍历。我选择把不同类型和不同状态的任务放在单独的 map 中,这样一共需要六个 map(两种类型、三种状态)。
至于状态的管理,基于锁的方法以及基于 channel 的方法都可以实现,并且各有优劣。基于锁的方法就是在结构体中设置一个或多个锁字段,并且在所有会对结构体中的状态进行操作的地方都用锁来保护;基于 channel 的方法的本质是状态的管理由一个单独的 goroutine 负责,其他 goroutine 把对状态的操作通过 channel 发送给这个单独的 goroutine。我选择基于 channel 的方法,一共需要三个 channel,一个用于 RPC 申请任务,一个用于 RPC 上报任务结果,最后一个用于结束 Coordinator 的运行。下面是定义的Coordinator结构体:
type Coordinator struct {
// Your definitions here.
nReduce int
phase int
mapTaskIdle map[int]*Task
mapTaskInprogress map[int]*Task
mapTaskCompleted map[int]*Task
reduceTaskIdle map[int]*Task
reduceTaskInprogress map[int]*Task
reduceTaskCompleted map[int]*Task
getTaskCh chan *GetTaskReply
reportTaskCh chan *ReportTaskArgs
exitCh chan struct{}
}
数据流图
下面我们来梳理一下系统的各个部分和数据流。Coordinator 运行后会开启 RPC 服务,同时后台启动一个 scheduler goroutine,负责发送任务到 getTashCh,以及从 reportTashCh 接收任务结果。Worker 启动后执行申请任务的 RPC 调用,RPC 库会发起一个到 Coordinator 的连接,并把要运行的函数的参数通过连接发动给Coordinator,Coordinator 的 RPC 服务会单独启动一个 goroutine 来运行指定的函数,从 getTaskCh 接收任务,并把结果通过连接发给 Worker 的 RPC,Worker 处理完任务,执行上报任务的 RPC 调用,Coordinator 单独启动一个 goroutine 来运行 ReportTask 函数,将任务结果发送给 reportTaskCh。
┌───────────┬──────────────────────────────────────────────────────────────────┐
│Coordinator│ │
├───────────┘ ┌─────────────────┐ │
│ │Coordinator state│ │
│ └────────▲────────┘ │
│ │ │
│ │ │
│ │ │
│ ┌────────────────┐ ┌────────┴┬───────┐ │
│ │ main goroutine │ │goroutine│ │ │
│ ├────────────────┤ ┌────┐ ├─────────┘ │ │
│ │ ◄─────┤chan◄─────┤ │ │
│ │Done() func call│ └────┘ │ scheduler │ │
│ │ │ │ │ │
│ └────────────────┘ └────┬────────▲───┘ │
│ │ │ │
│ │ │ │
│ ┌──▼─┐ ┌──┴─┐ │
│ │chan│ │chan│ │
│ └──┬─┘ └──▲─┘ │
│ │ │ │
│ │ │ │
│ ┌─────────┬──────▼─┐ ┌──┴──────┬───────────┐ │
│ │goroutine│ │ │goroutine│ │ │
│ ├─────────┘ │ ├─────────┘ │ │
│ │ │ │ │ │
│ │GetTask RPC server│ │ReportTask RPC server│ │
│ │ │ │ │ │
│ └──────────▲────┬──┘ └───────────▲────┬────┘ │
└───────────────────────────────────┼────┼──────────────────┼────┼─────────────┘
│ │ │ │
args reply args reply
│ │ │ │
┌──────┬──────┼────┼──────────────────┼────┼────────┐
┌──┤Worker│ │ │ │ │ │
┌──┤Wo├──────┘ │ │ │ │ │
│Wo├──┤ ┌─────────┬─┼────┼──────────────────┼────┼─────┐ │
├──┤ │ │goroutine│ │ │ │ │ │ │
│ │ │ ├─────────┘ │ │ │ │ │ │
│ │ │ │┌──────────┴────▼──┐ ┌───────────┴────▼────┐│ │
│ │ │ ││GetTask RPC client│ │ReportTask RPC client││ │
│ │ │ │└──────────────────┘ └─────────────────────┘│ │
│ │ │ │ │ │
│ │ │ └──────────────────────────────────────────────┘ │
│ │ │ │
│ │ └────────────────────────────────────────────────┬──┘
│ └────────────────────────────────────────────────┬──┘
└───────────────────────────────────────────────────┘
初始化
先创建一个空的Coordinator结构体,然后设置 reduce 任务的数量(用于 map 任务生成对应数量的中间文件),设置当前的阶段为执行 map 任务阶段,所有的阶段通过常量定义:
const (
MAPPHASE = iota
REDUCEPHASE
EXITPHASE
)
接着创建好六个 map 和三个 channel,并生成所有的 map 任务和 reduce 任务,放到对应的 map 中。结构体初始化完毕后就可以把 scheduler goroutine 跑起来了。最后,启动 RPC 服务,并返回初始化完毕的结构体指针。完整代码如下:
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here.
c.nReduce = nReduce
c.phase = MAPPHASE
c.mapTaskIdle = make(map[int]*Task)
c.mapTaskInprogress = make(map[int]*Task)
c.mapTaskCompleted = make(map[int]*Task)
c.reduceTaskIdle = make(map[int]*Task)
c.reduceTaskInprogress = make(map[int]*Task)
c.reduceTaskCompleted = make(map[int]*Task)
c.getTaskCh = make(chan *GetTaskReply)
c.reportTaskCh = make(chan *ReportTaskArgs)
c.exitCh = make(chan struct{})
for k, file := range files {
c.mapTaskIdle[k] = &Task{
TaskType: MAPTASK,
TaskID: k,
MapTaskArg: file,
}
}
for i := 0; i < nReduce; i++ {
c.reduceTaskIdle[i] = &Task{
TaskType: REDUCETASK,
TaskID: i,
ReduceTaskArg: make([]string, 0),
}
}
go c.schedule()
c.server()
return &c
}
RPC 方法
获取任务的 RPC 方法只需要从getTaskCh接收任务,并设置reply参数的对应字段;上报任务结果的 RPC 方法只需要把任务结果发给reportTaskCh就行,都非常简单:
// Your code here -- RPC handlers for the worker to call.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
msg := <-c.getTaskCh
reply.NReduce = msg.NReduce
reply.Task = msg.Task
return nil
}
func (c *Coordinator) ReportTask(args *ReportTaskArgs, reply *ReportTaskReply) error {
c.reportTaskCh <- args
return nil
}
调度器
上面提到的管理状态的单独的 goroutine 就是这里的调度器了,主要负责把待执行的任务通过 channel 发送给 RPC,以及从 channel 中接收来自 RPC 的任务结果。
func (c *Coordinator) schedule() {
var task *GetTaskReply
for {
if task == nil {
task = &GetTaskReply{NReduce: c.nReduce}
switch c.phase {
case MAPPHASE:
c.checkTimeout(MAPTASK)
if len(c.mapTaskIdle) == 0 {
task.TaskType = WAITTASK
} else {
var k int
var v *Task
for k, v = range c.mapTaskIdle {
v.StartTime = time.Now()
task.Task = *v
break
}
delete(c.mapTaskIdle, k)
c.mapTaskInprogress[k] = v
}
case REDUCEPHASE:
c.checkTimeout(REDUCETASK)
if len(c.reduceTaskIdle) == 0 {
task.TaskType = WAITTASK
} else {
var k int
var v *Task
for k, v = range c.reduceTaskIdle {
v.StartTime = time.Now()
task.Task = *v
break
}
delete(c.reduceTaskIdle, k)
c.reduceTaskInprogress[k] = v
}
case EXITPHASE:
task.TaskType = EXITTASK
}
}
select {
case c.getTaskCh <- task:
task = nil
case msg := <-c.reportTaskCh:
switch msg.TaskType {
case MAPTASK:
c.checkTimeout(MAPTASK)
if _, ok := c.mapTaskInprogress[msg.TaskID]; ok {
c.mapTaskCompleted[msg.TaskID] = c.mapTaskInprogress[msg.TaskID]
delete(c.mapTaskInprogress, msg.TaskID)
if len(c.mapTaskIdle) == 0 && len(c.mapTaskInprogress) == 0 {
c.phase = REDUCEPHASE
}
for i, file := range msg.MapTaskResult {
c.reduceTaskIdle[i].ReduceTaskArg = append(c.reduceTaskIdle[i].ReduceTaskArg, file)
}
}
case REDUCETASK:
c.checkTimeout(REDUCETASK)
if _, ok := c.reduceTaskInprogress[msg.TaskID]; ok {
c.reduceTaskCompleted[msg.TaskID] = c.reduceTaskInprogress[msg.TaskID]
delete(c.reduceTaskInprogress, msg.TaskID)
if len(c.reduceTaskIdle) == 0 && len(c.reduceTaskInprogress) == 0 {
c.phase = EXITPHASE
c.exitCh <- struct{}{}
}
}
}
}
}
}
在最后一个 reduce 任务也完成之后,往exitCh发送一个通知,这个通知会被mrcoordinator.go调用Done()的时候接收到,终止整个 Coordinator 的运行。
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {
ret := false
// Your code here.
select {
case <-c.exitCh:
ret = true
default:
}
return ret
}
超时检测
为了简化调度器的逻辑,目前的超时检测采用了一种 lazy 的设计,也就是在分配任务和接收任务结果的时候才进行超时检测。通过比较当前时间和任务的开始执行时间,把超过 10 秒的任务状态都从执行中改为待执行。
func (c *Coordinator) checkTimeout(t int) {
now := time.Now()
timeOut := []int{}
switch t {
case MAPTASK:
for k, v := range c.mapTaskInprogress {
if now.Sub(v.StartTime) >= 10*time.Second {
timeOut = append(timeOut, k)
}
}
for _, k := range timeOut {
c.mapTaskIdle[k] = c.mapTaskInprogress[k]
delete(c.mapTaskInprogress, k)
}
case REDUCETASK:
for k, v := range c.reduceTaskInprogress {
if now.Sub(v.StartTime) >= 10*time.Second {
timeOut = append(timeOut, k)
}
}
for _, k := range timeOut {
c.reduceTaskIdle[k] = c.reduceTaskInprogress[k]
delete(c.reduceTaskInprogress, k)
}
}
}
上述方案一般情况下没什么问题。但是假设你要实现一个实时监控服务,考虑这样一种极端情况,只有一个 Worker 在运行,申请完任务以后花了很久时间才运行完,在超过 10 秒一直到上报任务结果的这段时间内本应看到的任务状态是因为超时变为 Idle 状态,在这种情况下却依然是Inprogress。另外,检测超时的操作是通过遍历任务列表来实现的,如果任务数量超级多的话,可能会出现比较严重的性能问题。
可以尝试的一种方案是每次分配任务后都用一个定时器跟踪,10 秒后定时器通知调度器,大致的代码如下,感兴趣的朋友可以自己实现。
// 取消 StartTime 字段
type Task struct {
TaskType int
TaskID int
MapTaskArg string
ReduceTaskArg []string
}
type Coordinator struct {
...
timeoutCh chan *Task
}
func MakeCoordinator(files []string, nReduce int) *Coordinator {
...
c.timeoutCh = make(chan *Task)
...
}
func (c *Coordinator) checkTimeout(t *Task) {
time.Sleep(10 * time.Second)
c.timeoutCh <- t
}
func (c *Coordinator) schedule() {
var task *GetTaskReply
for {
... // 任务不再设置 StartTime 字段
select {
case c.getTaskCh <- task:
switch task.TaskType {
case MAPTASK, REDUCETASK:
// 只在这一个地方调用
go c.checkTimeout(&task.Task)
}
task = nil
case msg := <-c.reportTaskCh:
...
case t := <-c.timeoutCh:
switch t.TaskType {
case MAPTASK:
if _, ok := c.mapTaskInprogress[t.TaskID]; ok {
c.mapTaskIdle[t.TaskID] = c.mapTaskInprogress[t.TaskID]
delete(c.mapTaskInprogress, t.TaskID)
}
case REDUCETASK:
if _, ok := c.reduceTaskInprogress[t.TaskID]; ok {
c.reduceTaskIdle[t.TaskID] = c.reduceTaskInprogress[t.TaskID]
delete(c.reduceTaskInprogress, t.TaskID)
}
}
}
}
}