kafka常用命令

kafka常用命令 https://kafka.apache.org/23/configuration/topic-level-configs/https://github.com/xxd763795151/kafka-console-ui查看broker idkafka-broker-api-versions.sh --bootstrap-server172.18.6.5:9092|grepid查看主题kafka-topics.sh --bootstrap-server172.18.6.5:9092--listkafka-topics.sh --bootstrap-server172.18.6.5:9092--describe--topicldb_prd_order-oc_order_main创建主题kafka-topics.sh --bootstrap-server10.1.1.2:9092--create--topicmytopic--partitions1kafka-topics.sh --bootstrap-server10.0.0.31:9092--create--topictest1--configretention.ms3600000--replication-factor2--partitions3查看topic大小jq下载https://github.com/jqlang/jq/releases/download/jq-1.8.1/jq-linux-amd64kafka-log-dirs.sh --bootstrap-server172.18.6.5:9092--describe--broker-list brokerid号 --topic-list topic名称kafka.json# 删除kafka.json文件中无用内容然后用jq命令查看jq keys kafka.json jq .brokers[0].logDirs[].partitions[].size kafka.json jq .brokers[0].logDirs[].partitions[].size kafka.json|awk{sum$1}END{print sum/1024/1024/1024}查看消费者列表–list./kafka-consumer-groups.sh --bootstrap-server10.1.1.2:9092--list查看消费者组详情–describe 查看指定消费组详情–groupkafka-consumer-groups.sh --command-config /data/soft/kafka/config/kafka_client_jaas.conf --bootstrap-server10.1.1.2:9092--describe--groupgroupname kafka-consumer-groups.sh --command-config /data/soft/kafka/config/kafka_client_jaas.conf --bootstrap-server10.1.1.2:9092--state--describe--groupgroupname查看所有消费组详情–all-groups./kafka-consumer-groups.sh --bootstrap-server10.1.1.2:9092--describe--all-groups查询消费者成员信息–members 所有消费组成员信息./kafka-consumer-groups.sh--describe--all-groups--members--bootstrap-server10.1.1.2:9092指定消费组成员信息./kafka-consumer-groups.sh--describe--members--groupdefaultConsumerGroup --bootstrap-server10.1.1.2:9092重置位移位置重置消费组位移会导致消费者重新分配分区https://blog.csdn.net/u010003835/article/details/83314766kafka-consumer-groups.sh --bootstrap-server10.1.1.2:9092--groupgroupName--topicmytopic --reset-offsets --to-latest--executekafka-consumer-groups.sh --bootstrap-server10.1.1.2:9092--groupgroupName--topicmytopic --reset-offsets --to-earliest--executekafka-consumer-groups.sh --bootstrap-server10.1.1.2:9092--groupgroupName--topicmytopic --reset-offsets --to-offset100--executekafka-consumer-groups.sh --bootstrap-server10.1.1.2:9092--groupgroupName--topicmytopic --reset-offsets --to-datetime2024-07-24T06:50:00.000--execute重设位移有几种选项:--to-earliest 设置到最早位移处也就是0 --to-latest 设置到最新处也就是主题分区HW的位置 --to-offset NUM 指定具体的位移位置 --shift-by NUM 基于当前位移向前回退多少 --by-duration 回退到多长时间消费组参数说明:CURRENT-OFFSET 当前消费者位移 LOG-END-OFFSET 分区最新位移 LAG LOG-END-OFFSET减去CURRENT-OFFSET的值表示积压量 CONSUMER-ID 是Kafka自己生成的 CLIENT-ID 是消费者代码里写的CLIENT ID用于区分同消费者组中的不同客户端删除指定topic与数据./kafka-topics.sh --bootstrap-server10.1.1.2:9092--delete--topicinventory_topic立即删除某个topic下的数据./kafka-topics.sh--zookeeperlocalhost:2181--alter--topicmytopic--configcleanup.policydelete动态配置全局设置修改 server.properties如下的值 log.retention.hours72 log.cleanup.policydelete单独对某一个topic设置过期时间,但如果只有某一个topic数据量过大,想单独对这个topic的过期时间设置短点./kafka-configs.sh--zookeeperlocalhost:2181--alter--entity-name mytopic --entity-type topics --add-configretention.ms86400000# retention.ms86400000 为一天单位是毫秒查看设置./kafka-configs.sh--zookeeperlocalhost:2181--describe--entity-name mytopic --entity-type topics Configsfortopics:wordcounttopic areretention.ms86400000