
Reading the Spark Streaming Source: KafkaReceiver

SPARK wangting 3 weeks ago (11-01) 17 views
The storage and acknowledgment logic for received data is implemented through GeneratedBlockHandler, a listener whose callbacks are invoked by the block generator at each stage of a block's life cycle:
/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = {
    // Update the offset of the data that was added to the generator
    if (metadata != null) {
      val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
      updateOffset(topicAndPartition, offset)
    }
  }
  def onGenerateBlock(blockId: StreamBlockId): Unit = {
    // Remember the offsets of topics/partitions when a block has been generated
    rememberBlockOffsets(blockId)
  }
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    // Store block and commit the blocks offset
    storeBlockAndCommitOffset(blockId, arrayBuffer)
  }
  def onError(message: String, throwable: Throwable): Unit = {
    reportError(message, throwable)
  }
}
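The three callbacks above drive a simple piece of bookkeeping: offsets accumulate per partition as data arrives, get snapshotted when a block is cut, and are committed once the block is safely stored. The following is a minimal standalone sketch of that flow, not the actual Spark implementation; the case classes stand in for Spark's `TopicAndPartition` and `StreamBlockId`, and the method names mirror the helpers called from the listener.

```scala
import scala.collection.mutable
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-ins for Spark's types.
case class TopicAndPartition(topic: String, partition: Int)
case class StreamBlockId(streamId: Int, uniqueId: Long)

object OffsetBookkeepingSketch {
  // Latest offset seen per partition since the last block was generated.
  private val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
  // Frozen offset snapshot for each generated block, awaiting commit.
  private val blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

  // Called from onAddData: remember the newest offset for this partition.
  def updateOffset(tp: TopicAndPartition, offset: Long): Unit = synchronized {
    topicPartitionOffsetMap.put(tp, offset)
  }

  // Called from onGenerateBlock: snapshot the current offsets under the
  // block's id and reset the running map for the next block.
  def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
    topicPartitionOffsetMap.clear()
  }

  // Called from onPushBlock after the block is stored: remove and return the
  // snapshot so the offsets can be committed exactly once.
  def commitOffsets(blockId: StreamBlockId): Option[Map[TopicAndPartition, Long]] =
    Option(blockOffsetMap.remove(blockId))
}
```

Because the snapshot is removed on commit, replaying `commitOffsets` for the same block id returns nothing, which is what makes the store-then-commit ordering safe against duplicate pushes.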

When the receiver starts, it creates fresh maps to track each StreamBlockId together with its corresponding (TopicAndPartition, offset) pairs, then wires up the block generator and submits one handler per Kafka stream:

override def onStart(): Unit = {
  logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
  // Initialize the topic-partition / offset hash map.
  topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
  // Initialize the stream block id / offset snapshot hash map.
  blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
  // Initialize the block generator for storing Kafka message.
  blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
  ...
  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)
  topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream =>
      messageHandlerThreadPool.submit(new MessageHandler(stream))
    }
  }
  blockGenerator.start()
}
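The fan-out at the end of onStart is straightforward: every Kafka stream gets its own MessageHandler task on a thread pool, and all of them feed the same block generator. Here is a minimal standalone sketch of that pattern under simplified assumptions; `FakeBlockGenerator`, the `(String, Long)` message/offset pairs, and `run` are all illustrative, whereas the real receiver consumes Kafka streams and calls `BlockGenerator.addDataWithCallback` with `(TopicAndPartition, offset)` metadata.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

object HandlerFanOutSketch {
  // Illustrative stand-in for Spark's BlockGenerator: buffers each message
  // together with its offset metadata, as addDataWithCallback does.
  final class FakeBlockGenerator {
    val buffered = new ConcurrentLinkedQueue[(String, Long)]()
    def addDataWithCallback(data: String, offset: Long): Unit =
      buffered.add((data, offset))
  }

  // Submit one handler task per stream, mirroring the foreach over
  // topicMessageStreams.values in onStart.
  def run(streams: Seq[Seq[(String, Long)]]): FakeBlockGenerator = {
    val generator = new FakeBlockGenerator
    val pool = Executors.newFixedThreadPool(math.max(1, streams.size))
    streams.foreach { stream =>
      pool.submit(new Runnable {
        def run(): Unit = stream.foreach { case (msg, off) =>
          generator.addDataWithCallback(msg, off)
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    generator
  }
}
```

Dedicating a task per stream keeps a slow partition from blocking the others, while the generator remains the single point where data and offset metadata meet.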
