kafka主题管理

主题增删改查

创建

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

从 Kafka 2.2 版本开始,社区推荐用 –bootstrap-server 参数替换 –zookeeper 参数

查询

# 查询所有主题的列表
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
# 查询单个主题的详细数据
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic 

修改

# 增加分区
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic  --partitions <新分区数>

# 修改主题级别参数,常规的主题级别参数,使用 --zookeeper;动态参数使用 --bootstrap-server 
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name  --alter --add-config max.message.bytes=10485760

# 变更副本数
# reassign.json
{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}` 
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

 # 修改test主题限速
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

# 主题分区迁移
kafka-reassign-partitions

删除

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic 

特殊主体管理

查看消费者组提交的位移数

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

查看消费者组的状态信息

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

常见主题错误处理

主题删除失败

  • 副本所在的 Broker 宕机了;–重启即可
  • 待删除主题的部分分区依然在执行迁移过程

解决

第 1 步,手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。

第 2 步,手动删除该主题在磁盘上的分区目录。

第 3 步,在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。–非必须,可能造成大面积的分区 Leader 重选举

__consumer_offsets 占用太多的磁盘

jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。


   转载规则


《kafka主题管理》 wangyixin-tom 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
kafka脚本 kafka脚本
生产消息# 使用控制台来向 Kafka 的指定主题发送消息 $ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request
2021-03-29
下一篇 
kafka高水位和Leader Epoch kafka高水位和Leader Epoch
高水位在分区高水位以下的消息被认为是已提交消息。kafka中,分区的高水位就是其 Leader 副本的高水位。 作用 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。 帮助 Kafka 完成副本同步。 LEO(Log E
2021-03-28
  目录