Kafka从上手到实践-Kafka CLI:Reseting Offset & Config CLI
2010 年 8 月 20 日
在实际的业务场景中,经常需要重复消费Topic中的Message,所以来看看如何重置Offset。
首先重置Offset可以通过如下的命令:
首先重置Offset可以通过如下的命令:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --reset-offsets [options] --execute --topic xxxx
Kafka为我们提供了6种重置Offset的方式,也就是命令中的 options
:
-
--to-earliest
:重置到最早的Offset。 -
--to-latest
:重置到最后的Offset。 -
--to-offset
:重置到指定的Offset。 -
--to-current
:重置到当前的Offset。 -
--to-datetime
:重置到指定时间的Offset,时间格式为YYYY-MM-DDTHH:mm:SS.sss
。 -
--shift-by
:左移或右移Offset。
举个例子来看看:
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --topic first_topic --reset-offsets --shift-by -2 --execute TOPIC PARTITION NEW-OFFSET first_topic 2 15 first_topic 1 17 first_topic 0 15
上面的命令将 consumer_group_1
消费 first_topic
的三个Partitions的Offset向左移了2位。如此之后,相当于 consumer_group_1
还有6条Message没有消费。我们启动Consumer看一下:
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --topic first_topic C F E this is another message. A D
可以看到启动Consumer后,消费了6条Message。其他的Reset Options用法是一样的。这使得我们可以非常灵活的控制Consumer消费Message。
Config CLI
我们再来看看如何通过命令进行Kafka的配置。用到的命令是 kafka-config.sh
,该命令可以对Topic、Broker、Client进行配置。关键的属性有以下三个:
-
--entity-type
:这个属性设置要对什么进行配置,可选值为topics
、brokers
、clients
、users
。 -
--entity-name
:这个属性设置对应Type的名称,比如Topic名称、Broker Id、Client Id、User name。 -
--alter
:确认修改。
首先我们创建一个Topic:
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic configured-topic --partitions 3 --replication-factor 1
看看新创建的Topic的信息:
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic configured-topic --describe Topic:configured-topic PartitionCount:3 ReplicationFactor:1 Configs: Topic: configured-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: configured-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: configured-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
可以看到打印信息中的Configs是空的,说明这个Topic没有做额外的配置。或者也可以使用如下命令查看Topic的配置:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --describe Configs for topic 'configured-topic' are
看到打印信息只有 Configs for topic 'configured-topic' are
,同样说明该Topic还没有额外配置信息。
接下来该这个Topic设置 min.insync.replicas
属性:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2 --alter Completed Updating config for entity: topic 'configured-topic'.
再来查看一下:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --describe Configs for topic 'configured-topic' are min.insync.replicas=2 kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic configured-topic --describe Topic:configured-topic PartitionCount:3 ReplicationFactor:1 Configs:min.insync.replicas=2 Topic: configured-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: configured-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: configured-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
两种方式都可以看到刚才更新的配置信息。
将 --add-config
换成 --delete-config
就可以删除配置项:
kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=2 --alter Completed Updating config for entity: topic 'configured-topic'.