SortShuffleManager
The reader is always BlockStoreShuffleReader; the writer is chosen according to the ShuffleHandle that is passed in: UnsafeShuffleWriter, BypassMergeSortShuffleWriter, or SortShuffleWriter.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
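The handle type itself is decided earlier, when the shuffle is registered. The sketch below is a hypothetical, simplified model of that decision; the helper names (shouldBypassMergeSort, canUseSerializedShuffle) mirror the real code, but the exact conditions are an assumption and vary by Spark version.

// Simplified, standalone model of the choice made in SortShuffleManager.registerShuffle.
sealed trait HandleKind
case object SerializedHandle extends HandleKind      // handled by UnsafeShuffleWriter
case object BypassMergeSortHandle extends HandleKind // handled by BypassMergeSortShuffleWriter
case object BaseHandle extends HandleKind            // handled by SortShuffleWriter

def chooseHandle(
    mapSideCombine: Boolean,
    numPartitions: Int,
    bypassMergeThreshold: Int,                // spark.shuffle.sort.bypassMergeSortThreshold
    serializerSupportsRelocation: Boolean): HandleKind = {
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) BypassMergeSortHandle
  else if (!mapSideCombine && serializerSupportsRelocation) SerializedHandle
  else BaseHandle
}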
HashShuffleManager
HashShuffleManager hashes records by key and, for every mapper, creates one output file per reduce partition.
The reader is BlockStoreShuffleReader and the writer is HashShuffleWriter.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
    : ShuffleWriter[K, V] = {
  new HashShuffleWriter(
    shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}
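A back-of-the-envelope illustration of why this scheme produces many files: every map task writes one file per reduce partition, so M mappers and R reducers yield M * R shuffle files (file consolidation, not shown here, can reduce this). The numbers below are made up for illustration.

val numMappers = 1000
val numReducers = 200
val shuffleFiles = numMappers * numReducers
println(s"hash shuffle would create $shuffleFiles files") // 200000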
Shuffle Reader
On the reducer side, the shuffle reader fetches the needed partitions from the block stores of other nodes during the shuffle phase.
Spark currently has only one implementation: BlockStoreShuffleReader.
The range of partitions to read is given by the interval [startPartition, endPartition).
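For context, this is roughly how the reducer side obtains and consumes a reader for a single partition; the pattern mirrors ShuffledRDD.compute, and the enclosing class members and type parameters are assumed rather than shown.

// Sketch only: dependencies, SparkEnv and K/V/C come from the enclosing ShuffledRDD;
// the point is the [startPartition, endPartition) contract of getReader.
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context) // one partition
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}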
ShuffleBlockFetcherIterator
Reading shuffle blocks is done through ShuffleBlockFetcherIterator.
initialize
During initialization, splitLocalRemoteBlocks splits the blocks to be read and groups them into multiple FetchRequests.
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  ...
  val remoteRequests = new ArrayBuffer[FetchRequest]
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    if (address.executorId == blockManager.blockManagerId.executorId) {
      ... // <-- handle local blocks
    } else {
      val iterator = blockInfos.iterator
      var curRequestSize = 0L
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // Skip empty blocks
        if (size > 0) {
          curBlocks += ((blockId, size))
          remoteBlocks += blockId
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        if (curRequestSize >= targetRequestSize) {
          // <-- Once the data accumulated for one node reaches 1/5 of maxBytesInFlight,
          //     the remaining blocks are deferred to the next FetchRequest.
          // Add this FetchRequest
          remoteRequests += new FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          curRequestSize = 0
        }
      }
      ...
}
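Stripped of the Spark types, the grouping policy looks like the sketch below: blocks from one remote node are packed into a request until the request reaches maxBytesInFlight / 5. The block ids and sizes are made up for illustration.

import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the request-splitting policy used by splitLocalRemoteBlocks.
def groupIntoRequests(
    blocks: Seq[(String, Long)],      // (blockId, size) pairs from one remote node
    maxBytesInFlight: Long): Seq[Seq[(String, Long)]] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = ArrayBuffer.empty[Seq[(String, Long)]]
  var current = ArrayBuffer.empty[(String, Long)]
  var currentSize = 0L
  for ((id, size) <- blocks if size > 0) {  // empty blocks are skipped
    current += ((id, size))
    currentSize += size
    if (currentSize >= targetRequestSize) { // request is full: start a new one
      requests += current.toSeq
      current = ArrayBuffer.empty[(String, Long)]
      currentSize = 0L
    }
  }
  if (current.nonEmpty) requests += current.toSeq
  requests.toSeq
}

// With maxBytesInFlight = 50 the target request size is 10, so blocks of size 4
// are grouped three at a time: Seq(b0, b1, b2), Seq(b3, b4, b5).
groupIntoRequests((0 until 6).map(i => (s"b$i", 4L)), maxBytesInFlight = 50)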
The returned FetchRequests are shuffled into a random order, appended to the fetchRequests queue, and an initial round of fetching is issued.
// Split local and remote blocks.
val remoteRequests = splitLocalRemoteBlocks()
// Add the remote requests into our queue in a random order
fetchRequests ++= Utils.randomize(remoteRequests)
// Send out initial requests for blocks, up to our maxBytesInFlight
fetchUpToMaxBytes()
Fetching is driven by fetchUpToMaxBytes, which simply dequeues FetchRequests from the fetchRequests queue.
As long as the total amount of data currently in flight stays within maxBytesInFlight, further fetch requests can be sent.
Because splitLocalRemoteBlocks already caps each FetchRequest at maxBytesInFlight / 5, a concurrency of at least 5 is guaranteed,
i.e. data can be fetched from at least 5 different nodes in parallel.
private def fetchUpToMaxBytes(): Unit = {
  // Send fetch requests up to maxBytesInFlight
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}
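A quick worked example of the numbers involved, assuming the common default of 48m for spark.reducer.maxSizeInFlight (the default value is an assumption; check your Spark version):

val maxBytesInFlight  = 48L * 1024 * 1024                    // at most 48 MB in flight
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)   // ~9.6 MB per FetchRequest
val minParallelism    = maxBytesInFlight / targetRequestSize // at least 5 requests / 5 nodes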
next
The next method takes the needed data from the results queue; after a result is taken, fetchUpToMaxBytes is called to prefetch subsequent data.
override def next(): (BlockId, InputStream) = {
  numBlocksProcessed += 1
  val startFetchWait = System.currentTimeMillis()
  currentResult = results.take()
  val result = currentResult
  val stopFetchWait = System.currentTimeMillis()
  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)

  result match {
    case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size
    case _ =>
  }
  // Send fetch requests up to maxBytesInFlight
  fetchUpToMaxBytes()

  result match {
    ... // <-- handle the fetched result
  }
}
Fetch requests are sent via the sendRequest method.
private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  bytesInFlight += req.size

  // so we can look up the size of each blockID
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  val blockIds = req.blocks.map(_._1.toString)

  val address = req.address
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Only add the buffer to results queue if the iterator is not zombie,
        // i.e. cleanup() has not been called yet.
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          // <-- successfully fetched results are put into a LinkedBlockingQueue
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }
      ...
    }
  )
}
MapOutputTracker
Each executor has a MapOutputTrackerWorker, which fetches the map-stage output information from the MapOutputTrackerMaster on the driver.
The information is obtained through getMapSizesByExecutorId, which returns tuples of the form (BlockManagerId, Seq[(BlockId, BlockSize)]).
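Below is a sketch of how BlockStoreShuffleReader feeds this information into the fetcher iterator; the constructor arguments are paraphrased from memory and differ slightly between Spark versions, so treat the exact signature as an assumption.

// blocksByAddress: for each BlockManagerId, the shuffle block ids and sizes to fetch
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] =
  mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)

val blockFetcherItr = new ShuffleBlockFetcherIterator(
  context,
  blockManager.shuffleClient,
  blockManager,
  blocksByAddress,
  // maxBytesInFlight is derived from spark.reducer.maxSizeInFlight
  SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)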
Shuffle Client
Data is read through the BlockManager's shuffle client, which can either run as an external service or simply use blockTransferService to talk to other executors directly.
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
    securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}
The default implementation is NettyBlockTransferService, built on top of Netty.
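To make the reader go through the external shuffle service instead of connecting to executors directly, the service has to be enabled in the configuration. A minimal sketch (7337 is the usual default port; verify it against your deployment):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true") // shuffleClient becomes ExternalShuffleClient
  .set("spark.shuffle.service.port", "7337")    // port of the external shuffle service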
NettyBlockTransferService
Fetching is implemented by fetchBlocks:
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        val client = clientFactory.createClient(host, port)
        // <-- connect to the node holding the target data and fetch the blocks
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
After obtaining the host and port of the node holding the data, OneForOneBlockFetcher reads the data from that node.
OneForOneBlockFetcher
Its implementation is as follows:
public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }

  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
        logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

        // Immediately request all chunks -- we expect that the total size of the request is
        // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
        for (int i = 0; i < streamHandle.numChunks; i++) {
          client.fetchChunk(streamHandle.streamId, i, chunkCallback);
        }
      } catch (Exception e) {
        ...
      }
    }
    ...
  });
}
Each chunk is read via fetchChunk:
public void fetchChunk(
    long streamId,
    final int chunkIndex,
    final ChunkReceivedCallback callback) {
  final String serverAddr = NettyUtils.getRemoteAddress(channel);
  final long startTime = System.currentTimeMillis();
  logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);

  final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
  handler.addFetchRequest(streamChunkId, callback);

  channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
    new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
          long timeTaken = System.currentTimeMillis() - startTime;
          logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
            timeTaken);
        } else {
          ... // error handling
        }
      }
    });
}
Spill
If the shuffle data does not fit in memory, it has to be spilled to disk. Whether a spill is needed is decided by maybeSpill, which uses a sampling scheme: the check is performed once every 32 elements read.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
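A worked example of the doubling policy, assuming the usual 5 MB initial value of spark.shuffle.spill.initialMemoryThreshold (the default value is an assumption):

val myMemoryThreshold = 5L * 1024 * 1024   // current per-task threshold: 5 MB
val currentMemory     = 6L * 1024 * 1024   // in-memory collection has grown to 6 MB
val amountToRequest   = 2 * currentMemory - myMemoryThreshold // request 7 MB more
// If the full 7 MB is granted, the threshold grows to 12 MB and no spill occurs.
// If too little is granted for the threshold to exceed currentMemory, shouldSpill
// becomes true and the collection is written to disk.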
If a spill is needed, the spill method is invoked to write the data to disk.
Shuffle Write
SortShuffleWriter
In practice SortShuffleWriter is used by default; the write path is implemented in its write method.
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  ...
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
The shuffle write hands the record iterator to an ExternalSorter and writes the sorted, partitioned output to a temporary file via writePartitionedFile.
It then writes the index file through shuffleBlockResolver and updates the MapStatus.
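The index file essentially stores the cumulative offsets of partitionLengths, so reducer i can seek straight to its slice of the single data file. A small sketch of that relationship (the layout description is paraphrased from IndexShuffleBlockResolver; the lengths are made up):

val partitionLengths = Array(100L, 0L, 250L, 40L)    // bytes written per reduce partition
val offsets = partitionLengths.scanLeft(0L)(_ + _)   // Array(0, 100, 100, 350, 390)
val reducer = 2
val (start, end) = (offsets(reducer), offsets(reducer + 1)) // reducer 2 reads bytes [100, 350)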
ExternalSorter
The insertAll method inserts the records into memory, keyed by partition and key; its implementation is as follows:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
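The update closure above is easiest to see with a concrete aggregator. The standalone sketch below mimics it with a word-count style combiner; the mutable Map stands in for PartitionedAppendOnlyMap and the names are illustrative only.

// createCombiner builds the initial combiner from the first value of a key,
// mergeValue folds each further value into the existing combiner.
val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = (c, v) => c + v

val counts = scala.collection.mutable.Map.empty[String, Int]
for ((word, one) <- Seq(("a", 1), ("b", 1), ("a", 1))) {
  counts(word) = counts.get(word) match {
    case Some(old) => mergeValue(old, one) // hadValue == true: update in place
    case None      => createCombiner(one)  // hadValue == false: first occurrence
  }
}
// counts == Map("a" -> 2, "b" -> 1)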
If map-side combine is needed, records are inserted into a PartitionedAppendOnlyMap; otherwise they are inserted into a PartitionedPairBuffer.
The implementation of the two structures is covered in: Shuffle – Related Data Structures.
1. If map-side aggregation is needed, PartitionedAppendOnlyMap is the in-memory structure ExternalSorter uses to hold the inserted data; for each key only the aggregated value has to be kept and updated.
2. If map-side aggregation is not needed, PartitionedPairBuffer is the in-memory structure ExternalSorter uses to hold the inserted data; every value is kept alongside its key.
Related Data Structures
ExternalSorter uses two data structures, PartitionedAppendOnlyMap and PartitionedPairBuffer, to hold data in memory during the insert phase.
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
private var map = new PartitionedAppendOnlyMap[K, C]
private var buffer = new PartitionedPairBuffer[K, C]
What PartitionedAppendOnlyMap and PartitionedPairBuffer have in common is that keys and values are stored contiguously, in the following layout:
| key1 | value1 | key2 | value2 | … | keyN | valueN | 
They are described below:
PartitionedAppendOnlyMap
Inheritance chain: PartitionedAppendOnlyMap -> SizeTrackingAppendOnlyMap -> AppendOnlyMap
AppendOnlyMap provides the in-memory storage for shuffle data and behaves like a map.
Internally it is backed by an array called data; elements are placed according to their hash codes, so the array forms a hash table.
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {
  ...
  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)   // <-- where shuffle data actually lives in memory

  /** Get the value for a given key */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask  // <-- slot index derived from the key's hash
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        return data(2 * pos + 1).asInstanceOf[V]   // <-- once the slot is found, the value sits at 2 * pos + 1
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V]
  }
  ...
}
PartitionedPairBuffer
Its implementation is similar to PartitionedAppendOnlyMap; the code is as follows:
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker {
  ...
  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    // <-- as in PartitionedAppendOnlyMap, the key and value are stored contiguously
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }
}
1. In both PartitionedAppendOnlyMap and PartitionedPairBuffer, a key and its value are stored contiguously, so once an element's position pos has been located via the hash value, the corresponding value is read from data(2 * pos + 1).
2. In both implementations the stored key has the form (Int, K), i.e. the partition id plus the record key, and the value has type V.
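A standalone sketch of that flat layout: slot 2 * i holds the (partition, key) pair and slot 2 * i + 1 holds the value. The array and helper below are illustrative only.

val data = new Array[AnyRef](2 * 4)        // room for 4 (key, value) entries
def put(i: Int, partition: Int, key: String, value: AnyRef): Unit = {
  data(2 * i) = (partition, key)           // key slot: (partition id, key)
  data(2 * i + 1) = value                  // value slot follows immediately
}
put(0, partition = 3, key = "spark", value = Int.box(42))
val (p, k) = data(0).asInstanceOf[(Int, String)] // (3, "spark")
val v = data(1)                                  // 42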