• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

SparkCore 任务调度源码阅读

SPARK wangting 2周前 (11-18) 5次浏览

Dependency


Spark将RDD之间的依赖分为窄依赖和宽依赖,源码中对应的定义分别为NarrowDependency和ShuffleDependency。

其中NarrowDependency又有两种OneToOneDependency和RangeDependency两种实现。

NarrowDependency

OneToOneDependency用于描述一个子RDD只依赖一个父RDD的情况。

RangeDependency用于描述一个子RDD依赖多个父RDD,但每个父RDD只被一个子RDD依赖的情况。

OneToOneDependency

实现如下:

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

RangeDependency

实现如下:

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    else {
      Nil
    }
  }
}

ShuffleDependency

ShuffleDependency描述一个父RDD被多个子RDD依赖的情况。

实现如下:

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
  private[sparkval keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[sparkval valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[sparkval combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)
  val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

每个ShuffleDependency作用的RDD都是K,V类型的,每个ShuffleDependency对应一个shuffleId,并同一个shuffleHandle相关联。

 

DAGScheduler


Task生成

根据stage类型生成相应的Task:

  • ShuffleMapStage生成ShuffleMapTask
  • ResultStage生成ResultTask

相关代码如下:

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
catch {
  ...
}

Task提交

对为stage生成的任务构造一个TaskSet,交给taskScheduler,通过submitTasks()接口进行提交。

if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
else {
  ...
}

 

TaskScheduler


TaskScheduler负责底层task粒度的任务调度,接口定义于org.apache.spark.scheduler.TaskScheduler。

实现类为org.apache.spark.scheduler.TaskSchedulerImpl。

每个SparkContext对应一个TaskScheduler,负责接收DAGScheduler为每个Stage分派的task,将task提交到集群运行。

如果作业运行失败,TaskScheduler会负责重试,并且能够尽量避免慢节点问题。

TaskScheduler会将task运行产生的事件返回给DAGScheduler。

主要接口包括:

接口
参数
功能
submitTasks
taskSet: TaskSet
提交由Taskset描述的一组task的集合
cancelTasks
stageId: Int, interruptThread: Boolean 取消运行指定stage
setDAGScheduler
dagScheduler: DAGScheduler 设置当前taskScheduler的dagScheduler
executorHeartbeatReceived
execId: String, taskMetrics: Array[(Long, TaskMetrics)],

blockManagerId: BlockManagerId

更新task的状态信息,并通知driver指定blackManager是否alive,若是则返回true,否则返回false
executorLost executorId: String, reason: ExecutorLossReason 处理丢失的 executor信息

TaskSchedulerImpl

具体的实现对于同一SparkContext的不同job有FIFO和FAIR两种调度策略,由spark.scheduler.mode参数控制。

提交task:主要是调用backend的reviveOffers接口。

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    ...
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ...
    hasReceivedTask = true
  }
  backend.reviveOffers()
}

撤销task,主要是调用backend的killTask接口。

override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
  logInfo("Cancelling stage " + stageId)
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.foreach { case (_, tsm) =>
      // There are two possible cases here:
      // 1. The task set manager has been created and some tasks have been scheduled.
      //    In this case, send a kill signal to the executors to kill the task and then abort
      //    the stage.
      // 2. The task set manager has been created but no tasks has been scheduled. In this case,
      //    simply abort the stage.
      tsm.runningTasksSet.foreach { tid =>
        val execId = taskIdToExecutorId(tid)
        backend.killTask(tid, execId, interruptThread)
      }
      ...
  }
}

喜欢 (0)