Kafka 的攒批机制
介绍
本文的起因是在工作中到 Kafka 的使用最广泛、性能要求最高、问题最多的组件。而其中性能问题的调优思路往往与 Kafka 的攒批息息相关。所以这篇文章中会一起学习 Kafka 攒批原理及如何使用 Kafka 的攒批。
什么是攒批?
发送消息(Record)时,让消息在内存中驻留一段时间,等待多个消息同时达到可发送状态时,形成一次 Request 发送到 Server 中。
为什么需要攒批?
- 默认网络发送成本大于内存中流转成本。使用攒批可以极大提升吞吐量。
- 对数据压缩有优势。增加吞吐量及减少存储成本。
攒批一定好吗?
- 攒批时间太长会影响时效性。
- 由于消息是在内存中驻留的,会增加内存占用,可能影响 GC 反而减少了吞吐。
相关配置及方法原理
相关 API 描述
在 Producer 的 API 中明确写到,Producer 是由 buffer space 与 background IO 组成的,而其中 buffer space 保存了尚未传输到 server 的 records,而这些 records 是有 send异步方法被调用时候传入到缓冲区中的。buffer.memory 控制生产者可用于缓冲的内存总量。 如果记录的发送速度快于它们可以传输到服务器的速度,则此缓冲区空间将被耗尽。 当缓冲区空间耗尽时,额外的发送调用将被阻塞。 阻塞时间的阈值由 max.block.ms 确定,超过它会抛出 TimeoutException。
Record 如何到 Server 中的?

Kafka 在 ProducerClient 中发送消息主要是由 RecordAccumulate 作为中转站,由主线程通过 append 方法根据 partition 不同追加消息进入 ProduceBatch 中,Sender线程每次轮询将准备好(ready方法)的消息通过 drain() 方法获取到,然后根据 Node 不同组成 ProduceRequest 发送给 KafkaServer。
- 应用程序发送的消息通过拦截器和序列化得到消息的各个部分(
Header、key、value…) - 消息分区之后通过
RecordAccumulate#append将消息放入ProducerBatch中,底层存储使用了BufferPool分配的ByteBuffer,这里涉及到 Kafka 的内存控制,后续有机会介绍(坑1) Sender通过ready方法询问是否有准备好发送的消息,如果有的话返回其 Node 信息Sender通过drain方法从RecordAccumulate中获取在max.request.size配置下允许的所有ProducerBatch,返回结果以Node->List<ProducerBatch>作为映射。- 每个 Node 的
List<ProducerBatch>组装成ProduceRequest - 通过
NetworkClient发送给Server
有什么方式能控制攒批?
RecordAccumulator是属于 producer.internal 的类,主要就是控制攒批。
类描述如下
1 | /** |
其中主要方法是
append:追加消息
会被 Producer.doSend调用,并返回一个携带有 FutureRecordMeta的返回结果;
主要实现:
- 给每个
partition分配一个Deque<ProducerBatch>用于缓存消息 - 如果
Deque非空则会最终调用MemoryRecordsBuilder.tryAppend方法将Record累加到ProducerBatch中 - 如果
Deque为空,则开始重新生成ProducerBatch(主要是生成其中的MemoryRecordsBuilder再tryAppend
即是说,Record 被 ProducerBatch保存了起来,等待发送。
ready() 之后 drain
Sender.sendProducerData 是 Sender的核心方法在线程 run 的时候被持续调用,而在其方法的开头就调用了 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);方法,核心就是在攒批准备好之后发送。我们先看看 ready的方法签名。
1 | /** |
接下来我们深入 ready 方法内部探索,由于我们的目的是分析原理,流程只会涉及到主要代码
首先 ready 会针对每个 partition 的 Dequq 进行一次循环来对每个分区数据进行检测
1 | public ReadyCheckResult ready(Cluster cluster, long nowMs) { |
在 Sender中通过 ready 得到 readyNodes 之后,调用 drain返回 node.id与 List<ProducerBatch> 的 Map。**这样间接说明,Producer 会将发送往一个 Node 的数据 Merge 到一次请求中,这里在 ****Sender#sendProduceReqest**中可以清晰的看到,所以说,测试性能得用多节点,单节点 partition 数量不是真实场景。drain方法主要是针对每个 ready node 调用 drainBatchesForOneNode 方法然后汇聚成 Map<Integer, List<ProducerBatch>> 返回给 Sender
方法签名:
1 | /** |
1 | private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) { |
场景测试
在 Kafka2.x 的版本中体会以下几个版本 batch 上的差异。
- 使用 Kafka 自带的压测脚本
- 4K 消息
- 单节点(测试存在偏差性,条件有限,后面补充多节点测试)
batch.size 影响
JVM 内存 512M
lingerMs 固定 10 ms
max.request.size 固定 16M
buffer.memory 固定 16M
max.in.flight.requests.per.connection = 1
命令
1 | ./bin/kafka-producer-perf-test.sh --topic test \ |
| batch.size | 数据 |
|---|---|
| 100 | 17.20 MB/sec |
| 16384 (16K) | 281.92 MB/sec |
| 131072 (128K) | 984.49 MB/sec |
| 1048576 (1M) | 1098.23 MB/sec |
| 16777216 (16M) | 841.21 MB/sec |
linger.ms影响
JVM 内存 512M
batch.size 固定 16384 ms
max.request.size 固定 16M
buffer.memory 固定 16M
max.in.flight.requests.per.connection = 1
测试下来在压测场景下影响不大,因为 batch.size 的条件会优先被满足。
同类产品的原理对比
RocketMQ
RocketMQ也同样有消息攒批逻辑,参数相差不大:
batchSize:表示消息批的大小,单位是字节。当达到批大小后,RocketMQ 会将消息发送到 Broker。maxDelayTime:表示最大的等待时间,单位是毫秒。当等待时间超过该时间后,RocketMQ 会将消息发送到 Broker。
Pulsar
Pulsar参数差距同样不大:
batchingEnabled:表示是否启用消息批处理,默认为 false。batchingMaxMessages:表示消息批的最大数量,默认为 1。batchingMaxPublishDelayMicros:表示最大的等待时间,单位是微秒。当等待时间超过该时间后,Pulsar 会将消息发送到 Broker。