kafka工具类_kafka支持的数据类型

(68) 2024-08-09 18:01:01

背景

在做数据开发时,经常预见要查看kafka 元数据的一些信息,比如有多少个topic、某个topic中有多少分区、创建topic、删除topic等等。

代码

废话不多,直接撸。
我们的maven版本为:

 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.13</artifactId> <version>2.7.0</version> </dependency> 

首先我们可以定义一个接口

import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import java.util.List; public interface KafkaClientImp { 
    /** * 创建kafka * * @param topicName topic名称 * @param partition 分区数 建议小于12个 * @param replication 副本数量 * @return 是否创建成功 */ boolean createTopic(String topicName, Integer partition, short replication); /** * topic 是否存在 * * @param topicName topic名称 * @return true 存在 false不存在 */ boolean isTopic(String topicName); /** * 获取topic列表 * * @return */ List<String> getTopicList(); /** * 获取topic 的分区数 * * @param consumer kafka消费者 * @return */ List<PartitionInfo> getPartitionNum(KafkaConsumer<String, String> consumer, Object topicName); /** * 删除topic * * @param topicName * @return */ boolean deleteTopic(String topicName); } 

然后我们写一个KafkaUtils类来实现接口。

import com.xxx.kafka.metadata.KafkaClientImp; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import java.util.*; import java.util.concurrent.TimeUnit; /** * kafka工具类 */ public class KafkaUtils implements KafkaClientImp { 
    private AdminClient adminClient; public KafkaUtils(String filePath) { 
    HashMap<String, Object> map = new HashMap<>(); MyProperties myProperties = new MyProperties(); String bootstrapServers = myProperties.GetValueByKey(filePath, "bootstrapServers"); map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); adminClient = KafkaAdminClient.create(map); } /** * 创建topic * * @param topicName topic名称 * @param partition 分区数 建议小于12个 * @param replication 副本数量 * @return */ @Override public boolean createTopic(String topicName, Integer partition, short replication) { 
    boolean flag = false; try { 
    NewTopic newTopic = new NewTopic(topicName, /*topic名称,不建议过长*/ partition,/*分区数量*/ replication/*副本数量*/ ); adminClient.createTopics(Collections.singleton(newTopic), new CreateTopicsOptions().timeoutMs(10000)) .all() .get(); flag = true; } catch (Exception e) { 
    if (e.getMessage().startsWith("org.apache.kafka.common.errors.TopicExistsException")) { 
    System.out.println("topic is exist !! " + e.getMessage()); } else { 
    e.printStackTrace(); } } return flag; } /** * 判断topic是否存在 * * @param topicName topic名称 * @return */ @Override public boolean isTopic(String topicName) { 
    return getTopicList().contains(topicName); } /** * 获取topic list * * @return */ @Override public List<String> getTopicList() { 
    Collection<TopicListing> topicListings = null; ArrayList<String> list = new ArrayList<>(); try { 
    //获取topiclist,1分钟超时后报异常 topicListings = adminClient.listTopics().listings().get(1, TimeUnit.MINUTES); topicListings.forEach(topicListing -> { 
    list.add(topicListing.name()); }); } catch (Exception e) { 
    e.printStackTrace(); } return list; } /** * 获取partition信息 * * @param consumer kafka消费者 * @param topicName * @return */ @Override public List<PartitionInfo> getPartitionNum(KafkaConsumer consumer, Object topicName) { 
    return (List<PartitionInfo>) consumer.listTopics().get(topicName); } /** * 删除topic * * @param topicName * @return */ @Override public boolean deleteTopic(String topicName) { 
    boolean flag = false; try { 
    adminClient.deleteTopics(Collections.singleton(topicName)) .all() .get(); flag = true; } catch (Exception e) { 
    e.printStackTrace(); } return flag; } public static void main(String[] args) { 
    KafkaUtils utils = new KafkaUtils(MyPropertiesFilePaths.KAFKA_DEV); // List<String> topicLists = utils.getTopicList(); // topicLists.forEach(topiclist->{ 
    // System.out.println(topiclist); // }); // System.out.println(utils.isTopic("test")); } 

我们这里边有使用到自定义Properties,有需要可以参考。

THE END

发表回复