Kafka — Java消费者管理TCP连接

  1. 消费者会为每个要消费的分区创建与该分区 领导者副本
    所在Broker的Socket连接
  2. 假设消费者要消费5个分区的数据,这5个分区各自的领导者副本分布在4台Broker上

    • 那么消费者在消费时会创建与这4台Broker的Socket连接

TCP连接数

日志详解

[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092
( id: -1
rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者程序创建的 第一个TCP连接
,该Socket用于发送 FindCoordinator
请求

此时消费者对要连接的Kafka集群 一无所知
,因此它连接的Broker节点的ID为 -1
,表示不知道要连接的Broker的任何信息

[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request
MetadataRequestData(topics=[MetadataRequestTopic(name=’t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)

消费者 复用
刚刚创建的Socket连接,向Kafka集群发送 元数据请求
以获取 整个集群的信息

[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR
{key=test,key_type=0} with correlation id 0 to node -1
(org.apache.kafka.clients.NetworkClient:496)

消费者程序开始发送 FindCoordinator
请求给第一步中连接的Broker,即 localhost:9092
(nodeId为 -1

[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR
with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094
} (org.apache.kafka.clients.NetworkClient:837)

十几毫秒后,消费者程序成功地获悉 Coordinator所在的Broker
,即 node_id=2,host=localhost,port=9094

[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094
( id: 2147483645
rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者此时已经知道 协调者Broker的连接信息
了,发起第二个Socket连接,创建连向 localhost:9094
的TCP连接

只有连接了Coordinator,消费者才能正常地开启 消费组的各种功能
以及 后续的消息消费

此时的id是由 Integer.MAX_VALUE
减去 Coordinator所在的Broker的Id
计算出来的,即 2147483647 - 2 = 2147483645

这种节点ID的标记方式是Kafka社区 特意为之
,目的是要让 组协调请求
真正的数据获取请求
使用
不同的Socket连接

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094
( id: 2
rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092
( id: 0
rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093
( id: 1
rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

消费者又分别创建了 新的TCP连接
,主要用于 实际的消息获取

3类TCP连接

  1. 确定协调者
    获取集群元数据
  2. 连接协调者
    ,令其执行组成员管理操作
  3. 执行 实际的消息获取