在做数据开发时,经常预见要查看kafka 元数据的一些信息,比如有多少个topic、某个topic中有多少分区、创建topic、删除topic等等。
废话不多,直接撸。
我们的maven版本为:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.13</artifactId> <version>2.7.0</version> </dependency>
首先我们可以定义一个接口
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import java.util.List; public interface KafkaClientImp {
/** * 创建kafka * * @param topicName topic名称 * @param partition 分区数 建议小于12个 * @param replication 副本数量 * @return 是否创建成功 */ boolean createTopic(String topicName, Integer partition, short replication); /** * topic 是否存在 * * @param topicName topic名称 * @return true 存在 false不存在 */ boolean isTopic(String topicName); /** * 获取topic列表 * * @return */ List<String> getTopicList(); /** * 获取topic 的分区数 * * @param consumer kafka消费者 * @return */ List<PartitionInfo> getPartitionNum(KafkaConsumer<String, String> consumer, Object topicName); /** * 删除topic * * @param topicName * @return */ boolean deleteTopic(String topicName); }
然后我们写一个KafkaUtils类来实现接口。
import com.xxx.kafka.metadata.KafkaClientImp; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import java.util.*; import java.util.concurrent.TimeUnit; /** * kafka工具类 */ public class KafkaUtils implements KafkaClientImp {
private AdminClient adminClient; public KafkaUtils(String filePath) {
HashMap<String, Object> map = new HashMap<>(); MyProperties myProperties = new MyProperties(); String bootstrapServers = myProperties.GetValueByKey(filePath, "bootstrapServers"); map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); adminClient = KafkaAdminClient.create(map); } /** * 创建topic * * @param topicName topic名称 * @param partition 分区数 建议小于12个 * @param replication 副本数量 * @return */ @Override public boolean createTopic(String topicName, Integer partition, short replication) {
boolean flag = false; try {
NewTopic newTopic = new NewTopic(topicName, /*topic名称,不建议过长*/ partition,/*分区数量*/ replication/*副本数量*/ ); adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().timeoutMs(10000)) .all() .get(); flag = true; } catch (Exception e) {
if (e.getMessage().startsWith("org.apache.kafka.common.errors.TopicExistsException")) {
System.out.println("topic is exist !! " + e.getMessage()); } else {
e.printStackTrace(); } } return flag; } /** * 判断topic是否存在 * * @param topicName topic名称 * @return */ @Override public boolean isTopic(String topicName) {
return getTopicList().contains(topicName); } /** * 获取topic list * * @return */ @Override public List<String> getTopicList() {
Collection<TopicListing> topicListings = null; ArrayList<String> list = new ArrayList<>(); try {
//获取topiclist,1分钟超时后报异常 topicListings = adminClient.listTopics().listings().get(1, TimeUnit.MINUTES); topicListings.forEach(topicListing -> {
list.add(topicListing.name()); }); } catch (Exception e) {
e.printStackTrace(); } return list; } /** * 获取partition信息 * * @param consumer kafka消费者 * @param topicName * @return */ @Override public List<PartitionInfo> getPartitionNum(KafkaConsumer consumer, Object topicName) {
return (List<PartitionInfo>) consumer.listTopics().get(topicName); } /** * 删除topic * * @param topicName * @return */ @Override public boolean deleteTopic(String topicName) {
boolean flag = false; try {
adminClient.deleteTopics(Collections.singleton(topicName)) .all() .get(); flag = true; } catch (Exception e) {
e.printStackTrace(); } return flag; } public static void main(String[] args) {
KafkaUtils utils = new KafkaUtils(MyPropertiesFilePaths.KAFKA_DEV); // List<String> topicLists = utils.getTopicList(); // topicLists.forEach(topiclist->{
// System.out.println(topiclist); // }); // System.out.println(utils.isTopic("test")); }
我们这里边有使用到自定义Properties,有需要可以参考。