通过GeneratedBlockHandler实现对数据的存储和确认逻辑。
/**
 * Bridges BlockGenerator lifecycle callbacks into the receiver's
 * offset-tracking members (`updateOffset`, `rememberBlockOffsets`,
 * `storeBlockAndCommitOffset`, `reportError` — defined on the enclosing
 * receiver, not visible in this excerpt).
 */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

  // Called for each record added to the generator; metadata, when present,
  // is expected to be a (TopicAndPartition, offset) pair.
  def onAddData(data: Any, metadata: Any): Unit =
    Option(metadata).foreach { meta =>
      val (partition, offset) = meta.asInstanceOf[(TopicAndPartition, Long)]
      updateOffset(partition, offset)
    }

  // A new block has been cut: associate the offsets seen so far with it.
  def onGenerateBlock(blockId: StreamBlockId): Unit =
    rememberBlockOffsets(blockId)

  // The block is being pushed for storage: delegate store + offset commit.
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit =
    storeBlockAndCommitOffset(blockId, arrayBuffer)

  // Forward any generator error to the receiver's error reporting.
  def onError(message: String, throwable: Throwable): Unit =
    reportError(message, throwable)
}
|
启动 Receiver 时,新建一个 Map,记录每个 StreamBlockId 与其对应的 (TopicAndPartition, 偏移量) 键值对
// Receiver startup: initialize offset-tracking state, wire the block
// generator to GeneratedBlockHandler, submit one handler task per Kafka
// stream, then start block generation. NOTE(review): "..." below marks
// code elided from this excerpt (presumably consumer-connector setup) —
// consult the full source before relying on ordering details there.
override def onStart(): Unit = {
logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
// Latest offset observed per partition for the block currently being built.
topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
// Offsets snapshotted per generated block; a ConcurrentHashMap, so it is
// presumably accessed from more than one thread — TODO confirm in full source.
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
// The handler's callbacks drive the offset bookkeeping above.
blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
...
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
// Submit one MessageHandler per Kafka stream to the dedicated thread pool.
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
messageHandlerThreadPool.submit(new MessageHandler(stream))
}
}
// Start generating blocks only after the handlers have been submitted.
blockGenerator.start()
}
|