SortShuffleManager
The Reader is BlockStoreShuffleReader; the Writer is chosen among UnsafeShuffleWriter, BypassMergeSortShuffleWriter, and SortShuffleWriter according to the ShuffleHandle that is passed in.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
HashShuffleManager
HashShuffleManager hashes records into output files: each mapper creates one output file for every reduce partition.
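A minimal, self-contained sketch of the idea (not Spark's HashShuffleWriter; writeMapOutput, the text format, and the file naming are hypothetical): every map task keeps one open writer per reduce partition and routes each record through the partitioner, so a shuffle with M map tasks and R reduce partitions produces M * R files.

// Hypothetical sketch of hash-style shuffle output, not Spark code.
import java.io.{File, PrintWriter}

def writeMapOutput[K, V](mapId: Int, numReducers: Int, records: Iterator[(K, V)],
                         partition: K => Int): Unit = {
  // One file per reduce partition for this map task (hypothetical naming scheme).
  val writers = Array.tabulate(numReducers)(r => new PrintWriter(new File(s"shuffle_${mapId}_$r")))
  try {
    // Route every record to the file of the reduce partition it hashes to.
    records.foreach { case (k, v) => writers(partition(k)).println(s"$k\t$v") }
  } finally {
    writers.foreach(_.close())
  }
}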
The Reader is BlockStoreShuffleReader and the Writer is HashShuffleWriter.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
    : ShuffleWriter[K, V] = {
  new HashShuffleWriter(
    shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}
Shuffle Reader
On the reduce side, the shuffle reader fetches the required partitions from the BlockStores on other nodes during the shuffle phase.
Spark currently has only one implementation: BlockStoreShuffleReader.
The range of data to read is given by the interval [startPartition, endPartition).
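For context, a condensed sketch of the reduce-side read path. It follows the shape of BlockStoreShuffleReader.read() but is not a verbatim copy of the Spark source; it references internal members (mapOutputTracker, blockManager, serializerInstance), and combineAndSortIfNeeded is a hypothetical helper standing in for the aggregation and sorting logic.

// Condensed sketch, not the actual BlockStoreShuffleReader source.
def read(): Iterator[Product2[K, C]] = {
  // 1. Ask the MapOutputTracker where the map outputs for partitions
  //    [startPartition, endPartition) of this shuffle live.
  val blocksByAddress =
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)

  // 2. Fetch those blocks (local and remote) through a ShuffleBlockFetcherIterator,
  //    which enforces maxBytesInFlight while streaming results back.
  val blockStreams = new ShuffleBlockFetcherIterator(
    context, blockManager.shuffleClient, blockManager, blocksByAddress, maxBytesInFlight)

  // 3. Deserialize each stream into key/value records, then apply the reduce-side
  //    aggregator and key ordering from the ShuffleDependency when they are defined.
  val records = blockStreams.flatMap { case (blockId, inputStream) =>
    serializerInstance.deserializeStream(inputStream).asKeyValueIterator
  }
  combineAndSortIfNeeded(records) // hypothetical helper for the aggregation/sort step
}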
ShuffleBlockFetcherIterator
Reading the shuffle blocks is done through ShuffleBlockFetcherIterator.
initialize
During initialization, splitLocalRemoteBlocks divides the blocks to be read into multiple FetchRequests, so they are fetched in several batches.
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  ...
  val remoteRequests = new ArrayBuffer[FetchRequest]
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    if (address.executorId == blockManager.blockManagerId.executorId) {
      ... // <-- handle local blocks
    } else {
      val iterator = blockInfos.iterator
      var curRequestSize = 0L
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // Skip empty blocks
        if (size > 0) {
          curBlocks += ((blockId, size))
          remoteBlocks += blockId
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        if (curRequestSize >= targetRequestSize) {
          // <-- once the bytes accumulated for one node reach 1/5 of maxBytesInFlight,
          //     the remaining blocks are deferred to the next FetchRequest
          // Add this FetchRequest
          remoteRequests += new FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          curRequestSize = 0
        }
      }
      ...
}
The returned FetchRequests are randomized, appended to the fetchRequests queue, and an initial round of fetches is then issued.
// Split local and remote blocks.
val remoteRequests = splitLocalRemoteBlocks()
// Add the remote requests into our queue in a random order
fetchRequests ++= Utils.randomize(remoteRequests)
// Send out initial requests for blocks, up to our maxBytesInFlight
fetchUpToMaxBytes()
Fetching is driven by fetchUpToMaxBytes, which dequeues FetchRequests from the fetchRequests queue.
A request is sent as long as nothing is currently in flight, or adding the request would not push the total bytes in flight over maxBytesInFlight.
Because splitLocalRemoteBlocks has already capped each FetchRequest at maxBytesInFlight / 5, a fetch concurrency of at least 5 is guaranteed,
i.e. data can be fetched from at least 5 different nodes in parallel.
private def fetchUpToMaxBytes(): Unit = {
  // Send fetch requests up to maxBytesInFlight
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}
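To make the concurrency argument concrete, a small worked example, assuming the default spark.reducer.maxSizeInFlight of 48m (the value is configurable):

// Each FetchRequest is capped at maxBytesInFlight / 5, so up to 5 requests
// (to 5 different nodes) can be outstanding at the same time.
val maxBytesInFlight = 48L * 1024 * 1024                       // 48 MB
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)     // ~9.6 MB per FetchRequest
println(maxBytesInFlight / targetRequestSize)                  // 5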
next
The next method takes the required data from the results queue; after each result is taken it calls fetchUpToMaxBytes again to prefetch further data.
override def next(): (BlockId, InputStream) = {
  numBlocksProcessed += 1
  val startFetchWait = System.currentTimeMillis()
  currentResult = results.take()
  val result = currentResult
  val stopFetchWait = System.currentTimeMillis()
  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)

  result match {
    case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size
    case _ =>
  }
  // Send fetch requests up to maxBytesInFlight
  fetchUpToMaxBytes()

  result match {
    ... // <-- process the fetched result
  }
}
Fetch requests are sent by the sendRequest method.
private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  bytesInFlight += req.size

  // so we can look up the size of each blockID
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  val blockIds = req.blocks.map(_._1.toString)
  val address = req.address

  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Only add the buffer to results queue if the iterator is not zombie,
        // i.e. cleanup() has not been called yet.
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          // <-- successfully fetched results are put into a LinkedBlockingQueue here
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }
      ...
    }
  )
}
MapOutputTracker
Each executor has a MapOutputTrackerWorker, which retrieves the map-stage output information from the MapOutputTrackerMaster on the driver.
The information is obtained via getMapSizesByExecutorId, which returns tuples of the form (BlockManagerId, Seq[(BlockId, BlockSize)]).
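A hedged illustration of the lookup and its return shape (shuffleId, startPartition and endPartition are placeholders; the loop is illustrative, not Spark source): the result groups shuffle blocks by the BlockManager that holds them, together with each block's size in bytes.

// Illustrative only: shows the shape of getMapSizesByExecutorId's result.
val tracker = SparkEnv.get.mapOutputTracker
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] =
  tracker.getMapSizesByExecutorId(shuffleId, startPartition, endPartition)

blocksByAddress.foreach { case (blockManagerId, blocks) =>
  blocks.foreach { case (blockId, size) =>
    // A shuffle block id looks like shuffle_<shuffleId>_<mapId>_<reduceId>
    println(s"$blockId ($size bytes) on ${blockManagerId.host}:${blockManagerId.port}")
  }
}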
Shuffle Client
Data is read through the BlockManager's shuffleClient, which can run as an external service or simply be the blockTransferService that talks to other executors directly.
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
    securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}
The default implementation is NettyBlockTransferService, which is built on Netty.
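For reference, the externalShuffleServiceEnabled flag in the snippet above is driven by the spark.shuffle.service.enabled configuration (false by default); a minimal, illustrative way to turn it on when building a SparkConf (the app name is hypothetical):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-client-demo")               // hypothetical application name
  .set("spark.shuffle.service.enabled", "true")    // use ExternalShuffleClient instead of
                                                   // the executor-to-executor BlockTransferService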
NettyBlockTransferService
Fetching is implemented by fetchBlocks:
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
        // <-- connect to the node holding the target blocks and start fetching
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
Once the host and port of the node holding the data are known, OneForOneBlockFetcher reads the data from that node.
OneForOneBlockFetcher
Its implementation is as follows:
public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }

  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
        logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

        // Immediately request all chunks -- we expect that the total size of the request is
        // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
        for (int i = 0; i < streamHandle.numChunks; i++) {
          client.fetchChunk(streamHandle.streamId, i, chunkCallback);
        }
      } catch (Exception e) {
        ...
      }
    }
    ...
  });
}
Each chunk of data is fetched via fetchChunk:
public void fetchChunk(
    long streamId,
    final int chunkIndex,
    final ChunkReceivedCallback callback) {
  final String serverAddr = NettyUtils.getRemoteAddress(channel);
  final long startTime = System.currentTimeMillis();
  logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);

  final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
  handler.addFetchRequest(streamChunkId, callback);

  channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
    new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
          long timeTaken = System.currentTimeMillis() - startTime;
          logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr,
            timeTaken);
        } else {
          ... // error handling
        }
      }
    });
}
Spill
If the shuffle data does not fit in memory, it has to be spilled to disk. Whether to spill is decided by maybeSpill, which uses a sampling approach: the memory check is performed once every 32 elements read.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
If a spill is needed, the spill method is triggered to write the data to disk.
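The notes stop short of the spill body; below is a hedged, condensed sketch of what ExternalSorter's spill does. The names follow the Spark source, but the body is simplified, not a verbatim copy.

// Condensed sketch of ExternalSorter.spill: sort the in-memory collection destructively by
// partition id (and by key when an ordering/aggregation requires it), stream it to a
// temporary file on disk, and remember the resulting SpilledFile for the final merge.
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)  // writes via a DiskBlockObjectWriter
  spills += spillFile  // kept so the final write can merge all spilled files
}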
Shuffle Write
SortShuffleWriter
In practice SortShuffleWriter is used by default; the write logic is implemented in its write method.
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  ...
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
The shuffle write hands the record iterator to an ExternalSorter and writes the sorted, partitioned output to a temporary file via writePartitionedFile.
It then has shuffleBlockResolver write and commit the index file, and finally updates the MapStatus.
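A hedged illustration of what the index file records: it holds numPartitions + 1 cumulative byte offsets into the single data file, so the segment for reduce partition i is the byte range [offsets(i), offsets(i + 1)). The partition lengths below are hypothetical.

// Illustrative only: deriving segment boundaries from per-partition lengths.
val partitionLengths = Array(120L, 0L, 340L, 80L)     // hypothetical lengths per reducer
val offsets = partitionLengths.scanLeft(0L)(_ + _)    // Array(0, 120, 120, 460, 540)
def segmentFor(reduceId: Int): (Long, Long) = (offsets(reduceId), offsets(reduceId + 1))
println(segmentFor(2))                                // (120, 460): byte range for partition 2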
ExternalSorter
The insertAll method inserts the records into an in-memory collection, keyed by (partition, key); its implementation:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
If map-side combine is required, records are inserted into a PartitionedAppendOnlyMap; otherwise they are inserted into a PartitionedPairBuffer.
Both structures are described below in "Shuffle – Related Data Structures".
1. With map-side aggregation, PartitionedAppendOnlyMap is the in-memory structure that holds the inserted data during the ExternalSorter stage; for each key only the updated, aggregated value needs to be kept.
2. Without map-side aggregation, PartitionedPairBuffer is used instead, and every value is kept alongside its key (a condensed sketch of the spill check that manages these two collections follows the list).
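A hedged, condensed sketch of ExternalSorter.maybeSpillCollection (names follow the Spark source, but the body is simplified): estimate the size of whichever collection is in use, ask maybeSpill whether to spill, and if a spill happened replace the collection with a fresh, empty one.

// Condensed sketch, not a verbatim copy of the Spark source.
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  if (usingMap) {
    if (maybeSpill(map, map.estimateSize())) {
      map = new PartitionedAppendOnlyMap[K, C]      // contents were spilled; start empty again
    }
  } else {
    if (maybeSpill(buffer, buffer.estimateSize())) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
}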
Related Data Structures
ExternalSorter uses two data structures, PartitionedAppendOnlyMap and PartitionedPairBuffer, to hold the data in memory during the insert phase.
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
private var map = new PartitionedAppendOnlyMap[K, C]
private var buffer = new PartitionedPairBuffer[K, C]
What both PartitionedAppendOnlyMap and PartitionedPairBuffer have in common is that a key and its value are stored contiguously, in the following layout:
| key1 | value1 | key2 | value2 | … | keyN | valueN |
They are described in detail below:
PartitionedAppendOnlyMap
Inheritance chain: PartitionedAppendOnlyMap -> SizeTrackingAppendOnlyMap -> AppendOnlyMap
AppendOnlyMap provides the in-memory space that holds the shuffle data and presents itself as a map.
Internally it is backed by an array, data, whose slots are chosen by hash code, forming an open-addressing hash table.
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {
  ...
  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)  // <-- where the shuffle data actually lives in memory

  /** Get the value for a given key */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask  // <-- slot index derived from the key's hash value
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        return data(2 * pos + 1).asInstanceOf[V]  // <-- once the slot is found, 2*pos+1 holds the key's value
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V]
  }
  ...
}
PartitionedPairBuffer
Its implementation is similar to PartitionedAppendOnlyMap:
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker {
  ...
  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    // <-- as in PartitionedAppendOnlyMap, the key and its value are stored contiguously
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }
}
1. In both PartitionedAppendOnlyMap and PartitionedPairBuffer a key and its value are physically adjacent, so once the hash lookup has located an element's position pos, its value is read from data(2 * pos + 1).
2. In both implementations the stored key is actually of the form (partitionId, K) and the value is V (see the standalone sketch after this list).
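A standalone illustration of this flat layout (not Spark code; put and the array size are hypothetical): the element at logical slot i keeps its composite key (partitionId, key) at data(2 * i) and its value at data(2 * i + 1).

// Illustrative only: the flat (key, value) layout shared by both collections.
val data = new Array[AnyRef](2 * 4)

def put(slot: Int, partition: Int, key: String, value: Int): Unit = {
  data(2 * slot) = (partition, key)     // composite key: (partitionId, K)
  data(2 * slot + 1) = Int.box(value)   // value stored right next to its key
}

put(0, 0, "a", 1)
put(1, 2, "b", 5)
println((data(2 * 1), data(2 * 1 + 1)))  // ((2,b),5): key at 2*slot, value at 2*slot+1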