简单来说,MapReduce 过程包含两个动作,一个是 map
将任务拆解分配,一个是 reduce
将 map
产生的中间结果取出组合。
下图为 MapReduce 流程,出自 MapReduce 论文:
根据上图,可以看出 MapReduce 经历了几个流程:
- 待处理文件分解,MapReduce 首先将输入文件划分为
M
片,之后在多台机器上启动了相同的程序拷贝。 - 其中有一个 master 拷贝程序,以及若干个 worker,worker 负责接收 master 分配的
map
或reduce
任务。 - 被分配到
map
任务的 worker 会去读取相应输入块的内容,它在输入文件中解析出键值对并将每个键值对传送给用户定义的Map
函数,由Map
函数产生的中间键值对缓存在内存中。 - 被缓存的键值对会被阶段性的写回本地磁盘,并且被划分函数分割为
R
份。这些缓存对在磁盘上的位置会被回传给 master,master 再负责将这些位置转发给 reduce worker。 - 当 reduce worker 从 master 那里接收到文件存储路径信息后,会通过 rpc 从 map worker的本地磁盘中回去缓存数据,当 reduce worker 读入全部中间数据之后,会根据中间键对它们进行排序,这样所有具有相同键的键值对就聚集在一起了。
- reduce worker 遍历已经排完序的中间数据。每当遇到一个新的中间键,它会将 key 和相应的中间值传递给用户定义的
Reduce
函数。Reduce
函数的输出会被添加到这个 Reduce 部分的输出文件中。 - 当所有的 map task 和 reduce task 都已经完成的时候,master 将唤醒用户程序。到此为止,用户代码中的 MapReduce 调用返回。
Task
首先实现 Task 的数据结构,Task 用于 coordinator 节点和 worker 节点之间的 RPC 通信,其中 Type 表示该 Task 的类型;TaskId
表示当前任务的 ID
序号;ReducerTaskNum
表示在本次处理中参与的 reduce 任务的数量,map 任务需要根据该值创建对应的中间文件;Files
表示当前任务要处理的文件。
在 TaskType
中,ExitTask
表示当前所有 map 和 reduce 任务均已完成,可以直接退出;WaitTask
表示当前任务队列为空无法分配到新任务。
const (
MapTask TaskType = iota
ReduceTask
ExitTask
WaitTask
)
type Task struct {
Type TaskType
TaskId int
ReducerTaskNum int
Files []string
}
此外,还定义了 Task
的元信息,用来表示 Task
的状态。
const (
Waiting TaskStatus = iota
Working
Done
)
type TaskMetaData struct {
Status TaskStatus
StartTime time.Time
Task *Task
}
为了管理所有的 Task
,定义了 TaskSet
,用于保存所有 Task
的状态。
type TaskSet struct {
mapTaskMap map[int]*TaskMetaData
reduceTaskMap map[int]*TaskMetaData
}
由于核心操作都会经过 TaskSet
,因此 TaskSet
定义了若干方法:
RegisterTask:创建一个新的 Task,将其保存至 TaskSet;
StartTask:根据传入的 Task,将其状态设置为
Working
;CompleteTask:根据传入的 Task,将其状态设置为
Done
;CheckComplete:检查某一类型的 Task 是否全部完成;
CheckTaskTimeout:查找所有超时的 Task;
// RegisterTask register task to TaskSet
func (t *TaskSet) RegisterTask(task *Task) {
taskMetaData := NewTaskMetaData(task)
switch task.Type {
case MapTask:
t.mapTaskMap[task.TaskId] = taskMetaData
case ReduceTask:
t.reduceTaskMap[task.TaskId] = taskMetaData
default:
log.Panic("Cannot add unsupported task to TaskSet.")
}
}
// StartTask
// When a task is handed over to the corresponding worker via RPC,
// update the timestamp and status information.
func (t *TaskSet) StartTask(task *Task) {
var taskMetaData *TaskMetaData
switch task.Type {
case MapTask:
taskMetaData = t.mapTaskMap[task.TaskId]
case ReduceTask:
taskMetaData = t.reduceTaskMap[task.TaskId]
default:
log.Panic("Cannot get unsupported task to TaskSet.")
}
taskMetaData.StartTime = time.Now()
taskMetaData.Status = Working
}
// CompleteTask set a task status to done
func (t *TaskSet) CompleteTask(task *Task) bool {
var taskMetaData *TaskMetaData
switch task.Type {
case MapTask:
taskMetaData = t.mapTaskMap[task.TaskId]
case ReduceTask:
taskMetaData = t.reduceTaskMap[task.TaskId]
default:
log.Panic("Cannot get unsupported task to TaskSet.")
}
// Sometimes the task has been handled by another worker due to slow execution.
if taskMetaData.Status == Done {
log.Printf("Task already completed, thus result abandoned. Task: %v\n", taskMetaData)
return false
}
taskMetaData.Status = Done
return true
}
// CheckComplete Check if all tasks of a certain type are completed.
func (t *TaskSet) CheckComplete(taskType TaskType) bool {
var mp map[int]*TaskMetaData
switch taskType {
case MapTask:
mp = t.mapTaskMap
case ReduceTask:
mp = t.reduceTaskMap
default:
log.Panic("Cannot check unsupported task type in TaskType.")
}
for _, ts := range mp {
if ts.Status != Done {
return false
}
}
return true
}
// CheckTaskTimeout Periodically check for timeout tasks.
func (t *TaskSet) CheckTaskTimeout(taskType TaskType) []*Task {
var out []*Task
var mp map[int]*TaskMetaData
switch taskType {
case MapTask:
mp = t.mapTaskMap
case ReduceTask:
mp = t.reduceTaskMap
default:
log.Panic("Cannot check unsupported task type in TaskType.")
}
for _, ts := range mp {
if ts.Status == Working && time.Since(ts.StartTime) > TaskTimeout*time.Second {
out = append(out, ts.Task)
}
}
return out
}