0%

MIT6.824 Lab1

写在前面

希望在寒假加上开学前两周自学完MIT 6.824课程,这个是MIT分布式系统的博士课程,应该也是分布式系统入门的神课。

采用的材料是2020Spring学期的材料,如果没有弃坑的话,对应的Lab解答都会同步更新到博客里面。

课程主页:http://nil.csail.mit.edu/6.824/2020/schedule.html

个人代码Repo:https://github.com/Phimos/6.824-2020Spring

实现过程

Lab1的主要任务就是通过Golang来实现一个MapReduce的算法,具体的内容可以从Lab1的官方文档中获取。

首先定义任务的类型,具体如下,最关键的就是对应的类型/状态/编号。同时利用MapFileReduceFiles字段来提供对应任务的输入。

1
2
3
4
5
6
7
8
9
10
11
type MapReduceTask struct {
TaskType string // Map / Reduce / Wait
TaskStatus string // Unassigned / Assigned / Finished
TaskNum int

MapFile string // Input of Map task
ReduceFiles []string // Input of Reduce task

NumReduce int
NumMap int
}

之后定义RPC相关的参数和返回值,对于请求有两种,当是finish情况时,需要提供对应的Task字段便于修改Master里面的人物状态。

1
2
3
4
5
6
7
8
9
10
// Args for RPC
type MapReduceArgs struct {
MessageType string // request / finish
Task MapReduceTask
}

// Reply for RPC
type MapReduceReply struct {
Task MapReduceTask
}

对于Master的类型,实际上最关键的只有MapTasksReduceTasks以及mu三项,用来存储所有任务的状态。剩下的字段其实都可以根据任务的信息来进行统计,只是处于方便进行监测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Master struct {
// Your definitions here.
NumMap int
NumMapFinished int
NumReduce int
NumReduceFinished int
mu sync.Mutex

MapTasks []MapReduceTask
ReduceTasks []MapReduceTask

MapFinish bool
ReduceFinish bool
}

之后就根据Lab文档当中所给的Hints来梳理整个Lab1的内容。

One way to get started is to modify mr/worker.go's Worker() to send an RPC to the master asking for a task. Then modify the master to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.

Worker()当中仿照样例里面的内容发送RPC请求,结果分为三类执行对应的MapReduceWait三类任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {

// Your worker implementation here.
for {
args := MapReduceArgs{MessageType: "request"}
reply := MapReduceReply{}

resp := call("Master.MapReduceHandler", &args, &reply)

if !resp {
break
}

switch reply.Task.TaskType {
case "Map":
mapTask(mapf, reply.Task)
case "Reduce":
reduceTask(reducef, reply.Task)
case "Wait":
waitTask()
}
}
}

对于Map的任务,主体部分直接参考mrsequential.go当中的代码,这里只需要从得到的task里面获取输入,并且修改对应的输出内容即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func mapTask(mapf func(string, string) []KeyValue, task MapReduceTask) {
filename := task.MapFile

file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()

kva := mapf(filename, string(content))

kvaa := make([][]KeyValue, task.NumReduce)
for _, kv := range kva {
idx := ihash(kv.Key) % task.NumReduce
kvaa[idx] = append(kvaa[idx], kv)
}

for i := 0; i < task.NumReduce; i++ {
storeIntermediateFile(kvaa[i], intermediateFilename(task.TaskNum, i))
}

defer finishTask(task)
}

The application Map and Reduce functions are loaded at run-time using the Go plugin package, from files whose names end in .so.

这边的话是通过.so文件来直接获取用于Map和Reduce过程的具体函数,包括最终进行test的脚本文件也是通过加载不同的.so来实现不同的测试效果,例如对crash情况的模拟。以下是worker.go中的对应的内容,直接通过已有的loadPlugin()进行加载就可以了。

1
2
3
4
5
6
7
8
9
10
func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
os.Exit(1)
}

mapf, reducef := loadPlugin(os.Args[1])

mr.Worker(mapf, reducef)
}

If you change anything in the mr/ directory, you will probably have to re-build any MapReduce plugins you use, with something like go build -buildmode=plugin ../mrapps/wc.go

**这里每次都要重新build!**不然直接会出错,一开始没有注意到检查了挺久。

This lab relies on the workers sharing a file system. That’s straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.

由于是在同一台机器上去实现这样一个分布式的MapReduce方法,对文件系统实际上进行了一定程度的简化。

A reasonable naming convention for intermediate files is mr-X-Y, where X is the Map task number, and Y is the reduce task number.

这边利用一个函数来规范化对应N x M个中间文件的命名,具体的命名方式采用如上的mr-X-Y格式。

1
2
3
func intermediateFilename(numMapTask int, numReduceTask int) string {
return fmt.Sprintf("mr-%v-%v", numMapTask, numReduceTask)
}

The worker’s map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go’s encoding/json package. To write key/value pairs to a JSON file:

1
2
3
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)

and to read such a file back:

1
2
3
4
5
6
7
8
dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}

正如在上面所提到的,利用encoding/json对于中间结果进行存取,只需要对于以上的代码进行简单修改即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func storeIntermediateFile(kva []KeyValue, filename string) {
file, err := os.Create(filename)
defer file.Close()

if err != nil {
log.Fatalf("cannot open %v", filename)
}
enc := json.NewEncoder(file)
if err != nil {
log.Fatal("cannot create encoder")
}
for _, kv := range kva {
err := enc.Encode(&kv)
if err != nil {
log.Fatal("cannot encode")
}
}
}

func loadIntermediateFile(filename string) []KeyValue {
var kva []KeyValue
file, err := os.Open(filename)
defer file.Close()

if err != nil {
log.Fatalf("cannot open %v", filename)
}
dec := json.NewDecoder(file)
for {
kv := KeyValue{}
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
return kva
}

The map part of your worker can use the ihash(key) function (in worker.go) to pick the reduce task for a given key.

这是mapTask()当中对应的代码部分,在下面的第5行可以看到,通过ihash(key) % nReduce来得到对应的中间输出文件,进行中间结果的划分。

1
2
3
4
5
6
7
8
9
10
11
kva := mapf(filename, string(content))

kvaa := make([][]KeyValue, task.NumReduce)
for _, kv := range kva {
idx := ihash(kv.Key) % task.NumReduce
kvaa[idx] = append(kvaa[idx], kv)
}

for i := 0; i < task.NumReduce; i++ {
storeIntermediateFile(kvaa[i], intermediateFilename(task.TaskNum, i))
}

You can steal some code from mrsequential.go for reading Map input files, for sorting intermedate key/value pairs between the Map and Reduce, and for storing Reduce output in files.

读书人的事情,能够叫做偷吗

既然说了让抄,那就直接从mrsequential.go里面抄就好了,对应的输入输出需要进行修改,主体部分直接粘贴即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func reduceTask(reducef func(string, []string) string, task MapReduceTask) {
var intermediate []KeyValue
for _, filename := range task.ReduceFiles {
intermediate = append(intermediate, loadIntermediateFile(filename)...)
}
sort.Sort(ByKey(intermediate))
oname := fmt.Sprintf("mr-out-%v", task.TaskNum)
ofile, _ := os.Create(oname)

//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

i = j
}

ofile.Close()

defer finishTask(task)
}

The master, as an RPC server, will be concurrent; don’t forget to lock shared data.

这里需要上锁的是Master,防止多个Worker进行同时的修改,我这里采用了一个非常暴力的大锁,在MapReduceHandler()的最开始对Master上锁,然后利用defer在函数退出之后进行锁的释放,保证每次只有一个MapReduceHandler()对Master当中的结构进行修改,确保对并发可以正确处理。

1
2
3
4
5
6
// Your code here -- RPC handlers for the worker to call.
func (m *Master) MapReduceHandler(args *MapReduceArgs, reply *MapReduceReply) error {
m.mu.Lock()
defer m.mu.Unlock()
...
}

Use Go’s race detector, with go build -race and go run -race. test-mr.sh has a comment that shows you how to enable the race detector for the tests.

这边是课上所提到的检验race的方法,go语言可以通过对于内存进行监控的情况来查看是否有race发生,但是不会被执行的代码当中的race错误是不会被分析出来的,因为go语言并没有对race进行一个静态的分析。

Workers will sometimes need to wait, e.g. reduces can’t start until the last map has finished. One possibility is for workers to periodically ask the master for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the master to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won’t prevent the master from processing other RPCs.

暂时还没有新的Task分配的状况,会在Map的任务已经全部分配下去但是还没有全部完成的情况出现。这个时候Master给Worker一个Wait的指令,通过time.Sleep()等待之后再次发送一个申请新任务的RPC请求。

1
2
3
func waitTask() {
time.Sleep(time.Second)
}

The master can’t reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the master wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the master wait for ten seconds; after that the master should assume the worker has died (of course, it might not have).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if args.MessageType == "request" {
if !m.MapFinish {
for index, task := range m.MapTasks {
if task.TaskStatus == "Unassigned" {
m.MapTasks[index].TaskStatus = "Assigned"
reply.Task = m.MapTasks[index]
go m.checkTimeout("Map", index, 10)
return nil
}
}
reply.Task.TaskType = "Wait"
return nil
} else if !m.ReduceFinish {
for index, task := range m.ReduceTasks {
if task.TaskStatus == "Unassigned" {
m.ReduceTasks[index].TaskStatus = "Assigned"
reply.Task = m.ReduceTasks[index]
go m.checkTimeout("Reduce", index, 10)
return nil
}
}
reply.Task.TaskType = "Wait"
return nil
} else {
return nil
}
}

具体的checkTimeout()实现如下,就是进行一定时间的Sleep,之后检查任务是否完成,如果仍然没有完成处于Assigned状态的话,说明对应的Worker可能已经crash了。于是将其标为Unassigned状态,在之后进行重分配,保证MapReduce流程可以继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *Master) checkTimeout(taskType string, num int, timeout int) {
time.Sleep(time.Second * time.Duration(timeout))
m.mu.Lock()
defer m.mu.Unlock()
if taskType == "Map" {
if m.MapTasks[num].TaskStatus == "Assigned" {
m.MapTasks[num].TaskStatus = "Unassigned"
}
} else {
if m.ReduceTasks[num].TaskStatus == "Assigned" {
m.ReduceTasks[num].TaskStatus = "Unassigned"
}
}
}

To test crash recovery, you can use the mrapps/crash.go application plugin. It randomly exits in the Map and Reduce functions.

Test的最后一个部分就是crash test,会使用crash.so文件,事实上,在上面利用一个go routine取检查超时,如果一个worker在工作过程中crash了,那么就无法在最后返回个master一个Finish的RPC。所以利用以上的超时检查机制,是可以简单的对于Lab当中的crash进行正确重分配任务,保证MapReduce流程正常进行。

To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.

这边只需要修改对应的输出为TempFile -> Rename的流程就可以,这里就不进行修改了。

test-mr.sh runs all the processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there.

执行sh test-mr.sh的结果如下所示,可以发现对于Lab1的所有测试都完全通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
*** Starting wc test.
2021/02/03 16:24:03 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- wc test: PASS
*** Starting indexer test.
2021/02/03 16:24:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- indexer test: PASS
*** Starting map parallelism test.
2021/02/03 16:24:12 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2021/02/03 16:24:20 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- reduce parallelism test: PASS
*** Starting crash test.
2021/02/03 16:24:28 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- crash test: PASS
*** PASSED ALL TESTS

体验

  • Golang上手的感觉还行,感觉和C语言差别不大,有C语言的基础熟悉一下语言特性应该就可以上手。
  • 难度一般,可能是Lab1的缘故,总体实现上困难不是很大。
  • 从文档和代码上看,整个Lab的结构上感觉没有6.828课程的巧妙,但是已经非常完备了,远超国内学校课程作业的实用程度。