Dependency
Spark divides the dependencies between RDDs into narrow dependencies and wide dependencies; in the source code these correspond to NarrowDependency and ShuffleDependency respectively.
NarrowDependency in turn has two implementations: OneToOneDependency and RangeDependency.
NarrowDependency
OneToOneDependency describes the case where a child RDD depends on a single parent RDD and each child partition depends on the parent partition with the same index.
RangeDependency describes the case where a child RDD depends on multiple parent RDDs (as in union), but each parent partition still corresponds to exactly one child partition.
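Before looking at the implementations, here is a minimal sketch of where the two show up in practice (spark-shell style, so sc is the SparkContext provided by the shell; the sample data and partition counts are made up): a map keeps a one-to-one mapping between child and parent partitions, while a union creates one RangeDependency per parent RDD.

val a = sc.parallelize(1 to 30, 3)    // 3 partitions
val b = sc.parallelize(1 to 20, 2)    // 2 partitions

val mapped = a.map(_ * 2)             // each child partition reads the parent partition with the same index
val unioned = a.union(b)              // child partitions 0-2 come from a, 3-4 come from b

mapped.dependencies.map(_.getClass.getSimpleName)    // a single OneToOneDependency
unioned.dependencies.map(_.getClass.getSimpleName)   // two RangeDependency entries, one per parent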
OneToOneDependency
The implementation is as follows:
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
RangeDependency
The implementation is as follows:
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
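To make the index arithmetic in getParents concrete, here is a small hypothetical example (spark-shell style; the partition counts are made up): in a union whose first parent contributes 3 partitions, the second parent is wired in with outStart = 3 and length equal to its own partition count.

import org.apache.spark.RangeDependency

val rdd2 = sc.parallelize(1 to 20, 2)        // second parent of the union, 2 partitions
val dep = new RangeDependency(rdd2, 0, 3, 2)

dep.getParents(3)   // List(0): child partition 3 maps to rdd2's partition 0
dep.getParents(4)   // List(1): child partition 4 maps to rdd2's partition 1
dep.getParents(1)   // Nil: partition 1 falls outside [outStart, outStart + length)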
ShuffleDependency
ShuffleDependency describes the case where a parent RDD is depended on by multiple child RDDs, i.e. a partition of the parent may be needed by multiple partitions of the child.
The implementation is as follows:
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
The RDD a ShuffleDependency applies to is always of key-value (K, V) type; each ShuffleDependency gets its own shuffleId and is associated with a shuffleHandle.
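As a minimal sketch of how this surfaces in user code (spark-shell style; the sample data is arbitrary), a key-value transformation that needs a shuffle, such as reduceByKey on an RDD without a partitioner, creates a ShuffleDependency carrying its own shuffleId and partitioner:

import org.apache.spark.ShuffleDependency

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)   // RDD[(String, Int)] with no partitioner
val reduced = pairs.reduceByKey(_ + _)                             // requires a shuffle

reduced.dependencies.head match {
  case dep: ShuffleDependency[_, _, _] =>
    println(s"shuffleId = ${dep.shuffleId}, partitioner = ${dep.partitioner}")
  case other =>
    println(s"not a shuffle dependency: $other")
}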
DAGScheduler
Task Generation
Tasks are generated according to the type of the stage:
- A ShuffleMapStage generates ShuffleMapTasks
- A ResultStage generates ResultTasks
The relevant code is as follows:
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }

    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
} catch {
  ...
}
Task Submission
The tasks generated for a stage are packaged into a TaskSet and handed to the taskScheduler through its submitTasks() interface.
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  ...
}
TaskScheduler
TaskScheduler is responsible for low-level, task-granularity scheduling; its interface is defined in org.apache.spark.scheduler.TaskScheduler.
The implementation class is org.apache.spark.scheduler.TaskSchedulerImpl.
Each SparkContext has exactly one TaskScheduler, which receives the tasks that the DAGScheduler dispatches for each Stage and submits them to the cluster for execution.
If tasks fail, the TaskScheduler is responsible for retrying them, and it tries to mitigate the impact of slow nodes (stragglers).
The TaskScheduler reports events produced by running tasks back to the DAGScheduler.
Its main interface methods include:
| Interface | Parameters | Description |
|---|---|---|
| submitTasks | taskSet: TaskSet | Submits the set of tasks described by a TaskSet |
| cancelTasks | stageId: Int, interruptThread: Boolean | Cancels the running tasks of the specified stage |
| setDAGScheduler | dagScheduler: DAGScheduler | Sets the DAGScheduler used by this TaskScheduler |
| executorHeartbeatReceived | execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId | Updates the status of running tasks and tells the driver whether the given blockManager is alive: returns true if it is, false otherwise |
| executorLost | executorId: String, reason: ExecutorLossReason | Handles the loss of an executor |
TaskSchedulerImpl
The concrete implementation supports two scheduling policies for different jobs within the same SparkContext, FIFO and FAIR, controlled by the spark.scheduler.mode parameter.
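A minimal sketch of switching the policy (it only uses the standard SparkConf/SparkContext API; the application name and pool name are hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduling-mode-demo")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")    // default is FIFO

val sc = new SparkContext(conf)

// With FAIR scheduling, jobs can additionally be assigned to named pools:
sc.setLocalProperty("spark.scheduler.pool", "production")   // hypothetical pool name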
Submitting tasks mainly amounts to calling the backend's reviveOffers() interface.
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    ...
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ...
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
Cancelling tasks mainly amounts to calling the backend's killTask() interface.
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      // There are two possible cases here:
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task and then abort
      //    the stage.
      // 2. The task set manager has been created but no tasks has been scheduled. In this case,
      //    simply abort the stage.
      tsm.runningTasksSet.foreach { tid =>
        val execId = taskIdToExecutorId(tid)
        backend.killTask(tid, execId, interruptThread)
      }
      ...
    }
  }
}
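For context, cancellation is normally triggered from user code rather than by calling cancelTasks directly. A sketch using the public job-group API (spark-shell style; the group name and workload are made up): cancelling a job group eventually reaches TaskSchedulerImpl.cancelTasks for the stages of the affected jobs, and the interruptOnCancel flag is what arrives here as interruptThread.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Run a slow job asynchronously so it can be cancelled from the current thread.
Future {
  sc.setJobGroup("etl-group", "long-running ETL", interruptOnCancel = true)
  sc.parallelize(1 to 100000, 8).map { x => Thread.sleep(1); x }.count()
}

Thread.sleep(2000)
sc.cancelJobGroup("etl-group")   // cancels the group's jobs; their running tasks are killed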