MIT 6.824 Lab 1:Part 3、4、5

Part3

Part 3 实现 mapreduce/schedule.go,通过 RPC 把任务调度给多个 worker。这里使用 sync.WaitGroup 等待所有 goroutine 完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
var waitGroup sync.WaitGroup
for i := 0; i < ntasks; i++ {
waitGroup.Add(1)
var taskArgs DoTaskArgs
taskArgs.JobName = jobName
taskArgs.Phase = phase
taskArgs.TaskNumber = i
taskArgs.NumOtherPhase = n_other
if phase == mapPhase {
taskArgs.File = mapFiles[i]
}
go func() {
defer waitGroup.Done()
worker := <-registerChan
if !call(worker, "Worker.DoTask", &taskArgs, nil) {
log.Fatal("call RPC failed")
}
go func() {registerChan <- worker}()
}()
}
waitGroup.Wait()

Part4

Part 4 需要处理 worker 失败的情况:某个 worker 调用失败之后,应当把该任务交给另外的 worker。在 goroutine 中加入 for 循环,不断重试直至 RPC 调用成功即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var waitGroup sync.WaitGroup
for i := 0; i < ntasks; i++ {
waitGroup.Add(1)
var taskArgs DoTaskArgs
taskArgs.JobName = jobName
taskArgs.Phase = phase
taskArgs.TaskNumber = i
taskArgs.NumOtherPhase = n_other
if phase == mapPhase {
taskArgs.File = mapFiles[i]
}
go func() {
defer waitGroup.Done()
for {
worker := <-registerChan
if call(worker, "Worker.DoTask", taskArgs, nil) {
go func() {
registerChan <- worker
}()
break
}

}
}()
}
waitGroup.Wait()

Part5

Part 5 与 Part 2 类似,实现 main/ii.go 中的 mapF() 和 reduceF(),统计每个词出现在多少个文件中以及具体是哪些文件,输出格式为 `word: num pg-xx.txt,pg-xxx.txt`。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (document string, value string) (res []mapreduce.KeyValue) {

words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
DocMaps := make(map[string]string)
var kv []mapreduce.KeyValue
for _, word := range words {
DocMaps[word] = document
}
for k, doc := range DocMaps {
kv = append(kv, mapreduce.KeyValue{Key: k, Value: doc})
}
return kv
}

// reduceF is the reduce function for the inverted index. It returns the number
// of entries in values, a space, and then the entries sorted in increasing
// order and joined with commas, e.g. "2 pg-a.txt,pg-b.txt".
//
// key is not used. values is not modified: sorting is done on a copy. With no
// values the result is "0".
func reduceF(key string, values []string) string {
	if len(values) == 0 {
		return "0"
	}

	// Sort a private copy so the caller's slice keeps its original order.
	docs := append([]string(nil), values...)
	sort.Strings(docs)

	return strconv.Itoa(len(docs)) + " " + strings.Join(docs, ",")
}