• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Spark源码: Spark Core Shuffle机制

SPARK wangting 3周前 (11-13) 23次浏览

SortShuffleManager

Reader使用BlockStoreShuffleReader;Writer则根据传入的ShuffleHandle的具体类型,选择使用UnsafeShuffleWriter、BypassMergeSortShuffleWriter或SortShuffleWriter。

/**
 * Get a reader for the reduce partitions in [startPartition, endPartition).
 *
 * The sort-based manager always reads through a BlockStoreShuffleReader; the generic
 * ShuffleHandle is cast to BaseShuffleHandle[K, _, C] to recover the key/combiner types.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
/**
 * Get a writer for a given map partition.
 *
 * The concrete writer is chosen by the runtime type of the shuffle handle:
 *  - SerializedShuffleHandle      => UnsafeShuffleWriter
 *  - BypassMergeSortShuffleHandle => BypassMergeSortShuffleWriter
 *  - any other BaseShuffleHandle  => SortShuffleWriter (the fallback)
 *
 * The number of map tasks of the shuffle is recorded the first time a writer is requested
 * for that shuffle id (putIfAbsent keeps the first value).
 */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}

HashShuffleManager

HashShuffleManager按key的Hash值划分数据,为每个mapper产生的每个reduce partition分别创建一个对应的输出文件。

Reader使用BlockStoreShuffleReader,Writer使用HashShuffleWriter

/**
 * Get a reader for the reduce partitions in [startPartition, endPartition).
 *
 * Like the sort-based manager, the hash-based manager reads through a
 * BlockStoreShuffleReader; the handle is cast to BaseShuffleHandle[K, _, C].
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
/**
 * Get a writer for a given partition. Called on executors by map tasks.
 *
 * The hash-based manager always returns a HashShuffleWriter; there is no dispatch on the
 * handle's subtype, it is simply cast to BaseShuffleHandle[K, V, _].
 */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
    : ShuffleWriter[K, V] = {
  new HashShuffleWriter(
    shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}

Shuffle Reader


Reducer端用于Shuffle阶段从其他节点的BlockStore中读取需要的Partition。

目前Spark中只有一种实现:BlockStoreShuffleReader

读取的数据范围由[startPartition, endPartition)区间指定。

ShuffleBlockFetcherIterator

读取Shuffle数据块的操作是通过ShuffleBlockFetcherIterator完成的。

initialize

初始化阶段通过splitLocalRemoteBlocks将要读取的数据块划分为本地块和远程块,远程块再被拆分为多个FetchRequest分批读取。

/**
 * Split the blocks to fetch into local blocks and remote FetchRequests.
 *
 * Blocks whose address is this executor are handled as local blocks. Remote blocks are
 * grouped per address into FetchRequests. A request is closed as soon as its accumulated
 * size reaches targetRequestSize = max(maxBytesInFlight / 5, 1).
 *
 * The check runs after a block has been added, so a single request can exceed the target,
 * for example when one block alone is larger than it. Keeping requests around 1/5 of
 * maxBytesInFlight allows several requests, possibly to different nodes, to be in flight
 * at the same time.
 *
 * Zero-sized blocks are skipped; a negative size raises BlockException.
 *
 * @return the remote FetchRequests to issue
 */
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // Floor of 1 so that a tiny maxBytesInFlight still yields a positive target size.
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  ...
  val remoteRequests = new ArrayBuffer[FetchRequest]
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    if (address.executorId == blockManager.blockManagerId.executorId) {
      ... // <-- handle local blocks
    } else {
      val iterator = blockInfos.iterator
      var curRequestSize = 0L
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // Skip empty blocks
        if (size > 0) {
          curBlocks += ((blockId, size))
          remoteBlocks += blockId
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        // <-- Once the bytes gathered from this address reach targetRequestSize
        // (maxBytesInFlight / 5), emit a FetchRequest; the remaining blocks go into the next one.
        if (curRequestSize >= targetRequestSize) {
          // Add this FetchRequest
          remoteRequests += new FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          curRequestSize = 0
        }
      }
      ...
    }
  }
  ...
}

返回的FetchRequest会先被打乱顺序(随机化),然后加入到fetchRequests队列中,随后调用fetchUpToMaxBytes发起首批取数据请求。

// Split local and remote blocks.
val remoteRequests = splitLocalRemoteBlocks()
// Add the remote requests into our queue in a random order
// (presumably so the first requests are spread across remote addresses instead of being
// grouped by address, the order in which splitLocalRemoteBlocks produces them).
fetchRequests ++= Utils.randomize(remoteRequests)
// Send out initial requests for blocks, up to our maxBytesInFlight
fetchUpToMaxBytes()

取数据的过程由fetchUpToMaxBytes完成:它不断从fetchRequests队列头部取出FetchRequest,并通过sendRequest发送。

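只要当前正在拉取(in flight)的数据总量加上下一个请求的大小不超过maxBytesInFlight,就可以继续发送取数据请求;如果当前没有任何在途请求,则无论该请求多大都会发送,以保证读取不会卡住。

由于在splitLocalRemoteBlocks阶段,同一节点累计的数据一旦达到maxBytesInFlight/5就会被切分为一个FetchRequest,每个请求的大小大致为maxBytesInFlight/5(单个较大的数据块可能使请求超过该值),因此通常可以同时有约5个请求在途,

即可以同时从多达5个不同的节点并行Fetch数据,而不会阻塞在单个节点的读取上。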

/**
 * Dequeue and send FetchRequests while the bytes in flight stay within maxBytesInFlight.
 *
 * A request is always sent when nothing is in flight (bytesInFlight == 0). This way a
 * single request larger than maxBytesInFlight still gets sent instead of blocking the queue.
 */
private def fetchUpToMaxBytes(): Unit = {
  // Send fetch requests up to maxBytesInFlight
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}

 

next

next方法从results队列中取出已获取的数据;取出后会调用fetchUpToMaxBytes,继续发送后续的取数据请求(预取)。

/**
 * Return the next fetched block as (BlockId, InputStream).
 *
 * Waits on the `results` queue until a fetch result is available and records the wait as
 * fetch wait time. For a successful result, its size is subtracted from bytesInFlight.
 * fetchUpToMaxBytes() is then called to send further requests (prefetch) before the
 * current result is processed.
 */
override def next(): (BlockId, InputStream) = {
  numBlocksProcessed += 1
  val startFetchWait = System.currentTimeMillis()
  currentResult = results.take()
  val result = currentResult
  val stopFetchWait = System.currentTimeMillis()
  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
  result match {
    // SuccessFetchResult(blockId, address, size, buf): give the block's bytes back to the budget.
    case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size
    case _ =>
  }
  // Send fetch requests up to maxBytesInFlight
  fetchUpToMaxBytes()
  result match {
    ... // <-- process the result
  }
}

取数据请求通过sendRequest方法发送。

/**
 * Send one FetchRequest to its remote address through shuffleClient.
 *
 * The request's size is added to bytesInFlight before fetching. Each successfully fetched
 * block is retained and put into `results` as a SuccessFetchResult, carrying the size
 * looked up from the request. If the iterator has already been cleaned up (isZombie), the
 * block is not queued.
 */
private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  bytesInFlight += req.size
  // so we can look up the size of each blockID
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  val blockIds = req.blocks.map(_._1.toString)
  val address = req.address
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Only add the buffer to results queue if the iterator is not zombie,
        // i.e. cleanup() has not been called yet.
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          // <-- Put the fetched block into the results queue (a LinkedBlockingQueue),
          // from which next() takes it.
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }
      ...
    }
  )
}

MapOutputTracker

每个executor对应一个MapOutputTrackerWorker,用于从Driver端的MapOutputTrackerMaster获取map阶段的输出信息。

通过getMapSizesByExecutorId方法获取,返回Seq[(BlockManagerId, Seq[(BlockId, Long)])],即每个BlockManager上需要读取的(BlockId, 数据块大小)列表。

Shuffle Client

数据的读取通过BlockManager的shuffleClient完成:如果启用了外部Shuffle服务,则使用ExternalShuffleClient访问该外部服务;否则直接使用blockTransferService与其他executor通信。

// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
// With the external shuffle service enabled, an ExternalShuffleClient is built from the
// "shuffle" transport config plus the security manager's authentication/SASL settings.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
    securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}

默认使用的实现是NettyBlockTransferService,基于netty实现。

NettyBlockTransferService

取数据操作由fetchBlocks实现:

/**
 * Fetch the given blocks from executor `execId` at host:port and report each block's outcome
 * to `listener`.
 *
 * Each attempt creates a client for host:port and starts a OneForOneBlockFetcher.
 * When transportConf.maxIORetries() > 0, the attempt is wrapped in a RetryingBlockFetcher;
 * otherwise it is started once directly. If starting the fetch throws an Exception, every
 * block is reported to the listener as failed instead of the exception propagating.
 */
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        val client = clientFactory.createClient(host, port)
        // <-- Connect to the node that holds the blocks and start reading them.
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }
    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}

获取数据所在节点IP和端口后,通过OneForOneBlockFetcher读取该节点数据。

OneForOneBlockFetcher

实现如下:

public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }
  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
        logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);
        // Immediately request all chunks -- we expect that the total size of the request is
        // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
        for (int i = 0; i < streamHandle.numChunks; i++) {
          client.fetchChunk(streamHandle.streamId, i, chunkCallback);
        }
      catch (Exception e) {
        ...
      }
    }
    ...
  });
}

 

每个Chunk数据通过fetchChunk读取

public void fetchChunk(
    long streamId,
    final int chunkIndex,
    final ChunkReceivedCallback callback) {
  final String serverAddr = NettyUtils.getRemoteAddress(channel);
  final long startTime = System.currentTimeMillis();
  logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);
  final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
  handler.addFetchRequest(streamChunkId, callback);
  channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
    new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
          long timeTaken = System.currentTimeMillis() - startTime;
          logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
            timeTaken);
        else {
          ... // 错误处理
        }
      }
    });
}

 

Spill

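如果内存放不下Shuffle的数据,就需要将其Spill(溢写)到磁盘。是否需要Spill由maybeSpill方法判断,它采用抽样检测的方式:每读入32个元素检查一次,若当前占用的内存已达到阈值myMemoryThreshold,则尝试申请更多内存(最多使可用内存达到当前占用的两倍);申请后若内存仍然不足,则触发Spill。此外,当自上次Spill以来读入的元素数超过numElementsForceSpillThreshold时,也会强制Spill。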

/**
 * Spill the in-memory collection to disk if it has grown too large.
 *
 * This is a sampled check. Only when elementsRead is a multiple of 32 and currentMemory
 * has reached myMemoryThreshold does it request (2 * currentMemory - myMemoryThreshold) more
 * execution memory, raising the threshold by whatever is granted. If currentMemory is still
 * at or above the new threshold, it spills. Independently, a spill is forced once
 * _elementsRead exceeds numElementsForceSpillThreshold.
 *
 * After a spill, _elementsRead is reset to 0, currentMemory is added to _memoryBytesSpilled,
 * and releaseMemory() is called.
 *
 * NOTE(review): assumes elementsRead is the accessor for _elementsRead -- confirm.
 *
 * @param collection    the in-memory collection to spill
 * @param currentMemory current size estimate of the collection (added to the bytes-spilled
 *                      metric, so presumably in bytes)
 * @return true if the collection was spilled
 */
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}

如果需要,则调用spill方法将当前内存中的集合写入磁盘,随后将已读元素计数清零并释放内存。

Shuffle Write


 

SortShuffleWriter

当ShuffleHandle既不是SerializedShuffleHandle也不是BypassMergeSortShuffleHandle时,getWriter使用SortShuffleWriter作为兜底实现,其写操作实现在write方法中。

/**
 * Write all records of this map task to the shuffle data file and its index file.
 *
 * Records go through an ExternalSorter. With map-side combine, the sorter gets the
 * dependency's aggregator and key ordering. Otherwise it gets neither, and only the
 * partitioner is used. The sorter writes all partitions into a temporary file.
 * writeIndexFileAndCommit then records the per-partition lengths and commits that file,
 * and the resulting MapStatus is stored in `mapStatus`.
 */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  ...
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}

Shuffle Write将records的迭代器交给ExternalSorter处理,并通过writePartitionedFile接口写入到临时文件中。

然后通过shuffleBlockResolver.writeIndexFileAndCommit写索引文件并提交数据文件,最后根据各分区的长度生成MapStatus。

ExternalSorter

insertAll方法将记录以(partition, key)为键插入内存中的数据结构,实现如下:

/**
 * Insert all records into the in-memory collection, spilling to disk as needed.
 *
 * With an aggregator, values are combined per (partition, key) in `map`: createCombiner
 * handles the first value of a key, and mergeValue handles later ones. Without an
 * aggregator, records are appended unchanged to `buffer`. After every record,
 * maybeSpillCollection checks whether the collection should be spilled.
 */
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // `update` reads the current record through the captured var `kv`, which is reassigned
    // before each changeValue call below.
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}

如果需要在Map端做Combine,则插入到PartitionedAppendOnlyMap结构中保存,否则插入到PartitionedPairBuffer结构中保存。

两种结构的实现参考:Shuffle – 相关数据结构

1. 如果需要在Map端做聚合,则PartitionedAppendOnlyMap是ExternalSorter阶段内存中用于保存插入数据的数据结构,对每个key只需保存更新聚合后的value即可。

2. 如果不需要在Map端做聚合,则PartitionedPairBuffer是ExternalSorter阶段内存中用于保存插入数据的数据结构,对每条记录(即使key相同)都需要保存相应的value。

 

相关数据结构


在ExternalSorter中,会使用到PartitionedAppendOnlyMap和PartitionedPairBuffer两个数据结构在内存中保存插入阶段的数据。

// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
// `map`: used by insertAll when an aggregator is defined; values are combined per (partition, key).
private var map = new PartitionedAppendOnlyMap[K, C]
// `buffer`: used by insertAll otherwise; every record is appended, its value cast to C.
private var buffer = new PartitionedPairBuffer[K, C]

PartitionedAppendOnlyMap和PartitionedPairBuffer的特点在于其key和value是连续存放的,形式如下:

 

key1 value1 key2 value2 keyN valueN

 

具体介绍如下:

PartitionedAppendOnlyMap

继承关系:PartitionedAppendOnlyMap -> SizeTrackingAppendOnlyMap -> AppendOnlyMap

AppendOnlyMap提供了用于存放Shuffle数据的内存空间,形式上是一个Map。
实现上其实是一个数组data,元素根据key的hashCode确定存放位置;发生冲突时以逐次加1的步长(1、2、3……)继续向后探测(开放寻址),从而构成一个Hash表。
/**
 * Append-only hash map whose keys and values live in a single flat array, `data`.
 *
 * A key stored in slot pos is at data(2 * pos), and its value at data(2 * pos + 1).
 * Lookups start at rehash(key.hashCode) & mask. On a collision they probe onward with a
 * step that grows by one each time (+1, then +2, +3, ...). A lookup ends at a matching key
 * or at an empty slot. A null key is handled separately and answered with nullValue.
 */
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {
  ...
  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)   // <-- where the shuffle data actually lives in memory

  /** Get the value for a given key, or null if the key is absent. */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask  // <-- initial slot derived from the key's hash
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        return data(2 * pos + 1).asInstanceOf[V]   // <-- the value sits right after its key, at 2 * pos + 1
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    // Never reached: the loop only exits via return, but while(true) has type Unit.
    null.asInstanceOf[V]
  }
  ...
}

PartitionedPairBuffer

实现与PartitionedAppendOnlyMap类似,代码如下:

/**
 * Growable buffer of ((partition, key), value) pairs stored in a single flat array.
 *
 * Element i occupies data(2 * i), which holds the (partition, key) tuple, and
 * data(2 * i + 1), which holds the value. Unlike AppendOnlyMap, there is no hashing and no
 * combining: insert simply appends, calling growArray() first when the buffer is full.
 */
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker
{
  ...
  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)
  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    // <-- As in PartitionedAppendOnlyMap, key and value are physically adjacent:
    // the (partition, key) tuple at 2 * curSize, the value at 2 * curSize + 1.
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }
}

1. PartitionedAppendOnlyMap与PartitionedPairBuffer都把key和对应的value存放在同一个数组data的相邻位置:第i个元素的key位于data(2 * i),value位于data(2 * i + 1)。因此在PartitionedAppendOnlyMap中,根据key的hash值找到元素位置pos后,通过data(2 * pos + 1)即可取得相应的value。

2. PartitionedAppendOnlyMap和PartitionedPairBuffer在实现上Key均为(partition: Int, key: K)形式的二元组,value为V。


喜欢 (1)