KafkaUtils

发布于 2022-08-31  447 次阅读


pom文件

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.2.1</version>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

代码

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Static helpers for common Kafka administration, producing and consuming tasks.
 *
 * <p>The authenticated overloads connect with SASL/SCRAM-SHA-256 over
 * {@code SASL_PLAINTEXT}. Blocking admin operations report failures as
 * {@link IllegalStateException} wrapping the original exception.
 *
 * @author 7e2hj
 * @version 1.0
 * @date 2021-11-29
 */
public class KafkaUtils {
    private static final Logger LOG = Logger.getLogger(KafkaUtils.class.getName());

    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

    /** Maximum time a single {@code poll} issued by the receive helpers may block. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(60);

    /**
     * Creates an admin client that authenticates with SASL/SCRAM-SHA-256.
     * The caller is responsible for closing the returned client.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param userName         SCRAM user name
     * @param password         SCRAM password
     * @return a new admin client
     */
    public static AdminClient getAdmin(String bootstrapServers, String userName, String password) {
        Properties prop = adminProperties(bootstrapServers);
        applyScram(prop, userName, password);
        return AdminClient.create(prop);
    }

    /**
     * Creates an unauthenticated admin client.
     * The caller is responsible for closing the returned client.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @return a new admin client
     */
    public static AdminClient getAdmin(String bootstrapServers) {
        return AdminClient.create(adminProperties(bootstrapServers));
    }

    /**
     * Checks whether a topic is present in the cluster's topic listing.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param userName         SCRAM user name
     * @param password         SCRAM password
     * @param topicName        topic to look for
     * @return {@code true} if the listing contains {@code topicName}
     * @throws IllegalStateException if the listing fails or the thread is interrupted
     */
    public static boolean isExistTopic(String bootstrapServers, String userName, String password, String topicName) {
        try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
            return await(client.listTopics().names()).contains(topicName);
        }
    }

    /**
     * Creates the given topics and waits for the operation to finish.
     *
     * <p>Example:
     * <pre>{@code
     * Map<String, String> map = new HashMap<>();
     * map.put("max.message.bytes", "2147483647");
     * createTopics(client, Arrays.asList(
     *         new NewTopic("topic1", 1, (short) 1),
     *         new NewTopic("topic2", 1, (short) 1).configs(map)));
     * }</pre>
     *
     * @param client    admin client to use; it is not closed by this method
     * @param newTopics topics to create
     * @throws IllegalStateException if creation fails or the thread is interrupted
     */
    public static void createTopics(AdminClient client, Collection<NewTopic> newTopics) {
        await(client.createTopics(newTopics).all());
    }

    /**
     * Creates a single topic using a short-lived SCRAM-authenticated admin client.
     *
     * @param bootstrapServers  Kafka bootstrap server list
     * @param userName          SCRAM user name
     * @param password          SCRAM password
     * @param topicName         name of the topic
     * @param partitionNum      number of partitions
     * @param replicationFactor replication factor
     * @throws IllegalStateException if creation fails or the thread is interrupted
     */
    public static void createTopics(String bootstrapServers, String userName, String password, String topicName, int partitionNum, short replicationFactor) {
        try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
            createTopics(client, Collections.singletonList(new NewTopic(topicName, partitionNum, replicationFactor)));
        }
    }

    /**
     * Creates a single topic using a short-lived unauthenticated admin client.
     *
     * @param bootstrapServers  Kafka bootstrap server list
     * @param topicName         name of the topic
     * @param partitionNum      number of partitions
     * @param replicationFactor replication factor
     * @throws IllegalStateException if creation fails or the thread is interrupted
     */
    public static void createTopics(String bootstrapServers, String topicName, int partitionNum, short replicationFactor) {
        try (AdminClient client = getAdmin(bootstrapServers)) {
            createTopics(client, Collections.singletonList(new NewTopic(topicName, partitionNum, replicationFactor)));
        }
    }

    /**
     * Deletes the given topics and waits for the operation to finish.
     *
     * @param client admin client to use; it is not closed by this method
     * @param topics names of the topics to delete
     * @throws IllegalStateException if deletion fails or the thread is interrupted
     */
    public static void deleteTopics(AdminClient client, Collection<String> topics) {
        await(client.deleteTopics(topics).all());
    }

    /**
     * Deletes a single topic using a short-lived SCRAM-authenticated admin client.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param userName         SCRAM user name
     * @param password         SCRAM password
     * @param topicName        topic to delete
     * @throws IllegalStateException if deletion fails or the thread is interrupted
     */
    public static void deleteTopics(String bootstrapServers, String userName, String password, String topicName) {
        try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
            deleteTopics(client, Collections.singletonList(topicName));
        }
    }

    /**
     * Deletes a single topic using a short-lived unauthenticated admin client.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param topicName        topic to delete
     * @throws IllegalStateException if deletion fails or the thread is interrupted
     */
    public static void deleteTopics(String bootstrapServers, String topicName) {
        try (AdminClient client = getAdmin(bootstrapServers)) {
            deleteTopics(client, Collections.singletonList(topicName));
        }
    }

    /**
     * Increases the partition count of a topic and waits for the operation to finish.
     *
     * @param client       admin client to use; it is not closed by this method
     * @param topicName    topic to alter
     * @param partitionNum total number of partitions the topic should have afterwards
     * @throws IllegalStateException if the change fails or the thread is interrupted
     */
    public static void addPartitions(AdminClient client, String topicName, int partitionNum) {
        Map<String, NewPartitions> newPartitions =
                Collections.singletonMap(topicName, NewPartitions.increaseTo(partitionNum));
        await(client.createPartitions(newPartitions).all());
    }

    /**
     * Increases the partition count of a topic using a short-lived
     * SCRAM-authenticated admin client.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param userName         SCRAM user name
     * @param password         SCRAM password
     * @param topicName        topic to alter
     * @param partitionNum     total number of partitions the topic should have afterwards
     * @throws IllegalStateException if the change fails or the thread is interrupted
     */
    public static void addPartitions(String bootstrapServers, String userName, String password, String topicName, int partitionNum) {
        try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
            addPartitions(client, topicName, partitionNum);
        }
    }

    /**
     * Increases the partition count of a topic using a short-lived
     * unauthenticated admin client.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param topicName        topic to alter
     * @param partitionNum     total number of partitions the topic should have afterwards
     * @throws IllegalStateException if the change fails or the thread is interrupted
     */
    public static void addPartitions(String bootstrapServers, String topicName, int partitionNum) {
        try (AdminClient client = getAdmin(bootstrapServers)) {
            addPartitions(client, topicName, partitionNum);
        }
    }

    /**
     * Creates an unauthenticated String/String producer.
     *
     * @param bootStrap Kafka bootstrap server list
     * @param config    extra settings applied last, so they override the defaults; may be {@code null}
     * @return a new producer; the caller must close it
     */
    public static KafkaProducer<String, String> getProducer(String bootStrap, Map<String, String> config) {
        Properties prop = producerProperties(bootStrap);
        applyOverrides(prop, config);
        return new KafkaProducer<>(prop);
    }

    /**
     * Creates a String/String producer that authenticates with SASL/SCRAM-SHA-256.
     *
     * @param bootStrap Kafka bootstrap server list
     * @param userName  SCRAM user name
     * @param password  SCRAM password
     * @param config    extra settings applied last, so they override the defaults; may be {@code null}
     * @return a new producer; the caller must close it
     */
    public static KafkaProducer<String, String> getProducer(String bootStrap, String userName, String password, Map<String, String> config) {
        Properties prop = producerProperties(bootStrap);
        applyScram(prop, userName, password);
        applyOverrides(prop, config);
        return new KafkaProducer<>(prop);
    }

    /**
     * Creates an unauthenticated String/String consumer.
     *
     * @param bootStrap Kafka bootstrap server list
     * @param group     consumer group id
     * @param config    extra settings applied last, so they override the defaults; may be {@code null}
     * @return a new consumer; the caller must close it
     */
    public static KafkaConsumer<String, String> getConsumer(String bootStrap, String group, Map<String, String> config) {
        Properties prop = consumerProperties(bootStrap, group);
        applyOverrides(prop, config);
        return new KafkaConsumer<>(prop);
    }

    /**
     * Creates a String/String consumer that authenticates with SASL/SCRAM-SHA-256.
     *
     * @param bootStrap Kafka bootstrap server list
     * @param group     consumer group id
     * @param userName  SCRAM user name
     * @param password  SCRAM password
     * @param config    extra settings applied last, so they override the defaults; may be {@code null}
     * @return a new consumer; the caller must close it
     */
    public static KafkaConsumer<String, String> getConsumer(String bootStrap, String group, String userName, String password, Map<String, String> config) {
        Properties prop = consumerProperties(bootStrap, group);
        applyScram(prop, userName, password);
        applyOverrides(prop, config);
        return new KafkaConsumer<>(prop);
    }

    /**
     * Sends a value without a key asynchronously. A failed send is logged, not thrown.
     *
     * @param producer producer to send with
     * @param topic    destination topic
     * @param value    record value
     */
    public static void send(KafkaProducer<String, String> producer, String topic, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        producer.send(record, (metadata, e) -> logSendFailure(topic, e));
    }

    /**
     * Sends a value without a key to an explicit partition asynchronously.
     * A failed send is logged, not thrown.
     *
     * @param producer  producer to send with
     * @param topic     destination topic
     * @param partition destination partition
     * @param value     record value
     */
    public static void send(KafkaProducer<String, String> producer, String topic, int partition, String value) {
        // The record carries no key; the explicit partition number decides placement.
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, null, value);
        producer.send(record, (metadata, e) -> logSendFailure(topic, e));
    }

    /**
     * Subscribes to a topic (group-managed partition assignment) and performs one poll.
     *
     * @param consumer consumer to poll with
     * @param topic    topic to consume
     * @return the records returned by a single poll of up to 60 seconds
     */
    public static ConsumerRecords<String, String> receive(KafkaConsumer<String, String> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        return consumer.poll(POLL_TIMEOUT);
    }

    /**
     * Assigns every partition of the topic, seeks each one to {@code offset} and
     * performs one poll. The original author recommends {@code enable.auto.commit=false}.
     *
     * @param consumer consumer to poll with
     * @param topic    topic to consume
     * @param offset   offset to start from in every partition
     * @return the records returned by a single poll of up to 60 seconds
     */
    public static ConsumerRecords<String, String> receive(KafkaConsumer<String, String> consumer, String topic,
                                                          long offset) {
        List<TopicPartition> topicPartitions = getTopicPartitions(consumer, topic);
        consumer.assign(topicPartitions);
        for (TopicPartition topicPartition : topicPartitions) {
            consumer.seek(topicPartition, offset);
        }
        return consumer.poll(POLL_TIMEOUT);
    }

    /**
     * Assigns every partition of the topic, seeks to the beginning and performs one
     * poll. The original author recommends {@code enable.auto.commit=false}.
     *
     * @param consumer consumer to poll with
     * @param topic    topic to consume
     * @return the records returned by a single poll of up to 60 seconds
     */
    public static ConsumerRecords<String, String> receiveBegin(KafkaConsumer<String, String> consumer, String topic) {
        List<TopicPartition> topicPartitions = getTopicPartitions(consumer, topic);
        consumer.assign(topicPartitions);
        consumer.seekToBeginning(topicPartitions);
        return consumer.poll(POLL_TIMEOUT);
    }

    /**
     * Consumes records starting at a point in time. Every partition of the topic
     * is assigned and positioned at the first offset whose timestamp is at or after
     * {@code startTime}; then a single poll is performed and each returned record
     * whose timestamp does not exceed {@code endTime} is handed to {@code action}.
     * The original author recommends {@code enable.auto.commit=false}.
     *
     * <p>Only one poll is issued, so at most one batch of records is processed per call.
     *
     * @param consumer  consumer to poll with
     * @param topic     topic to consume
     * @param startTime start timestamp passed to the offset lookup
     * @param endTime   inclusive upper timestamp bound; {@code 0} disables the bound
     * @param action    callback invoked for every accepted record
     */
    public static void timeReceive(KafkaConsumer<String, String> consumer, String topic, long startTime,
                                   long endTime, Consumer<ConsumerRecord<String, String>> action) {
        consumer.assign(getTopicPartitions(consumer, topic));
        Map<TopicPartition, OffsetAndTimestamp> startOffsets = getTimeOffsets(consumer, topic, startTime);

        List<TopicPartition> withoutMatch = new ArrayList<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : startOffsets.entrySet()) {
            OffsetAndTimestamp found = entry.getValue();
            if (found != null) {
                consumer.seek(entry.getKey(), found.offset());
            } else {
                // No record at or after startTime in this partition.
                withoutMatch.add(entry.getKey());
            }
        }
        // Skip such partitions entirely; left unpositioned they would be read from the
        // committed/auto-reset offset and could deliver records older than startTime.
        // The emptiness guard matters: seekToEnd with no partitions moves ALL assigned ones.
        if (!withoutMatch.isEmpty()) {
            consumer.seekToEnd(withoutMatch);
        }

        for (ConsumerRecord<String, String> record : consumer.poll(POLL_TIMEOUT)) {
            if (endTime == 0 || record.timestamp() <= endTime) {
                action.accept(record);
            }
        }
    }

    /**
     * Subscribes to a topic and polls forever, passing every record to {@code action}.
     * This method never returns normally.
     *
     * @param consumer consumer to poll with
     * @param topic    topic to consume
     * @param action   callback invoked for every record
     */
    @SuppressWarnings("InfiniteLoopStatement")
    public static void loopReceive(KafkaConsumer<String, String> consumer, String topic, Consumer<ConsumerRecord<String, String>> action) {
        consumer.subscribe(Collections.singletonList(topic));
        // noinspection InfiniteLoopStatement
        while (true) {
            consumer.poll(POLL_TIMEOUT).forEach(action);
        }
    }

    /**
     * Lists all partitions of a topic as {@link TopicPartition} objects.
     *
     * @param consumer consumer used to fetch the partition metadata
     * @param topic    topic to inspect
     * @return one entry per partition; empty if no partition metadata is available
     */
    public static List<TopicPartition> getTopicPartitions(KafkaConsumer<String, String> consumer, String topic) {
        List<PartitionInfo> partitionInfos = partitionInfos(consumer, topic);
        List<TopicPartition> topicPartitions = new ArrayList<>(partitionInfos.size());
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        return topicPartitions;
    }

    /**
     * Looks up, for every partition of a topic, the offset matching a timestamp.
     *
     * @param consumer  consumer used for the lookup
     * @param topic     topic to inspect
     * @param startTime timestamp to search for
     * @return the result of {@code offsetsForTimes}; a partition's value is {@code null}
     *         when the timestamp is later than its newest indexed record
     */
    public static Map<TopicPartition, OffsetAndTimestamp> getTimeOffsets(KafkaConsumer<String, String> consumer, String topic, long startTime) {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (PartitionInfo partitionInfo : partitionInfos(consumer, topic)) {
            timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), startTime);
        }
        return consumer.offsetsForTimes(timestampsToSearch);
    }

    /**
     * Sums the end offsets of all partitions of a topic, using a short-lived
     * SCRAM-authenticated consumer in group {@code get_offset}.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param userName         SCRAM user name
     * @param password         SCRAM password
     * @param topicName        topic to inspect
     * @return the sum of the end offsets over all partitions
     */
    public static long getOffsetSum(String bootstrapServers, String userName, String password, String topicName) {
        try (KafkaConsumer<String, String> consumer = getConsumer(bootstrapServers, "get_offset", userName, password, null)) {
            return sumEndOffsets(consumer, topicName);
        }
    }

    /**
     * Sums the end offsets of all partitions of a topic, using a short-lived
     * unauthenticated consumer in group {@code get_offset}.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param topicName        topic to inspect
     * @return the sum of the end offsets over all partitions
     */
    public static long getOffsetSum(String bootstrapServers, String topicName) {
        try (KafkaConsumer<String, String> consumer = getConsumer(bootstrapServers, "get_offset", null)) {
            return sumEndOffsets(consumer, topicName);
        }
    }

    /**
     * Counts the partitions of a topic, using a short-lived SCRAM-authenticated
     * consumer in group {@code get_partitions}.
     *
     * @param bootstrapServers Kafka bootstrap server list
     * @param userName         SCRAM user name
     * @param password         SCRAM password
     * @param topicName        topic to inspect
     * @return the number of partitions; {@code 0} if no partition metadata is available
     */
    public static long getPartitionsNum(String bootstrapServers, String userName, String password, String topicName) {
        try (KafkaConsumer<String, String> consumer = getConsumer(bootstrapServers, "get_partitions", userName, password, null)) {
            return partitionInfos(consumer, topicName).size();
        }
    }

    /**
     * Reads the creation time of a topic from the ctime of its ZooKeeper node
     * {@code /brokers/topics/<topicName>}.
     *
     * @param zookeeper ZooKeeper connect string
     * @param topicName topic to inspect
     * @return the node's creation time, or {@code 0} if the node does not exist
     * @throws IOException          if the ZooKeeper client cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws KeeperException      if the ZooKeeper server reports an error
     */
    public static long getTopicCreateTime(String zookeeper, String topicName) throws IOException, InterruptedException, KeeperException {
        try (ZooKeeper zk = new ZooKeeper(zookeeper, 300000, null)) {
            Stat stat = zk.exists("/brokers/topics/" + topicName, null);
            return stat != null ? stat.getCtime() : 0;
        }
    }

    /** Builds the settings shared by both admin client factories. */
    private static Properties adminProperties(String bootstrapServers) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", bootstrapServers);
        prop.put("connections.max.idle.ms", 10000);
        prop.put("request.timeout.ms", 5000);
        return prop;
    }

    /** Builds the default settings shared by both producer factories. */
    private static Properties producerProperties(String bootStrap) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", bootStrap);
        prop.setProperty("max.request.size", "2147483647");
        prop.setProperty("buffer.memory", "2147483647");
        prop.setProperty("key.serializer", STRING_SERIALIZER);
        prop.setProperty("value.serializer", STRING_SERIALIZER);
        return prop;
    }

    /** Builds the default settings shared by both consumer factories. */
    private static Properties consumerProperties(String bootStrap, String group) {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", bootStrap);
        prop.setProperty("group.id", group);
        prop.setProperty("key.deserializer", STRING_DESERIALIZER);
        prop.setProperty("value.deserializer", STRING_DESERIALIZER);
        return prop;
    }

    /** Adds the SASL/SCRAM-SHA-256 client settings for the given credentials. */
    private static void applyScram(Properties prop, String userName, String password) {
        prop.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                + escapeJaasValue(userName) + "\" password=\"" + escapeJaasValue(password) + "\";");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.mechanism", "SCRAM-SHA-256");
    }

    /**
     * Escapes a value for use inside a double-quoted JAAS option, so that a
     * backslash or double quote in a credential cannot terminate or alter the option.
     */
    private static String escapeJaasValue(String value) {
        // String.valueOf keeps the "null" text that plain concatenation produced before.
        return String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Copies caller-supplied overrides on top of the defaults; {@code null} means none. */
    private static void applyOverrides(Properties prop, Map<String, String> config) {
        if (config != null) {
            prop.putAll(config);
        }
    }

    /**
     * Blocks until the future completes and returns its result, converting
     * failures into {@link IllegalStateException} with the original exception as cause.
     */
    private static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Logs a failed asynchronous send; does nothing when the send succeeded. */
    private static void logSendFailure(String topic, Exception e) {
        if (e != null) {
            LOG.log(Level.SEVERE, "Failed to send record to topic " + topic, e);
        }
    }

    /**
     * Fetches partition metadata, treating a {@code null} result as "no partitions".
     * NOTE(review): whether partitionsFor can return null for an unknown topic
     * depends on the client version - the guard is purely defensive.
     */
    private static List<PartitionInfo> partitionInfos(KafkaConsumer<String, String> consumer, String topic) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        return partitionInfos != null ? partitionInfos : Collections.<PartitionInfo>emptyList();
    }

    /** Adds up the end offsets of every partition of the topic. */
    private static long sumEndOffsets(KafkaConsumer<String, String> consumer, String topicName) {
        long offsetSum = 0;
        for (long endOffset : consumer.endOffsets(getTopicPartitions(consumer, topicName)).values()) {
            offsetSum += endOffset;
        }
        return offsetSum;
    }
}