Kafka 的攒批机制
介绍
本文的起因是在工作中到 Kafka 的使用最广泛、性能要求最高、问题最多的组件。而其中性能问题的调优思路往往与 Kafka 的攒批息息相关。所以这篇文章中会一起学习 Kafka 攒批原理及如何使用 Kafka 的攒批。
什么是攒批?
发送消息(Record)时,让消息在内存中驻留一段时间,等待多个消息同时达到可发送状态时,形成一次 Request 发送到 Server 中。
为什么需要攒批?
- 默认网络发送成本大于内存中流转成本。使用攒批可以极大提升吞吐量。
- 对数据压缩有优势。增加吞吐量及减少存储成本。
攒批一定好吗?
- 攒批时间太长会影响时效性。
- 由于消息是在内存中驻留的,会增加内存占用,可能影响 GC 反而减少了吞吐。
相关配置及方法原理
相关 API 描述
在 Producer 的 API 中明确写到,Producer 是由 buffer space 与 background IO 组成的,而其中 buffer space 保存了尚未传输到 server 的 records,而这些 records 是有 send
异步方法被调用时候传入到缓冲区中的。buffer.memory
控制生产者可用于缓冲的内存总量。 如果记录的发送速度快于它们可以传输到服务器的速度,则此缓冲区空间将被耗尽。 当缓冲区空间耗尽时,额外的发送调用将被阻塞。 阻塞时间的阈值由 max.block.ms 确定,超过它会抛出 TimeoutException。
Record 如何到 Server 中的?
Kafka 在 ProducerClient 中发送消息主要是由 RecordAccumulate
作为中转站,由主线程通过 append
方法根据 partition 不同追加消息进入 ProduceBatch
中,Sender
线程每次轮询将准备好(ready
方法)的消息通过 drain()
方法获取到,然后根据 Node 不同组成 ProduceRequest
发送给 KafkaServer。
- 应用程序发送的消息通过拦截器和序列化得到消息的各个部分(
Header
、key
、value
…) - 消息分区之后通过
RecordAccumulate#append
将消息放入ProducerBatch
中,底层存储使用了BufferPool
分配的ByteBuffer
,这里涉及到 Kafka 的内存控制,后续有机会介绍(坑1) Sender
通过ready
方法询问是否有准备好发送的消息,如果有的话返回其 Node 信息Sender
通过drain
方法从RecordAccumulate
中获取在max.request.size
配置下允许的所有ProducerBatch
,返回结果以Node
->List<ProducerBatch>
作为映射。- 每个 Node 的
List<ProducerBatch>
组装成ProduceRequest
- 通过
NetworkClient
发送给Server
有什么方式能控制攒批?
RecordAccumulator
是属于 producer.internal 的类,主要就是控制攒批。
类描述如下
1 | /** |
其中主要方法是
append
:追加消息
会被 Producer.doSend
调用,并返回一个携带有 FutureRecordMeta
的返回结果;
主要实现:
- 给每个
partition
分配一个Deque<ProducerBatch>
用于缓存消息 - 如果
Deque
非空则会最终调用MemoryRecordsBuilder.tryAppend
方法将Record
累加到ProducerBatch
中 - 如果
Deque
为空,则开始重新生成ProducerBatch
(主要是生成其中的MemoryRecordsBuilder
再tryAppend
即是说,Record 被 ProducerBatch
保存了起来,等待发送。
ready()
之后 drain
Sender.sendProducerData
是 Sender
的核心方法在线程 run
的时候被持续调用,而在其方法的开头就调用了 RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
方法,核心就是在攒批准备好之后发送。我们先看看 ready
的方法签名。
1 | /** |
接下来我们深入 ready
方法内部探索,由于我们的目的是分析原理,流程只会涉及到主要代码
首先 ready 会针对每个 partition
的 Dequq
进行一次循环来对每个分区数据进行检测
1 | public ReadyCheckResult ready(Cluster cluster, long nowMs) { |
在 Sender
中通过 ready
得到 readyNodes 之后,调用 drain
返回 node.id
与 List<ProducerBatch>
的 Map。**这样间接说明,Producer 会将发送往一个 Node 的数据 Merge 到一次请求中,这里在 ****Sender#sendProduceReqest**
中可以清晰的看到,所以说,测试性能得用多节点,单节点 partition 数量不是真实场景。drain
方法主要是针对每个 ready node 调用 drainBatchesForOneNode
方法然后汇聚成 Map<Integer, List<ProducerBatch>>
返回给 Sender
方法签名:
1 | /** |
1 | private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) { |
场景测试
在 Kafka2.x 的版本中体会以下几个版本 batch 上的差异。
- 使用 Kafka 自带的压测脚本
- 4K 消息
- 单节点(测试存在偏差性,条件有限,后面补充多节点测试)
batch.size
影响
JVM 内存 512M
lingerMs 固定 10 ms
max.request.size 固定 16M
buffer.memory 固定 16M
max.in.flight.requests.per.connection = 1
命令
1 | ./bin/kafka-producer-perf-test.sh --topic test \ |
batch.size | 数据 |
---|---|
100 | 17.20 MB/sec |
16384 (16K) | 281.92 MB/sec |
131072 (128K) | 984.49 MB/sec |
1048576 (1M) | 1098.23 MB/sec |
16777216 (16M) | 841.21 MB/sec |
linger.ms
影响
JVM 内存 512M
batch.size 固定 16384 ms
max.request.size 固定 16M
buffer.memory 固定 16M
max.in.flight.requests.per.connection = 1
测试下来在压测场景下影响不大,因为 batch.size
的条件会优先被满足。
同类产品的原理对比
RocketMQ
RocketMQ
也同样有消息攒批逻辑,参数相差不大:
batchSize
:表示消息批的大小,单位是字节。当达到批大小后,RocketMQ 会将消息发送到 Broker。maxDelayTime
:表示最大的等待时间,单位是毫秒。当等待时间超过该时间后,RocketMQ 会将消息发送到 Broker。
Pulsar
Pulsar
参数差距同样不大:
batchingEnabled
:表示是否启用消息批处理,默认为 false。batchingMaxMessages
:表示消息批的最大数量,默认为 1。batchingMaxPublishDelayMicros
:表示最大的等待时间,单位是微秒。当等待时间超过该时间后,Pulsar 会将消息发送到 Broker。