pom文件
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
代码
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
/**
 * Kafka utility class: helpers for topic administration, producers and consumers.
 *
 * @author 7e2hj
 * @version 1.0
 * @date 2021-11-29
 */
public class KafkaUtils {
/**
 * Creates a Kafka admin client that authenticates with SASL/SCRAM-SHA-256 over
 * SASL_PLAINTEXT. The caller owns the returned client and must close it.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param userName         SCRAM user name placed into the JAAS configuration
 * @param password         SCRAM password placed into the JAAS configuration
 * @return a new admin client
 */
public static AdminClient getAdmin(String bootstrapServers, String userName, String password) {
    // The credentials are embedded verbatim; no escaping is applied to them.
    String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";";

    Properties settings = new Properties();
    settings.put("bootstrap.servers", bootstrapServers);
    settings.put("connections.max.idle.ms", 10000);
    settings.put("request.timeout.ms", 5000);
    settings.put("sasl.jaas.config", jaasConfig);
    settings.put("security.protocol", "SASL_PLAINTEXT");
    settings.put("sasl.mechanism", "SCRAM-SHA-256");
    return AdminClient.create(settings);
}
/**
 * Creates a Kafka admin client without authentication settings.
 * The caller owns the returned client and must close it.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @return a new admin client
 */
public static AdminClient getAdmin(String bootstrapServers) {
    Properties settings = new Properties();
    settings.put("bootstrap.servers", bootstrapServers);
    settings.put("connections.max.idle.ms", 10000);
    settings.put("request.timeout.ms", 5000);
    return AdminClient.create(settings);
}
/**
 * Checks whether a topic name is among the names returned by {@code listTopics()}
 * on a cluster reached through the SASL/SCRAM admin client.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param userName         SCRAM user name
 * @param password         SCRAM password
 * @param topicName        topic name to look for
 * @return {@code true} if the listed topic names contain {@code topicName}
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static boolean isExistTopic(String bootstrapServers, String userName, String password, String topicName) {
    try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
        Set<String> topics = client.listTopics().names().get();
        return topics.contains(topicName);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Creates the given topics and blocks until the request completes.
 *
 * <p>Example:
 * <pre>{@code
 * HashMap<String, String> map = new HashMap<>();
 * map.put("max.message.bytes", "2147483647");
 * Arrays.asList(
 *     new NewTopic("topic1", 1, (short) 1),
 *     new NewTopic("topic2", 1, (short) 1).configs(map)
 * );
 * }</pre>
 *
 * @param client    Kafka admin client; it is not closed by this method
 * @param newTopics topics to create
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void createTopics(AdminClient client, Collection<NewTopic> newTopics) {
    CreateTopicsResult result = client.createTopics(newTopics);
    try {
        result.all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Creates a single topic using a short-lived SASL/SCRAM admin client and blocks
 * until the request completes.
 *
 * @param bootstrapServers  Kafka bootstrap server list
 * @param userName          SCRAM user name
 * @param password          SCRAM password
 * @param topicName         name of the topic to create
 * @param partitionNum      number of partitions
 * @param replicationFactor replication factor
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void createTopics(String bootstrapServers, String userName, String password, String topicName, int partitionNum, short replicationFactor) {
    try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
        NewTopic topic = new NewTopic(topicName, partitionNum, replicationFactor);
        client.createTopics(Collections.singletonList(topic)).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Creates a single topic using a short-lived unauthenticated admin client and
 * blocks until the request completes.
 *
 * @param bootstrapServers  Kafka bootstrap server list
 * @param topicName         name of the topic to create
 * @param partitionNum      number of partitions
 * @param replicationFactor replication factor
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void createTopics(String bootstrapServers, String topicName, int partitionNum, short replicationFactor) {
    try (AdminClient client = getAdmin(bootstrapServers)) {
        NewTopic topic = new NewTopic(topicName, partitionNum, replicationFactor);
        client.createTopics(Collections.singletonList(topic)).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Deletes the given topics and blocks until the request completes.
 *
 * @param client Kafka admin client; it is not closed by this method
 * @param topics names of the topics to delete
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void deleteTopics(AdminClient client, Collection<String> topics) {
    DeleteTopicsResult result = client.deleteTopics(topics);
    try {
        result.all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Deletes a single topic using a short-lived SASL/SCRAM admin client and blocks
 * until the request completes.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param userName         SCRAM user name
 * @param password         SCRAM password
 * @param topicName        name of the topic to delete
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void deleteTopics(String bootstrapServers, String userName, String password, String topicName) {
    try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
        client.deleteTopics(Collections.singletonList(topicName)).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Deletes a single topic using a short-lived unauthenticated admin client and
 * blocks until the request completes.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param topicName        name of the topic to delete
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void deleteTopics(String bootstrapServers, String topicName) {
    try (AdminClient client = getAdmin(bootstrapServers)) {
        client.deleteTopics(Collections.singletonList(topicName)).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Increases the partition count of a topic and blocks until the request completes.
 *
 * @param client       Kafka admin client; it is not closed by this method
 * @param topicName    topic name
 * @param partitionNum total partition count to increase to
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void addPartitions(AdminClient client, String topicName, int partitionNum) {
    Map<String, NewPartitions> newPartitions = new HashMap<>();
    newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum));
    CreatePartitionsResult rs = client.createPartitions(newPartitions);
    try {
        rs.all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Increases the partition count of a topic using a short-lived SASL/SCRAM admin
 * client and blocks until the request completes.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param userName         SCRAM user name
 * @param password         SCRAM password
 * @param topicName        topic name
 * @param partitionNum     total partition count to increase to
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void addPartitions(String bootstrapServers, String userName, String password, String topicName, int partitionNum) {
    try (AdminClient client = getAdmin(bootstrapServers, userName, password)) {
        Map<String, NewPartitions> newPartitions = new HashMap<>();
        newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum));
        client.createPartitions(newPartitions).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Increases the partition count of a topic using a short-lived unauthenticated
 * admin client and blocks until the request completes.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param topicName        topic name
 * @param partitionNum     total partition count to increase to
 * @throws IllegalStateException if the request fails, or if the calling thread is
 *                               interrupted while waiting (the interrupt flag is restored)
 */
public static void addPartitions(String bootstrapServers, String topicName, int partitionNum) {
    try (AdminClient client = getAdmin(bootstrapServers)) {
        Map<String, NewPartitions> newPartitions = new HashMap<>();
        newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum));
        client.createPartitions(newPartitions).all().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    }
}
/**
 * Builds a String/String producer without authentication settings.
 *
 * @param bootStrap Kafka bootstrap server list
 * @param config    extra settings applied last, so they override the defaults set
 *                  here; may be {@code null}
 * @return a new Kafka producer
 */
public static KafkaProducer<String, String> getProducer(String bootStrap, Map<String, String> config) {
    Properties settings = new Properties();
    settings.setProperty("bootstrap.servers", bootStrap);
    settings.setProperty("max.request.size", "2147483647");
    settings.setProperty("buffer.memory", "2147483647");
    settings.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // A null config simply means "no overrides".
    if (config != null) {
        settings.putAll(config);
    }
    return new KafkaProducer<>(settings);
}
/**
 * Builds a String/String producer that authenticates with SASL/SCRAM-SHA-256
 * over SASL_PLAINTEXT.
 *
 * @param bootStrap Kafka bootstrap server list
 * @param userName  SCRAM user name placed into the JAAS configuration
 * @param password  SCRAM password placed into the JAAS configuration
 * @param config    extra settings applied last, so they override the defaults set
 *                  here; may be {@code null}
 * @return a new Kafka producer
 */
public static KafkaProducer<String, String> getProducer(String bootStrap, String userName, String password, Map<String, String> config) {
    Properties settings = new Properties();
    settings.setProperty("bootstrap.servers", bootStrap);
    settings.setProperty("max.request.size", "2147483647");
    settings.setProperty("buffer.memory", "2147483647");
    settings.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // The credentials are embedded verbatim; no escaping is applied to them.
    String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";";
    settings.setProperty("sasl.jaas.config", jaasConfig);
    settings.setProperty("security.protocol", "SASL_PLAINTEXT");
    settings.setProperty("sasl.mechanism", "SCRAM-SHA-256");

    // A null config simply means "no overrides".
    if (config != null) {
        settings.putAll(config);
    }
    return new KafkaProducer<>(settings);
}
/**
 * Builds a String/String consumer without authentication settings.
 *
 * @param bootStrap Kafka bootstrap server list
 * @param group     consumer group id
 * @param config    extra settings applied last, so they override the defaults set
 *                  here; may be {@code null}
 * @return a new Kafka consumer
 */
public static KafkaConsumer<String, String> getConsumer(String bootStrap, String group, Map<String, String> config) {
    Properties settings = new Properties();
    settings.setProperty("bootstrap.servers", bootStrap);
    settings.setProperty("group.id", group);
    settings.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // A null config simply means "no overrides".
    if (config != null) {
        settings.putAll(config);
    }
    return new KafkaConsumer<>(settings);
}
/**
 * Builds a String/String consumer that authenticates with SASL/SCRAM-SHA-256
 * over SASL_PLAINTEXT.
 *
 * @param bootStrap Kafka bootstrap server list
 * @param group     consumer group id
 * @param userName  SCRAM user name placed into the JAAS configuration
 * @param password  SCRAM password placed into the JAAS configuration
 * @param config    extra settings applied last, so they override the defaults set
 *                  here; may be {@code null}
 * @return a new Kafka consumer
 */
public static KafkaConsumer<String, String> getConsumer(String bootStrap, String group, String userName, String password, Map<String, String> config) {
    Properties settings = new Properties();
    settings.setProperty("bootstrap.servers", bootStrap);
    settings.setProperty("group.id", group);
    settings.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // The credentials are embedded verbatim; no escaping is applied to them.
    String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + userName + "\" password=\"" + password + "\";";
    settings.setProperty("sasl.jaas.config", jaasConfig);
    settings.setProperty("security.protocol", "SASL_PLAINTEXT");
    settings.setProperty("sasl.mechanism", "SCRAM-SHA-256");

    // A null config simply means "no overrides".
    if (config != null) {
        settings.putAll(config);
    }
    return new KafkaConsumer<>(settings);
}
/**
 * Sends a value-only record to a topic. The send is handed to the producer with a
 * callback; if the callback receives an exception, its stack trace is printed.
 *
 * @param producer Kafka producer; it is not closed or flushed by this method
 * @param topic    target topic
 * @param value    record value
 */
public static void send(KafkaProducer<String, String> producer, String topic, String value) {
    producer.send(new ProducerRecord<>(topic, value), (metadata, failure) -> {
        if (failure == null) {
            return;
        }
        failure.printStackTrace();
    });
}
/**
 * Sends a record with no key to an explicit partition of a topic. The send is
 * handed to the producer with a callback; if the callback receives an exception,
 * its stack trace is printed.
 *
 * @param producer  Kafka producer; it is not closed or flushed by this method
 * @param topic     target topic
 * @param partition target partition number
 * @param value     record value
 */
public static void send(KafkaProducer<String, String> producer, String topic, int partition, String value) {
    // The key is left null: when both a partition and a key are given, the partition wins.
    ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, partition, null, value);
    producer.send(message, (metadata, failure) -> {
        if (failure == null) {
            return;
        }
        failure.printStackTrace();
    });
}
/**
 * Subscribes to a topic and performs a single poll of up to 60 seconds.
 *
 * @param consumer Kafka consumer
 * @param topic    topic to consume
 * @return the records returned by that single poll
 */
public static ConsumerRecords<String, String> receive(KafkaConsumer<String, String> consumer, String topic) {
    // Subscribing (rather than assigning) leaves partition assignment to the consumer.
    List<String> subscription = Collections.singletonList(topic);
    consumer.subscribe(subscription);
    Duration timeout = Duration.ofSeconds(60);
    return consumer.poll(timeout);
}
/**
 * Assigns every partition of a topic, seeks each of them to the same offset and
 * performs a single poll of up to 60 seconds.
 * The original author notes that {@code enable.auto.commit} should be set to false.
 *
 * @param consumer Kafka consumer
 * @param topic    topic to consume
 * @param offset   start offset applied to every partition
 * @return the records returned by that single poll
 */
public static ConsumerRecords<String, String> receive(KafkaConsumer<String, String> consumer, String topic,
                                                      long offset) {
    List<TopicPartition> partitions = getTopicPartitions(consumer, topic);
    consumer.assign(partitions);
    partitions.forEach(partition -> consumer.seek(partition, offset));
    return consumer.poll(Duration.ofSeconds(60));
}
/**
 * Assigns every partition of a topic, seeks them to the beginning and performs a
 * single poll of up to 60 seconds.
 * The original author notes that {@code enable.auto.commit} should be set to false.
 *
 * @param consumer Kafka consumer
 * @param topic    topic to consume
 * @return the records returned by that single poll
 */
public static ConsumerRecords<String, String> receiveBegin(KafkaConsumer<String, String> consumer, String topic) {
    List<TopicPartition> partitions = getTopicPartitions(consumer, topic);
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    Duration timeout = Duration.ofSeconds(60);
    return consumer.poll(timeout);
}
/**
 * Consumes records starting at a timestamp: assigns every partition of the topic,
 * positions each one at the offset found for {@code startTime}, performs a single
 * poll of up to 60 seconds and passes matching records to {@code action}.
 * The original author notes that {@code enable.auto.commit} should be set to false.
 *
 * <p>Partitions for which no offset is found for {@code startTime} are moved to
 * their end, so that records older than {@code startTime} are not delivered from
 * whatever position the consumer would otherwise start at.
 *
 * @param consumer  Kafka consumer
 * @param topic     topic to consume
 * @param startTime start timestamp used for the offset lookup
 * @param endTime   end timestamp (inclusive); {@code 0} disables the upper bound
 * @param action    callback invoked for every record that passes the end-time filter
 */
public static void timeReceive(KafkaConsumer<String, String> consumer, String topic, long startTime,
                               long endTime, Consumer<ConsumerRecord<String, String>> action) {
    // Manually assign all partitions of the topic.
    consumer.assign(getTopicPartitions(consumer, topic));

    // Look up, per partition, the offset that corresponds to startTime.
    Map<TopicPartition, OffsetAndTimestamp> offsetsByPartition = getTimeOffsets(consumer, topic, startTime);
    List<TopicPartition> partitionsWithoutMatch = new ArrayList<>();
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetsByPartition.entrySet()) {
        OffsetAndTimestamp offsetTimestamp = entry.getValue();
        if (offsetTimestamp != null) {
            consumer.seek(entry.getKey(), offsetTimestamp.offset());
        } else {
            // The lookup yields null when startTime is later than the newest indexed record.
            partitionsWithoutMatch.add(entry.getKey());
        }
    }
    // Guard on emptiness: seekToEnd with an empty collection would move ALL assigned partitions.
    if (!partitionsWithoutMatch.isEmpty()) {
        consumer.seekToEnd(partitionsWithoutMatch);
    }

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60));
    for (ConsumerRecord<String, String> record : records) {
        if (endTime == 0 || record.timestamp() <= endTime) {
            action.accept(record);
        }
    }
}
/**
 * Subscribes to a topic and polls forever, handing every record to {@code action}.
 * The loop has no exit condition of its own.
 *
 * @param consumer Kafka consumer
 * @param topic    topic to consume
 * @param action   processing logic applied to each record
 */
@SuppressWarnings("InfiniteLoopStatement")
public static void loopReceive(KafkaConsumer<String, String> consumer, String topic, Consumer<ConsumerRecord<String, String>> action) {
    // Subscribing (rather than assigning) leaves partition assignment to the consumer.
    List<String> subscription = Collections.singletonList(topic);
    consumer.subscribe(subscription);
    // noinspection InfiniteLoopStatement
    for (;;) {
        consumer.poll(Duration.ofSeconds(60)).forEach(action);
    }
}
/**
 * Converts the partition metadata the consumer reports for a topic into a list of
 * {@link TopicPartition} objects.
 *
 * @param consumer Kafka consumer used to fetch partition metadata
 * @param topic    topic name
 * @return a new mutable list with one entry per reported partition
 */
public static List<TopicPartition> getTopicPartitions(KafkaConsumer<String, String> consumer, String topic) {
    List<TopicPartition> result = new ArrayList<>();
    consumer.partitionsFor(topic)
            .forEach(info -> result.add(new TopicPartition(info.topic(), info.partition())));
    return result;
}
/**
 * Looks up, for every partition of a topic, the offset that corresponds to a timestamp.
 *
 * @param consumer  Kafka consumer
 * @param topic     topic name
 * @param startTime timestamp to search for in every partition
 * @return the result of {@code offsetsForTimes} for all partitions of the topic
 */
public static Map<TopicPartition, OffsetAndTimestamp> getTimeOffsets(KafkaConsumer<String, String> consumer, String topic, long startTime) {
    // Ask for the same timestamp in every partition the consumer reports for the topic.
    Map<TopicPartition, Long> query = new HashMap<>();
    consumer.partitionsFor(topic)
            .forEach(info -> query.put(new TopicPartition(info.topic(), info.partition()), startTime));
    return consumer.offsetsForTimes(query);
}
/**
 * Sums the end offsets of all partitions of a topic, using a short-lived
 * SASL/SCRAM consumer in group {@code get_offset}.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param userName         SCRAM user name
 * @param password         SCRAM password
 * @param topicName        topic name
 * @return the sum of the end offsets over all partitions
 */
public static long getOffsetSum(String bootstrapServers, String userName, String password, String topicName) {
    try (KafkaConsumer<String, String> consumer = getConsumer(bootstrapServers, "get_offset", userName, password, null)) {
        List<TopicPartition> partitions = getTopicPartitions(consumer, topicName);
        return consumer.endOffsets(partitions)
                .values()
                .stream()
                .mapToLong(Long::longValue)
                .sum();
    }
}
/**
 * Sums the end offsets of all partitions of a topic, using a short-lived
 * unauthenticated consumer in group {@code get_offset}.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param topicName        topic name
 * @return the sum of the end offsets over all partitions
 */
public static long getOffsetSum(String bootstrapServers, String topicName) {
    try (KafkaConsumer<String, String> consumer = getConsumer(bootstrapServers, "get_offset", null)) {
        List<TopicPartition> partitions = getTopicPartitions(consumer, topicName);
        return consumer.endOffsets(partitions)
                .values()
                .stream()
                .mapToLong(Long::longValue)
                .sum();
    }
}
/**
 * Returns the number of partitions the consumer reports for a topic, using a
 * short-lived SASL/SCRAM consumer in group {@code get_partitions}.
 *
 * @param bootstrapServers Kafka bootstrap server list
 * @param userName         SCRAM user name
 * @param password         SCRAM password
 * @param topicName        topic name
 * @return the partition count
 */
public static long getPartitionsNum(String bootstrapServers, String userName, String password, String topicName) {
    try (KafkaConsumer<String, String> consumer = getConsumer(bootstrapServers, "get_partitions", userName, password, null)) {
        return consumer.partitionsFor(topicName).size();
    }
}
/**
 * Reads the creation time of the ZooKeeper node {@code /brokers/topics/<topicName>}.
 *
 * @param zookeeper ZooKeeper connect string
 * @param topicName topic name
 * @return the node's creation time as reported by its {@code Stat}, or {@code 0}
 *         if the node does not exist
 * @throws IOException          propagated from creating the ZooKeeper client
 * @throws InterruptedException propagated from the ZooKeeper calls
 * @throws KeeperException      propagated from the existence check
 */
public static long getTopicCreateTime(String zookeeper, String topicName) throws IOException, InterruptedException, KeeperException {
    try (ZooKeeper zk = new ZooKeeper(zookeeper, 300000, null)) {
        Stat topicNode = zk.exists("/brokers/topics/" + topicName, null);
        return topicNode == null ? 0 : topicNode.getCtime();
    }
}
}
Comments NOTHING