Dependency
Spark divides the dependencies between RDDs into narrow dependencies and wide dependencies, defined in the source code as NarrowDependency and ShuffleDependency respectively.
NarrowDependency in turn has two implementations: OneToOneDependency and RangeDependency.
NarrowDependency
OneToOneDependency describes the case where a child RDD depends on a single parent RDD.
RangeDependency describes the case where a child RDD depends on multiple parent RDDs, but each parent RDD is depended on by only one child RDD (as with union).
OneToOneDependency
The implementation:
```scala
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
```
RangeDependency
The implementation:
```scala
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
```
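The index arithmetic in getParents can be exercised on its own. Below is a minimal sketch in plain Scala with no Spark dependency (RangeDependencyDemo is a hypothetical name, not Spark's class), assuming a child RDD formed as the union of a 3-partition parent A and a 4-partition parent B, so that B contributes child partitions 3 through 6 via inStart = 0, outStart = 3, length = 4:

```scala
// Standalone sketch of RangeDependency's partition-index arithmetic.
object RangeDependencyDemo {
  // Mirrors getParents above: map a child partition id back to a parent
  // partition id, or return Nil if the child partition is outside this range.
  def getParents(inStart: Int, outStart: Int, length: Int)(partitionId: Int): List[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else
      Nil

  def main(args: Array[String]): Unit = {
    // Child = union of parentA (3 partitions) and parentB (4 partitions).
    val fromA = getParents(0, 0, 3) _
    val fromB = getParents(0, 3, 4) _
    println(fromA(1)) // child partition 1 maps into parentA
    println(fromB(3)) // child partition 3 is parentB's first partition
    println(fromB(1)) // child partition 1 is outside parentB's range
  }
}
```

Each dependency only answers for its own range; the union RDD holds one RangeDependency per parent and asks each in turn.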
ShuffleDependency
ShuffleDependency describes the case where a single parent RDD is depended on by multiple child RDDs, i.e. a wide dependency.
The implementation:
```scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
```
Every RDD that a ShuffleDependency applies to holds key-value (K, V) records; each ShuffleDependency has its own shuffleId and is associated with a shuffleHandle.
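To make the shuffle routing of those (K, V) records concrete, here is a minimal sketch in plain Scala that mimics hash partitioning as Spark's HashPartitioner performs it (HashPartitionDemo is a hypothetical name; the real class lives in org.apache.spark):

```scala
// Sketch of map-side hash partitioning: each record's reduce-side partition
// is key.hashCode mod numPartitions, adjusted to be non-negative because
// Scala's % operator can return negative values.
object HashPartitionDemo {
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val records = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4)
    // Records with the same key always land in the same partition, which is
    // what lets reduce-side aggregation see all values for a key together.
    val byPartition = records.groupBy { case (k, _) => partitionFor(k, 2) }
    byPartition.toSeq.sortBy(_._1).foreach { case (p, kvs) =>
      println(s"partition $p: ${kvs.map(_._1).mkString(",")}")
    }
  }
}
```

The partitioner passed into ShuffleDependency plays exactly this role: it decides, for every key, which reduce-side partition the record is written to.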
DAGScheduler
Task generation
Tasks are generated according to the stage type:
- A ShuffleMapStage produces ShuffleMapTasks
- A ResultStage produces ResultTasks
The relevant code:
```scala
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
} catch {
  ...
}
```
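The pattern above, one task per partition to compute with the task kind chosen by the stage type, can be sketched without any Spark dependency. All names here (TaskSketch and friends) are hypothetical simplifications, not Spark classes:

```scala
// Simplified model of the per-partition fan-out performed by the DAGScheduler.
sealed trait TaskSketch { def stageId: Int; def partition: Int }
case class ShuffleMapTaskSketch(stageId: Int, partition: Int) extends TaskSketch
case class ResultTaskSketch(stageId: Int, partition: Int, outputId: Int) extends TaskSketch

object TaskGenDemo {
  // One task per partition; ResultTasks additionally carry an output index
  // so the driver knows which slot of the job's result array they fill.
  def makeTasks(stageId: Int, isShuffleMap: Boolean,
                partitionsToCompute: Seq[Int]): Seq[TaskSketch] =
    if (isShuffleMap)
      partitionsToCompute.map(p => ShuffleMapTaskSketch(stageId, p))
    else
      partitionsToCompute.zipWithIndex.map { case (p, id) =>
        ResultTaskSketch(stageId, p, id)
      }

  def main(args: Array[String]): Unit = {
    println(TaskGenDemo.makeTasks(0, isShuffleMap = true, Seq(0, 1, 2)))
  }
}
```

The real code also resolves preferred locations and a serialized task binary per task, which this sketch omits.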
Task submission
The tasks generated for a stage are wrapped in a TaskSet and handed to the taskScheduler, which submits them through its submitTasks() interface.
```scala
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage +
    " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  ...
}
```
TaskScheduler
TaskScheduler handles low-level, task-granularity scheduling; the interface is defined in org.apache.spark.scheduler.TaskScheduler.
Its implementation class is org.apache.spark.scheduler.TaskSchedulerImpl.
Each SparkContext has exactly one TaskScheduler, which receives the tasks the DAGScheduler dispatches for each stage and submits them to the cluster for execution.
If tasks fail, the TaskScheduler is responsible for retrying them, and it tries to mitigate the slow-node (straggler) problem.
The TaskScheduler reports events produced by running tasks back to the DAGScheduler.
The main interfaces are:
| Interface | Parameters | Purpose |
|---|---|---|
| submitTasks | taskSet: TaskSet | Submit the group of tasks described by a TaskSet |
| cancelTasks | stageId: Int, interruptThread: Boolean | Cancel the running tasks of the given stage |
| setDAGScheduler | dagScheduler: DAGScheduler | Set the DAGScheduler for this taskScheduler |
| executorHeartbeatReceived | execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId | Update the status of the tasks, and tell the driver whether the given blockManager is alive: returns true if so, false otherwise |
| executorLost | executorId: String, reason: ExecutorLossReason | Handle information about a lost executor |
TaskSchedulerImpl
For different jobs within the same SparkContext, the implementation offers two scheduling policies, FIFO and FAIR, controlled by the spark.scheduler.mode parameter.
Submitting tasks mainly amounts to calling the backend's reviveOffers interface.
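As a rough sketch of the FIFO policy, mirroring the comparison logic of Spark's FIFOSchedulingAlgorithm (the class and field names here are simplified assumptions), schedulable entities are ordered by job priority first and stage id second:

```scala
// Hypothetical model of FIFO ordering between schedulable entities:
// a lower jobId (priority) runs first; within the same job, a lower
// stageId runs first.
case class SchedulableSketch(priority: Int, stageId: Int)

object FifoDemo {
  def fifoLessThan(a: SchedulableSketch, b: SchedulableSketch): Boolean =
    if (a.priority != b.priority) a.priority < b.priority
    else a.stageId < b.stageId

  def main(args: Array[String]): Unit = {
    val queue = Seq(
      SchedulableSketch(priority = 2, stageId = 0),
      SchedulableSketch(priority = 1, stageId = 3),
      SchedulableSketch(priority = 1, stageId = 1))
    // Job 1's stages run before job 2's, in stage order.
    println(queue.sortWith(fifoLessThan))
  }
}
```

FAIR mode instead weighs pools by their running task counts, minimum shares, and weights, so no single job can monopolize the cluster.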
```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    ...
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ...
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
```
Cancelling tasks mainly amounts to calling the backend's killTask interface.
```scala
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      // There are two possible cases here:
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task and then
      //    abort the stage.
      // 2. The task set manager has been created but no tasks has been scheduled. In this
      //    case, simply abort the stage.
      tsm.runningTasksSet.foreach { tid =>
        val execId = taskIdToExecutorId(tid)
        backend.killTask(tid, execId, interruptThread)
      }
      ...
    }
  }
}
```