SortShuffleManager
The reader is always BlockStoreShuffleReader; the writer is chosen according to the ShuffleHandle passed in: UnsafeShuffleWriter, BypassMergeSortShuffleWriter, or SortShuffleWriter.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
HashShuffleManager
HashShuffleManager uses a hash-based scheme: for every reduce partition of every mapper it creates a separate output file.
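To see why this becomes a problem at scale, a rough back-of-the-envelope calculation (the task counts below are assumptions, not from the source):

// Hypothetical job sizes: every map task writes one file per reduce partition.
val numMapTasks = 1000
val numReducePartitions = 200
val filesCreated = numMapTasks * numReducePartitions
println(s"Hash shuffle creates $filesCreated files")   // 200000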
The reader is BlockStoreShuffleReader and the writer is HashShuffleWriter.
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
  : ShuffleWriter[K, V] = {
  new HashShuffleWriter(
    shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}
Shuffle Reader
On the reducer side, the shuffle reader fetches the required partitions from the BlockStores on other nodes during the shuffle phase.
Spark currently has only one implementation: BlockStoreShuffleReader.
The range of data to read is given by the half-open interval [startPartition, endPartition).
ShuffleBlockFetcherIterator
Shuffle blocks are read through a ShuffleBlockFetcherIterator.
initialize
During initialization, splitLocalRemoteBlocks splits the blocks to be read and groups them into multiple FetchRequests.
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  ...
  val remoteRequests = new ArrayBuffer[FetchRequest]
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    if (address.executorId == blockManager.blockManagerId.executorId) {
      ...  // <-- handle local blocks
    } else {
      val iterator = blockInfos.iterator
      var curRequestSize = 0L
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // Skip empty blocks
        if (size > 0) {
          curBlocks += ((blockId, size))
          remoteBlocks += blockId
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        if (curRequestSize >= targetRequestSize) {
          // <-- once the data accumulated from one node reaches 1/5 of maxBytesInFlight,
          //     the remaining blocks are deferred to the next request
          // Add this FetchRequest
          remoteRequests += new FetchRequest(address, curBlocks)
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          curRequestSize = 0
        }
      }
      ...
    }
The returned FetchRequests are shuffled into random order and appended to the fetchRequests queue, after which an initial round of prefetching is issued.
// Split local and remote blocks.
val remoteRequests = splitLocalRemoteBlocks()
// Add the remote requests into our queue in a random order
fetchRequests ++= Utils.randomize(remoteRequests)
// Send out initial requests for blocks, up to our maxBytesInFlight
fetchUpToMaxBytes()
The actual fetching is done by fetchUpToMaxBytes, which simply dequeues FetchRequests from the fetchRequests queue.
As long as the total amount of data currently in flight stays within maxBytesInFlight, further fetch requests can be sent.
Because splitLocalRemoteBlocks already caps each FetchRequest at maxBytesInFlight/5 of data, a fetch parallelism of at least 5 is guaranteed,
i.e. data can be fetched from at least 5 different nodes concurrently.
private def fetchUpToMaxBytes(): Unit = {
  // Send fetch requests up to maxBytesInFlight
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}
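To make the 1/5 cap concrete, a small worked example (48 MB is the usual default for maxBytesInFlight in this Spark version, but treat it as an assumed figure):

// Illustrative arithmetic only; 48 MB is the assumed maxBytesInFlight.
val maxBytesInFlight = 48L * 1024 * 1024
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)   // ~9.6 MB per FetchRequest
// No single FetchRequest exceeds targetRequestSize, so up to
// maxBytesInFlight / targetRequestSize = 5 requests can be in flight at once,
// i.e. blocks are pulled from at least 5 remote executors concurrently.
println(maxBytesInFlight / targetRequestSize)                // 5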
next
The next method takes the required data from the results queue. After a result is retrieved, fetchUpToMaxBytes is triggered to prefetch subsequent data.
override def next(): (BlockId, InputStream) = {
  numBlocksProcessed += 1
  val startFetchWait = System.currentTimeMillis()
  currentResult = results.take()
  val result = currentResult
  val stopFetchWait = System.currentTimeMillis()
  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)

  result match {
    case SuccessFetchResult(_, _, size, _) => bytesInFlight -= size
    case _ =>
  }
  // Send fetch requests up to maxBytesInFlight
  fetchUpToMaxBytes()

  result match {
    ...  // <-- result handling
  }
}
Fetch requests are sent by the sendRequest method.
private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  bytesInFlight += req.size

  // so we can look up the size of each blockID
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  val blockIds = req.blocks.map(_._1.toString)

  val address = req.address
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Only add the buffer to results queue if the iterator is not zombie,
        // i.e. cleanup() has not been called yet.
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          // <-- successfully fetched results are put into the LinkedBlockingQueue here
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }
      ...
    }
  )
}
MapOutputTracker
Each executor has a MapOutputTrackerWorker, which fetches the output information of the map stage from the MapOutputTrackerMaster on the driver.
This information is obtained through getMapSizesByExecutorId, which returns tuples of the form (BlockManagerId, Seq[(BlockId, BlockSize)]).
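A minimal sketch of how the reader side uses this (the call shape follows the 1.x-era API; the surrounding values are assumed to come from BlockStoreShuffleReader):

// Hedged sketch: obtain the per-executor list of shuffle blocks to fetch.
// `handle`, `startPartition` and `endPartition` are the reader's own fields.
val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] =
  SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
// Each BlockManagerId tells the fetcher which executor holds the blocks; the inner
// sequence lists the shuffle block ids and their sizes. This is the `blocksByAddress`
// that ShuffleBlockFetcherIterator's splitLocalRemoteBlocks iterates over.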
Shuffle Client
Data is read through the BlockManager's shuffle client, which can run as an external service or simply use the blockTransferService to talk to other executors directly.
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled(), securityManager.isSaslEncryptionEnabled())
} else {
  blockTransferService
}
The default implementation, NettyBlockTransferService, is built on Netty.
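Whether the external shuffle client is used is controlled purely by configuration; a minimal, hedged example of enabling it when building an application's SparkConf:

import org.apache.spark.SparkConf

// With spark.shuffle.service.enabled set, BlockManager constructs an ExternalShuffleClient;
// otherwise it falls back to the NettyBlockTransferService shown above.
val conf = new SparkConf()
  .setAppName("shuffle-read-demo")                 // assumed application name
  .set("spark.shuffle.service.enabled", "true")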
NettyBlockTransferService
The fetch operation is implemented by fetchBlocks:
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
        // <-- connect to the node holding the target data and fetch it
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
After obtaining the IP and port of the node holding the data, the blocks are read from that node through OneForOneBlockFetcher.
OneForOneBlockFetcher
The implementation is as follows:
public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }

  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
        logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

        // Immediately request all chunks -- we expect that the total size of the request is
        // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
        for (int i = 0; i < streamHandle.numChunks; i++) {
          client.fetchChunk(streamHandle.streamId, i, chunkCallback);
        }
      } catch (Exception e) {
        ...
      }
    }
    ...
  });
}
Each chunk is read via fetchChunk:
public void fetchChunk(
    long streamId,
    final int chunkIndex,
    final ChunkReceivedCallback callback) {
  final String serverAddr = NettyUtils.getRemoteAddress(channel);
  final long startTime = System.currentTimeMillis();
  logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr);

  final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
  handler.addFetchRequest(streamChunkId, callback);

  channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
    new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
          long timeTaken = System.currentTimeMillis() - startTime;
          logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr, timeTaken);
        } else {
          ...  // error handling
        }
      }
    });
}
Spill
If the shuffle data does not fit in memory, it has to be spilled to disk. Whether to spill is decided by the maybeSpill method, which uses a sampling check: every 32 elements read, it tests whether a spill is required.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
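A quick worked example of the "claim up to double our current memory" logic above, with made-up numbers:

// Assume the collection currently occupies 10 MB and the threshold granted so far is 5 MB.
val currentMemory = 10L * 1024 * 1024
var myMemoryThreshold = 5L * 1024 * 1024
val amountToRequest = 2 * currentMemory - myMemoryThreshold   // 15 MB requested
val granted = 2L * 1024 * 1024                                // suppose only 2 MB is granted
myMemoryThreshold += granted                                  // threshold is now 7 MB
val shouldSpill = currentMemory >= myMemoryThreshold          // true -> the collection is spilled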
If a spill is needed, the spill method is invoked to write the data to disk; a simplified conceptual sketch of that step follows.
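This is conceptual only and not the ExternalSorter source; the temp-file handling and serialization below are stand-ins:

import java.io._
import scala.collection.mutable.ArrayBuffer

// Conceptual sketch: drain the in-memory (partitionId, key) -> value records to a temp file,
// remember the file, and let the caller release the memory afterwards. The real code sorts
// by partition (and key, if required) and uses Spark's DiskBlockObjectWriter and serializer.
def spillSketch[K, V](records: Iterator[((Int, K), V)],
                      spillFiles: ArrayBuffer[File]): Unit = {
  val file = File.createTempFile("spill", ".tmp")
  val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
  try {
    records.foreach(rec => out.writeObject(rec))   // write every record to disk
    spillFiles += file                             // track the file for the final merge
  } finally {
    out.close()
  }
}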
Shuffle Write
SortShuffleWriter
During actual execution SortShuffleWriter is used by default; the write operation is implemented in its write method.
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  ...
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
Shuffle write hands the record iterator to an ExternalSorter and writes the data to a temporary file through the writePartitionedFile interface.
It then uses shuffleBlockResolver to write the index file and commit, and finally updates the MapStatus.
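The index file produced by writeIndexFileAndCommit is essentially the running sum of partitionLengths; a hedged sketch of the offset arithmetic that lets a reducer locate its bytes inside the single data file:

// Assumed lengths (in bytes) written for 4 reduce partitions by one map task.
val partitionLengths = Array(100L, 0L, 250L, 75L)
// The index is the cumulative sum: offsets(r) .. offsets(r + 1) is reducer r's byte range.
val offsets = partitionLengths.scanLeft(0L)(_ + _)      // Array(0, 100, 100, 350, 425)
def byteRangeFor(reduceId: Int): (Long, Long) = (offsets(reduceId), offsets(reduceId + 1))
println(byteRangeFor(2))                                // (100,350)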
ExternalSorter
The insertAll method inserts records into memory by key; the implementation is as follows:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
If map-side combine is required, records are inserted into a PartitionedAppendOnlyMap; otherwise they are inserted into a PartitionedPairBuffer.
For the implementation of the two structures, see: Shuffle – Related Data Structures.
1. If map-side aggregation is needed, PartitionedAppendOnlyMap is the in-memory structure ExternalSorter uses to hold the inserted data; for each key only the updated, aggregated value needs to be kept (a small example follows this list).
2. If map-side aggregation is not needed, PartitionedPairBuffer is the in-memory structure used to hold the inserted data; each key is stored together with its corresponding value.
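A small self-contained example of what the map-side combine closure in insertAll amounts to, using an assumed sum-style aggregator and a plain HashMap as a stand-in for PartitionedAppendOnlyMap:

import scala.collection.mutable

// Assumed aggregator: createCombiner turns the first value into a combiner,
// mergeValue folds further values for the same key into it.
val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = _ + _

var kv: (String, Int) = null
val update: (Boolean, Int) => Int = (hadValue, oldValue) =>
  if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)

val map = mutable.HashMap.empty[(Int, String), Int]     // stand-in, keyed by (partitionId, key)
for (rec <- Seq(("spark", 1), ("spark", 4), ("shuffle", 2))) {
  kv = rec
  val k = (0, kv._1)                                    // assume everything lands in partition 0
  map(k) = update(map.contains(k), map.getOrElse(k, 0))
}
// map((0, "spark")) == 5, map((0, "shuffle")) == 2 -- only the aggregated value is kept per key.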
Related Data Structures
ExternalSorter uses two data structures, PartitionedAppendOnlyMap and PartitionedPairBuffer, to hold the inserted data in memory.
// Data structures to store in-memory objects before we spill. Depending on whether we have an
// Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
// store them in an array buffer.
private var map = new PartitionedAppendOnlyMap[K, C]
private var buffer = new PartitionedPairBuffer[K, C]
The distinguishing feature of PartitionedAppendOnlyMap and PartitionedPairBuffer is that keys and values are stored contiguously, in the following layout:
key1 | value1 | key2 | value2 | … | keyN | valueN |
They are described in more detail below.
PartitionedAppendOnlyMap
Inheritance chain: PartitionedAppendOnlyMap -> SizeTrackingAppendOnlyMap -> AppendOnlyMap
AppendOnlyMap provides the in-memory storage for shuffle data and behaves like a map.
Internally it is actually an array, data; elements are placed according to their hash codes, forming a hash table.
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {
  ...
  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)  // <-- where the shuffle data actually lives in memory

  /** Get the value for a given key */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask  // <-- index derived from the key's hash value
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        return data(2 * pos + 1).asInstanceOf[V]  // <-- once the slot is found, data(2 * pos + 1) holds the key's value
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V]
  }
  ...
}
PartitionedPairBuffer
The implementation is similar to PartitionedAppendOnlyMap; the code is as follows:
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker {
  ...
  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    // <-- as in PartitionedAppendOnlyMap, the key and value are stored physically adjacent
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }
}
1. In both PartitionedAppendOnlyMap and PartitionedPairBuffer a key and its value are stored physically adjacent, so after the hash lookup yields the element position pos, the value is read from data(2 * pos + 1).
2. In both PartitionedAppendOnlyMap and PartitionedPairBuffer the key actually has the form (Int, K), i.e. (partition id, key), and the value is of type V (a toy sketch of this layout follows).
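A toy illustration of this flat layout (purely illustrative, not the Spark classes):

// key0, value0, key1, value1, ... stored in one Array[AnyRef]; keys are (partitionId, key) pairs.
val capacity = 4
val data = new Array[AnyRef](2 * capacity)

def put(pos: Int, partition: Int, key: String, value: Int): Unit = {
  data(2 * pos) = (partition, key)                 // key slot
  data(2 * pos + 1) = value.asInstanceOf[AnyRef]   // value slot, physically adjacent
}

put(0, 0, "a", 10)
put(1, 3, "b", 20)
// Reading slot 1: the key sits at 2 * pos, its value at 2 * pos + 1.
println((data(2 * 1), data(2 * 1 + 1)))            // ((3,b),20)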