MIT6.5840 Lab1 - MapReduce

单来说,MapReduce 过程包含两个动作,一个是 map 将任务拆解分配,一个是 reducemap 产生的中间结果取出组合。 下图为 MapReduce 流程,出自 MapReduce 论文:

mapreduce

根据上图,可以看出 MapReduce 经历了几个流程:

  1. 待处理文件分解,MapReduce 首先将输入文件划分为 M 片,之后在多台机器上启动了相同的程序拷贝。
  2. 其中有一个 master 拷贝程序,以及若干个 worker,worker 负责接收 master 分配的 mapreduce 任务。
  3. 被分配到 map 任务的 worker 会去读取相应输入块的内容,它在输入文件中解析出键值对并将每个键值对传送给用户定义的 Map 函数,由 Map 函数产生的中间键值对缓存在内存中。
  4. 被缓存的键值对会被阶段性的写回本地磁盘,并且被划分函数分割为 R 份。这些缓存对在磁盘上的位置会被回传给 master,master 再负责将这些位置转发给 reduce worker。
  5. 当 reduce worker 从 master 那里接收到文件存储路径信息后,会通过 rpc 从 map worker的本地磁盘中回去缓存数据,当 reduce worker 读入全部中间数据之后,会根据中间键对它们进行排序,这样所有具有相同键的键值对就聚集在一起了。
  6. reduce worker 遍历已经排完序的中间数据。每当遇到一个新的中间键,它会将 key 和相应的中间值传递给用户定义的 Reduce 函数。Reduce 函数的输出会被添加到这个 Reduce 部分的输出文件中。
  7. 当所有的 map task 和 reduce task 都已经完成的时候,master 将唤醒用户程序。到此为止,用户代码中的 MapReduce 调用返回。

Task

首先实现 Task 的数据结构,Task 用于 coordinator 节点和 worker 节点之间的 RPC 通信,其中 Type 表示该 Task 的类型;TaskId 表示当前任务的 ID 序号;ReducerTaskNum 表示在本次处理中参与的 reduce 任务的数量,map 任务需要根据该值创建对应的中间文件;Files 表示当前任务要处理的文件。

TaskType 中,ExitTask 表示当前所有 map 和 reduce 任务均已完成,可以直接退出;WaitTask 表示当前任务队列为空无法分配到新任务。

const (
	MapTask TaskType = iota
	ReduceTask
	ExitTask
	WaitTask
)

type Task struct {
	Type           TaskType
	TaskId         int
	ReducerTaskNum int
	Files          []string
}

此外,还定义了 Task 的元信息,用来表示 Task 的状态。

const (
	Waiting TaskStatus = iota
	Working
	Done
)
type TaskMetaData struct {
	Status    TaskStatus
	StartTime time.Time
	Task      *Task
}

为了管理所有的 Task,定义了 TaskSet,用于保存所有 Task的状态。

type TaskSet struct {
	mapTaskMap    map[int]*TaskMetaData
	reduceTaskMap map[int]*TaskMetaData
}

由于核心操作都会经过 TaskSet,因此 TaskSet 定义了若干方法:

RegisterTask:创建一个新的 Task,将其保存至 TaskSet;

StartTask:根据传入的 Task,将其状态设置为 Working

CompleteTask:根据传入的 Task,将其状态设置为 Done

CheckComplete:检查某一类型的 Task 是否全部完成;

CheckTaskTimeout:查找所有超时的 Task;

// RegisterTask register task to TaskSet
func (t *TaskSet) RegisterTask(task *Task) {
	taskMetaData := NewTaskMetaData(task)

	switch task.Type {
	case MapTask:
		t.mapTaskMap[task.TaskId] = taskMetaData
	case ReduceTask:
		t.reduceTaskMap[task.TaskId] = taskMetaData
	default:
		log.Panic("Cannot add unsupported task to TaskSet.")
	}
}

// StartTask
// When a task is handed over to the corresponding worker via RPC,
// update the timestamp and status information.
func (t *TaskSet) StartTask(task *Task) {
	var taskMetaData *TaskMetaData

	switch task.Type {
	case MapTask:
		taskMetaData = t.mapTaskMap[task.TaskId]
	case ReduceTask:
		taskMetaData = t.reduceTaskMap[task.TaskId]
	default:
		log.Panic("Cannot get unsupported task to TaskSet.")
	}

	taskMetaData.StartTime = time.Now()
	taskMetaData.Status = Working
}

// CompleteTask set a task status to done
func (t *TaskSet) CompleteTask(task *Task) bool {
	var taskMetaData *TaskMetaData

	switch task.Type {
	case MapTask:
		taskMetaData = t.mapTaskMap[task.TaskId]
	case ReduceTask:
		taskMetaData = t.reduceTaskMap[task.TaskId]
	default:
		log.Panic("Cannot get unsupported task to TaskSet.")
	}

	// Sometimes the task has been handled by another worker due to slow execution.
	if taskMetaData.Status == Done {
		log.Printf("Task already completed, thus result abandoned. Task: %v\n", taskMetaData)
		return false
	}

	taskMetaData.Status = Done

	return true
}

// CheckComplete Check if all tasks of a certain type are completed.
func (t *TaskSet) CheckComplete(taskType TaskType) bool {
	var mp map[int]*TaskMetaData

	switch taskType {
	case MapTask:
		mp = t.mapTaskMap
	case ReduceTask:
		mp = t.reduceTaskMap
	default:
		log.Panic("Cannot check unsupported task type in TaskType.")
	}

	for _, ts := range mp {
		if ts.Status != Done {
			return false
		}
	}

	return true
}

// CheckTaskTimeout Periodically check for timeout tasks.
func (t *TaskSet) CheckTaskTimeout(taskType TaskType) []*Task {
	var out []*Task
	var mp map[int]*TaskMetaData

	switch taskType {
	case MapTask:
		mp = t.mapTaskMap
	case ReduceTask:
		mp = t.reduceTaskMap
	default:
		log.Panic("Cannot check unsupported task type in TaskType.")
	}

	for _, ts := range mp {
		if ts.Status == Working && time.Since(ts.StartTime) > TaskTimeout*time.Second {
			out = append(out, ts.Task)
		}
	}

	return out
}