4. Kafka 命令行工具

4. Kafka 命令行工具

4.1 topic管理操作:kafka-topics

4.1.1 查看topic列表

1
bin/kafka-topics.sh --list --zookeeper doit01:2181

4.1.2 查看topic状态信息

1
bin/kafka-topics.sh --zookeeper doit01:2181 --describe --topic test

Kafka图1

从上面的结果中,可以看出,topic的分区数量,以及每个分区的副本数量,以及每个副本所在的broker节点,以及每个分区的leader副本所在borker节点,以及每个分区的ISR副本列表;
ISR:in synchronized replicas 同步副本(当然也包含leader自身replica.lag.time.max.ms=10000默认值) 
OSR:out of synchronized replicas 失去同步的副本(该副本上次请求leader同步数据距现在的时间间隔超出配置阈值)

4.1.3 创建topic

(1)基本方式

1
./kafka-topics.sh --zookeeper doit01:2181 --create --replication-factor 3 --partitions 3 --topic test

参数解释:

1
2
3
--replication-factor 副本数量
--partitions 分区数量
--topic topic名称

(2)手动指定分配方案:分区数,副本数,存储位置

1
./kafka-topics.sh --zookeeper doit01:2181 --create --topic test --replica-assignment 0:1:3,1:2:6

该topic,将有如下partition:
partition0:所在节点:broker0、borker1、borker3   
partition1:所在节点:borker1、borker2、borker6

4.1.4 删除topic

1
./kafka-topics.sh --zookeeper doit01:2181 --delete --topic test

删除topic,server.properties中需要一个参数处于启用状态:delete.topic.enable=true
使用kafka-topics.sh脚本删除主题的行为本质上只是在ZooKeeper中的/admin/delete_topics路径下建一个与待删除主题同名的节点,以标记该主题为待删除的状态,然后由kafka控制器异步完成。

4.1.5 增加分区数

1
./kafka-topics.sh --zookeeper doit01:2181 --alter --topic test --partitions 3

kafka只支持增加分区,不支持减少分区 
原因是:减少分区,代价太大(数据的转移,日志段拼接合并) 
如果真的需要实现此功能,则完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去。

4.1.6 动态配置topic参数

通过管理命令,可以为已创建的topic增加、修改、删除topic level参数
添加/修改 指定topic的配置参数:

1
./kafka-topics.sh --zookeeper doit01:2181 --alter --topic test --connfig compression.type=gzip

–config compression.type=gzip 修改或添加参数配置
–add-config compression.type=gzip 添加参数配置   
–delete-config compression.type 删除参数配置

4.2 生产者:kafka-console-producer

1
./kafka-console-producer.sh --broker-list doit01:9092 --topic test

4.3 消费者:kafka-console-consumer

消费者在消费的时候,需要指定要订阅的主题,还可以指定消费的起始偏移量
消费的起始偏移量有3种策略: 
earliest:从最早的消息开始消费 
lastes:从最新的消息开始消费 
指定offset(分区号,偏移量):从指定的位置开始消费
kafka的topic中的消息,是有序号的(序号叫消费偏移量),而且消息的偏移量是在各个partition中独立维护的,在各个分区内,都是从0开始递增编号!

(1)消费消息

1
./kafka-console-consumer.sh --bootstrap-server doit01:9092 --topic test --from-beginning

(2)指定要消费的分区,和要消费的起始offset

1
./kafka-console-consumer.sh --bootstrap-server doit01:9092 --topic test --offset 2 --partition 0

(3)消费组 
消费组是kafka为了提高消费并行度的一种机制。 
消费组内的各个消费者之间,分担数据读取任务的最小单位是:partition
在kafka的底层逻辑中,任何一个消费者都有自己所属的组   
组和组之间,没有任何关系,大家都可以消费到目标topic的所有数据,但是组内的各个消费者,就只能读取到自己所分配到的partitions   
kafka中的消费组,可以动态增减消费者,而且消费组中的消费者数量发生任意变动,都会重新分配分区消费任务。

4.4 消费位移的记录

kafka的消费者,可以记录自己所消费到的消息偏移量,记录的这个偏移量就叫消费位移,记录这个消费到的位置,作用就在于消费者重启后可以接续上一次消费到位置来继续往后面消费。

消费位移,是组内共享的!!!

consumer去记录偏移量的时候,不是读到一条或一批数据就记录一次,而是周期性的去提交当前的位移

1
./kafka-console-consumer.sh --bootstrap-server doit01:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

通过指定formatter工具类,来对__consumer_offsets主题中的数据进行解析;
如果需要获取某个特定consumer-group的消费偏移量信息,则需要计算该消费组的偏移量记录所在分区:Math.abs(groupID.hashCode())%numPartitions
__consumer_offsets的分区数为:50

4.5 配置管理kafka-config

kafka-config.sh脚本是专门用来对参数配置进行操作的,这里的操作是运行状态修改原有的配置,因此可以达到动态变更的目的; 
动态配置的参数,会被存储在zookeeper上,因而是持久生效的 
kafka-configs.sh脚本包含:变更alter、查看describe这两种指令类型;
kafka-configs.sh支持主题、broker、用户和客户端这4个类型的配置。 
kafka-configs.sh脚本使用entity-type参数来指定操作配置的类型,并且使entity-name参数来指定配置的名称。

比如查看topic的配置可以按如下方式执行:

1
./kafka-configs.sh --zookeeper doit:2181 --describe --entity-type topics --entity-name tpc_2

比如查看broker的动态配置可以按如下方式执行:

1
./kafka-configs.sh --zookeeper doit:2181 --describe --entity-type brokers --entity-name tpc_2

entity-type和entity-name的对应关系

entity-type的释义entity-name的释义
主题类型的配置,取值为topics指定主题的名称
broker类型的配置,取值为brokers指定brokerId值,即broker中broker.id参数配置的值
客户端类型的配置,取值为clients指定clientId值,即KafkaProducer或KafkaConsumer的client.id参数配置的值
用户类型的配置,取值为users指定用户名

示例:添加topic级别参数

1
./kafka-configs.sh --zookeeper doit:2181 --alter --entity-type topics --entity-name tpc22 --add-config cleanup.policy=compact,max.message.bytes=10000

使用kafka-configs.sh脚本来变更(alter)配置时,会在ZooKeeper中创建一个命名形式为:/config//的节点,并将变更的配置写入这个节点

4.5.1 动态配置topic参数

通过管理命令,可以为已创建的topic增加、修改、删除topic level参数

  • 添加/修改 指定topic的配置参数:
1
./kafka-topics.sh --zookeeper doit01:2181 --alter --topic test --connfig compression.type=gzip

如果利用kafka-configs.sh脚本来对topic、producer、consumer、borker等进行参数动态配置

  • 添加、修改配置参数
1
./kafka-configs.sh --zookeeper doit01:2181 --entity-type topics --entity-name tpc_1 --alter --add-config compression.type=gzip


-删除s配置参数

1
./kafka-configs.sh --zookeeper doit01:2181 --entity-type topics --entity-name tpc_1 --alter --delete-config compression.type