Skip to content

程序员灯塔

Menu
  • Download
  • sitemap
  • 文章归档
  • 标签归档
  • 示例页面
Menu

Spark Streaming源码阅读 KafkaReceiver

Posted on 2019 年 11 月 1 日
通过GeneratedBlockHandler实现对数据的存储和确认逻辑。
/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = {
    // Update the offset of the data that was added to the generator
    if (metadata != null) {
      val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
      updateOffset(topicAndPartition, offset)
    }
  }
  def onGenerateBlock(blockId: StreamBlockId): Unit = {
    // Remember the offsets of topics/partitions when a block has been generated
    rememberBlockOffsets(blockId)
  }
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    // Store block and commit the blocks offset
    storeBlockAndCommitOffset(blockId, arrayBuffer)
  }
  def onError(message: String, throwable: Throwable): Unit = {
    reportError(message, throwable)
  }
}

启动Receiver时,新建一个Map记录StreamBlockId和对应的TopicAndPartition,偏移对

override def onStart(): Unit = {
  logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
  // Initialize the topic-partition / offset hash map.
  topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
  // Initialize the stream block id / offset snapshot hash map.
  blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
  // Initialize the block generator for storing Kafka message.
  blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
  ...
  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)
  topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream =>
      messageHandlerThreadPool.submit(new MessageHandler(stream))
    }
  }
  blockGenerator.start()
}

近期文章

  • 技术网站
  • 世界,您好!
  • Git学习记录(learngitbranching.js.org)
  • 阿里职场潜规则
  • 寻找两个正序数组的中位数

近期评论

  1. 一位 WordPress 评论者 发表在 世界,您好!

归档

  • 2024 年 9 月
  • 2024 年 3 月
  • 2022 年 12 月
  • 2021 年 8 月
  • 2021 年 6 月
  • 2021 年 3 月
  • 2021 年 2 月
  • 2020 年 11 月
  • 2020 年 5 月
  • 2020 年 3 月
  • 2019 年 11 月
  • 2019 年 10 月
  • 2019 年 9 月
  • 2019 年 7 月
  • 2019 年 6 月
  • 2019 年 5 月
  • 2019 年 3 月
  • 2018 年 9 月
  • 2018 年 8 月
  • 2018 年 7 月
  • 2018 年 4 月
  • 2018 年 2 月
  • 2018 年 1 月
  • 2017 年 12 月
  • 2017 年 11 月
  • 2017 年 10 月
  • 2017 年 8 月
  • 2017 年 7 月

分类目录

  • 未分类
©2025 程序员灯塔 | Design: Newspaperly WordPress Theme