通过GeneratedBlockHandler实现对数据的存储和确认逻辑。
// Listener wired into the BlockGenerator: tracks Kafka offsets as records are
// added and stores/commits them when a block is pushed, so offsets are only
// committed after the block's data has been reliably stored.
private final class GeneratedBlockHandler extends BlockGeneratorListener {

  /** Record the latest offset for the record's (topic, partition), when metadata is present. */
  def onAddData(data: Any, metadata: Any): Unit = {
    // metadata is expected to be a (TopicAndPartition, Long) pair; null means no offset info.
    if (metadata != null) {
      val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
      updateOffset(topicAndPartition, offset)
    }
  }

  /** Snapshot the offsets accumulated so far and associate them with the newly generated block. */
  def onGenerateBlock(blockId: StreamBlockId): Unit = {
    rememberBlockOffsets(blockId)
  }

  /** Store the block's data, then commit the offsets remembered for it. */
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    storeBlockAndCommitOffset(blockId, arrayBuffer)
  }

  /** Propagate block-generator errors to the receiver's error reporting. */
  def onError(message: String, throwable: Throwable): Unit = {
    reportError(message, throwable)
  }
}
|
启动Receiver时，新建一个Map记录StreamBlockId和对应的(TopicAndPartition, 偏移量)对。
// Starts the receiver: initializes the offset-tracking maps, creates the block
// generator with the offset-aware handler, submits one handler task per Kafka
// stream, and finally starts block generation.
override def onStart(): Unit = {
  logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

  // Latest offset seen per (topic, partition) for the block currently being generated.
  topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

  // Offset snapshot for each generated block, keyed by its StreamBlockId.
  blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

  blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

  // ... (consumer-connector setup elided in this excerpt) ...

  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)
  // Submit one message-handler task per Kafka stream to the thread pool.
  topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream =>
      messageHandlerThreadPool.submit(new MessageHandler(stream))
    }
  }
  blockGenerator.start()
}
|