Kafka生产者分析——KafkaProducer
文章参考自《Apache Kafka 源码剖析》
github 地址:https://github.com/BingLau7/kafka,里面有代码的相关注释
具体部署方式不进行指导了,网上资料比较多
KafkaProducer Demo
1 | public class ProducerDemo { |
KafkaProducer 分析
整体流程
上图是一个 KafkaProducer 在发送消息的整个流程,我们就上面来进行一个宏观的了解:
- ProducerInterceptors 对消息进行拦截
- Serializer 对消息的 key 和 value 进行序列化
- Partitioner 为消息选择合适的 Partition
- RecordAccumulator 收集消息,实现批量发送
- Sender 从 RecordAccumulator 获取消息
- 构造 ClientRequest
- 将 ClientRequest 交给 NetworkClient,准备发送
- NetworkClient 将请求放入 KafkaChannel 的缓存
- 执行网络 I/O,发送请求
- 收到响应,调用 ClientRequest 的回调函数
- 调用 RecordBatch 的回调函数,最终调用每个消息上注册的回调函数
消息发送的过程中,涉及两个线程协同工作。主线程首先将业务数据封装或 ProducerRecord 对象,之后调用 send()
方法将消息放入 RecordAccummulator (消息收集器,也可以理解为主线程与 Sender 线程之间的缓冲区)中暂存。Sender 线程负责将消息信息构成请求,并最终执行网络 I/O 的线程,它从 RecordAccumulator 中取出消息并批量发送出去。需要注意的是,KafkaProducer 是线程安全的,多个线程间可以共享使用同一个 KafkaProucer 对象。
KafkaProducer 实现了 Producer 接口,在 Producer 接口中定义 KafkaProducer 对外提供的 API,分为四类方法:
send()
方法:发送消息,实际是将消息放入 RecordAccumulator 暂存,等待发送flush()
方法:刷新操作,等待 RecordAccumulator 中所有消息发送完成,在刷新之前就会阻塞调用的线程partitionsFor()
方法:在 KafkaProducer 中维护了一个 Metadata 对象用于存储 Kafka 集群的元素局,Metadata 中的元素局会定时更新。partitionsFor()
方法负责从 Metadata 中获取指定 Topic 中的分区信息。close()
方法:关闭此 Producer 对象,主要操作是设置 close 壁纸,等待RecordAccumulator
中的消息清空,关闭 Sender 线程。
KafkaProducer 重要字段
1 | /* clientId 的生成器,如果没有明确指定 client 的 id,则使用字段生成一个 ID */ |
在 KafkaProducer 的构造函数中,会初始化上面介绍的字段,其中有几个需要注意:
1 | private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
KafkaProducer 构造完成之后,我们来关注 KafkaProducer 的 send()
方法
图中关键步骤:
- 调用
ProducerInterceptors.onSend()
方法,通过ProducerInterceptor
对 消息进行拦截或修改 - 调用
watiOnMetadata()
方法获取 Kafka 集群的信息,底层会唤醒 Send 线程更新 Metadata 中保存的 Kafka 集群元数据 - 调用
Serializer.serialize()
方法序列号消息的 key 和 value - 调用
partition()
为消息选择合适的分区 - 调用
RecordAccumulator.append()
方法,将消息追加到RecordAccumulator
中 - 唤醒
Sender
线程将RecordAccumulator
中缓存的消息发送出去
ProducerInterceptors&ProducerInterceptor
ProducerInterceptors
是一个 ProducerInterceptor
的集合,其 onSend
方法、onAcknowledgement
方法、onSendError
方法,实际上是循环调用其封装的 ProducerInterceptor
集合的对应方法。
ProducerIntercepto
对象可以在消息发送之前对其进行拦截或修改,也可以先于用户的 Callback,对 ACK 响应进行预处理。如果要使用自定义 ProducerInterceptor
类,只要实现 ProducerInterceptor
接口,创建其对象并添加到 ProducerInterceptors
中即可。
Kafka 集群元数据
Kafka 中每个 Topic 中有个多个分区,这些分区的 Leader 副本可以分配在集群中不同的 Broker 上。在运行过程中,Leader 副本随时都可能出现故障而导致 Leader 副本重新选举,新的 Leader 副本会在其他 Broker 上继续提供对外服务,所以由于种种原因分区的数量以及 Leader 副本的分布是动态变化的。当需要提高某 Topic 的并发处理消息能力时,我们可以通过增加其分区的数量来实现。
在 KafkaProducer 中,使用 Node、TopicPartition、PartitionInfo 这三个类封装了 Kafka 集群的相关元数据,其主要字段:
- Node 表示集群中的一个节点,Node 记录这个节点的 host、ip、port等新兴
- TopicPartition 表示某个 Topic 的一个分区,其中的 topic 字段是 Topic 的名称,partition 则是该分区编号(ID)
- PartitionInfo 表示一个分区的详细信息
通过这三个类的组合,我们可以完整表示出 KafkaProducer 需要的集群元数据。这些元数据保存在 Cluster
这个类中,并按照不同的映射方式进行存放,方便查询。
- nodes: Kafka 集群中节点信息列表
- nodesById:BrokerId 与 Node 节点之间对应关系,方便按照 BrokerId 进行索引
- partitionsByTopicPartition:记录了 TopicPartition 与 PartitionInfo 之间的映射关系
- partitionsByTopic:记录了 Topic 名称和 PartitionInfo 的映射关系,可以按照 Topic 名称查询其中全部分区的详细信息。
- avaliablePartitionByTopic:Topic 与 PartitionInfo 的映射关系,这里的
List<PartitionInfo>
中存放的分区必须是有 Leader 副本的 Partition,而 partitionByTopic 中记录的分区则不一定有 Leader 副本,因为某些中间状态。 - partitionsByNode: 记录了 Node 与 PartitionInfo 的映射关系,可以按照节点 Id 查询其上分布的全部分区的详细信息。
Metadata 中封装了 Cluster 对象,并保持 Cluster 数据的最后更新时间、版本号(version)、是否需要更新等待信息
1 | public final class Metadata { |
Metadata 的方法比较简单,主要是操纵上面的几个字段,这里着重介绍主线程中使用到的 requestUpdate()
和 awaitUpdate()
方法。
requestUpdate()
方法将needUpdate
字段修改为true
,这样当 Sender 线程运行时更新 Metadata 记录的集群元数据,然后返回 version 字段的值。1
2
3
4public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}
awaitUpdate()
方法主要是通过 version 版本号来判断元数据是否更新完成,更新为完成则阻塞等待。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
/* 比较版本号,通过版本号比较集群元数据是否更新完成 */
while (this.version <= lastVersion) {
/* 主线程与 Sender 通过 wait/notify 同步,更新元数据的操作则交给 Sender 线程去完成 */
if (remainingWaitMs != 0)
wait(remainingWaitMs);
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}需要注意的是,Metadata 中的字段可以由主线程读,Sedner 线程更新,因此它必须是线程安全的,这也是上面为什么所有方法都使用 synchronized 同步的原因。
下面介绍
KafkaProducer.waitOnMetadta()
方法(KafkaProducer#doSend调用
),它负责触发 Kafka 集群元数据的更新,并阻塞主线程等等更新完毕。它的主要步骤是:- 直接添加 topic 进入 metadata 中,如果已经存在则更新其过期时间
- 尝试获取 Topic 中分区的详细信息,失败后会调用
requestUpdate()
方法设置Metadata.needUpdate
字段,并得到当前元数据版本号 - 唤醒 Sender 线程,由 Sender 线程更新 Metadata 中保存的 Kafka 集群元数据。
- 主线程调用
awaitUpdate()
方法,等待 Sender 线程完成更新 - 从 Metadata 中获取指定 Topic 分区的详细信息(即 PartitionInfo 集合)。若失败,则回到步骤2继续尝试,若等待时间超时,则抛出异常。
其具体实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
metadata.add(topic);
/* 获取分区信息 */
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// Return cached metadata if we have it, and if the record's partition is either undefined
// or within the known partition range
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// Issue metadata requests until we have metadata for the topic or maxWaitTimeMs is exceeded.
// In case we already have cached metadata for the topic, but the requested partition is greater
// than expected, issue an update request only once. This is necessary in case the metadata
// is stale and the number of partitions for this topic has increased in the meantime.
do {
log.trace("Requesting metadata update for topic {}.", topic);
/* 获取失败之后调用 requestUpdate() 方法,并获取当前元数据版本号 */
int version = metadata.requestUpdate();
/* 唤醒 Sender 线程 */
sender.wakeup();
try {
/* 阻塞等待元数据更新完毕 */
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
/* 检测超时时间 */
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
/* 检测权限 */
if (cluster.unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}Serializer&Deserializer
客户端发送的消息的 key 和 value 都是 byte 数组,
Serializer
和Deserializer
接口提供了将 Java 对象序列号(反序列化)为 byte 数组的功能。在 KafkaProducer 中,根据配置文件,使用合适的Serializer
。
Kafka 已经为我们提供了 Java 基本类型的 Serializer 实现和 Deserializer 实现,我们也可以对 Java 复杂类型的自定义 Serializer 和 Deserializer 实现。
在 Serializer 接口中, configure()
方法是在执行序列化操作之前的配置,例如,在 StringSerializer.configure()
方法中会选择合适的编码(encoding),默认是 UTF-8;serializer()
方法是真正进行序列化的地方,将传入的 Java 对象序列化为 byte[]。close()
方法是在其后的关闭方法,多为空实现。
Partitioner
KafkaProducer.send()
方法的下一步操作是选择消息的分区。在有的应用场景中,由业务逻辑控制每个消息追加到合适的分区中,而有时候业务逻辑并不关心分区的选择。在 KafkaProducer.partition()
方法中,优先根据 ProducerRecord
中 partition
字段指定的序号选择分区,如果 ProducerRecord.partition
字段没有明确指定分区编号,则通过 Partitioner.partition()
方法选择 partition。
Kafka 提供了 Partitioner 接口的一个默认实现 DefaultPartitioner
可以看到,之前介绍的 ProducerInterceptor
接口也继承了 Configurable
接口。
在创建 KafkaProducer 时传入的 key/value 配置项会保存到 AbstractConfig
的 originals
字段中,AbstractConfig
的核心方法是 getConfiguredInstance()
方法,其主要功能是通过反射机制实例化 originals
字段中指定的类。
设计 Configurable
接口的目的是统一反射后的初始化过程,对外提供同意的初始化接口。在 AbstractConfig.getConfiguredInstance
方法中通过反射构造出来的对象,都是通过无参构造函数构造成功的,需要初始化的字段个数和类型各式各样, Configurable
接口的 configure()
方法封装了对象初始化过程且只有一个参数 (originals
)字段,这样对外的接口就变得统一了。
DefaultPartitioner.partition()
方法负责在 ProducerRecord
中没有明确指定分区编号的时候,为其选择合适的分区, count
不断递增,确保消息不会都发到同一个 Partition
里;如果消息有 key 的话,则对 key 进行 hash(murmur2),然后与分区数量取模,来确定 key 所在分区到达负载均衡。
1 | /* counter 初始化为一个随机数,注意,这里是一个 AtomicInteger */ |