• 欢迎光临~

Kafka消费者 API(2)

开发技术 开发技术 2022-05-21 次浏览

1.需求:创建一个独立消费者,消费 first 主题 0 号分区的数据。

2.在上篇随笔的package包新建CustomConsumerPartition类

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();
        //连接Kafka集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //GroupId(必须配置)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //topic分区数组
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        //订阅topic对应的分区
        kafkaConsumer.assign(topicPartitions);
        //处理数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

3.在IDEA运行代码

Kafka消费者 API(2)

4.在异步发送随笔当中执行CustomProducerCallback()类代码(红色部分为修改部分,指定分区为0,key为"")

//发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", 0,"","Kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                    }
                }
            });
        }

Kafka消费者 API(2)

生产者发送成功

查看消费者

Kafka消费者 API(2)

Kafka消费者 API(2)

5.修改分区为1,查看结果

for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","Kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("主题:" + metadata.topic() + " 分区:" + metadata.partition());
                    }
                }
            });
        }

Kafka消费者 API(2)

消费者无消息收到

程序员灯塔
转载请注明原文链接:Kafka消费者 API(2)
喜欢 (0)