Part3
Part 3 需要在 mapreduce/schedule.go 中实现 schedule()，通过 RPC 把任务分派给多个 worker 执行。这里使用 sync.WaitGroup 等待所有任务对应的 goroutine 完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
var waitGroup sync.WaitGroup for i := 0; i < ntasks; i++ { waitGroup.Add(1) var taskArgs DoTaskArgs taskArgs.JobName = jobName taskArgs.Phase = phase taskArgs.TaskNumber = i taskArgs.NumOtherPhase = n_other if phase == mapPhase { taskArgs.File = mapFiles[i] } go func() { defer waitGroup.Done() worker := <-registerChan if !call(worker, "Worker.DoTask", &taskArgs, nil) { log.Fatal("call RPC failed") } go func() {registerChan <- worker}() }() } waitGroup.Wait()
|
Part4
Part 4 需要处理 worker 失败的情况：某个 worker 执行失败后，应把该任务重新交给另一个 worker。只需在调用外面加一个 for 循环，不断重试直到 RPC 调用成功即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
var waitGroup sync.WaitGroup for i := 0; i < ntasks; i++ { waitGroup.Add(1) var taskArgs DoTaskArgs taskArgs.JobName = jobName taskArgs.Phase = phase taskArgs.TaskNumber = i taskArgs.NumOtherPhase = n_other if phase == mapPhase { taskArgs.File = mapFiles[i] } go func() { defer waitGroup.Done() for { worker := <-registerChan if call(worker, "Worker.DoTask", taskArgs, nil) { go func() { registerChan <- worker }() break }
} }() } waitGroup.Wait()
|
Part5
Part 5 与 Part 2 类似，需要实现 main/ii.go 中的 mapF() 和 reduceF()，构建倒排索引：对每个单词，统计包含它的文件数以及具体是哪些文件，输出格式为 word: num pg-xx.txt,pg-xxx.txt（文件名按字典序排列，以逗号分隔）。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
func (document string, value string) (res []mapreduce.KeyValue) { words := strings.FieldsFunc(value, func(r rune) bool { return !unicode.IsLetter(r) }) DocMaps := make(map[string]string) var kv []mapreduce.KeyValue for _, word := range words { DocMaps[word] = document } for k, doc := range DocMaps { kv = append(kv, mapreduce.KeyValue{Key: k, Value: doc}) } return kv }
// reduceF is the reduce function for the inverted-index job.
//
// values are the document names emitted by mapF for key. It returns
// "<n> <doc1>,<doc2>,...", where n is len(values) and the documents are in
// lexicographic order. If each document is mapped exactly once, n is the
// number of documents containing key. values is sorted in place. For empty
// values the result is "0" (no trailing space).
func reduceF(key string, values []string) string {
	if len(values) == 0 {
		return "0"
	}
	sort.Strings(values)
	// strings.Join builds the list in one pass, instead of the quadratic
	// repeated concatenation plus trailing-comma trim.
	return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}
|
近期评论