从Producer到日志落盘
文章简介
- [正常]叙述 Kafka 从 Producer 发送消息到 Server 日志落盘全过程
- [提升]详解 Kafka 日志格式,并通过 Java / Rust 两类解析方式
- [深入]Rust-Kafka-Client Producer 到 Kafka Server
- 需要说明的是这篇文章的局限性
- 仅介绍两个 topic 各有两个 partition 为基础进行介绍,会调用
flush
强行将消息刷新进入 topic 内。发送消息每个 partition 各 5 条 - 不涉及鉴权部分
- 不涉及事务消息及有序消息部分
- Kafka 版本 3.1.0,如无必要不会涉及老版本的历史包袱说明
- 流程图中只记录关键路径,关键信息,部分细节信息可能需要细看代码才行,但是不妨碍原理理解
- 这其中会涉及到
Replica
跟Network
的部分知识,但是在这篇文章中只会涉及到比较浅显的部分,我们默认网络层和主副本是对我们透明的,对其中细节及设计部分我们会在另外的文章中讲解。
- 仅介绍两个 topic 各有两个 partition 为基础进行介绍,会调用
Kafka-Network-阅读
Netty解析-Echo的bind&connect
借助 Netty 官方 Echo 实例: Echo Demo
问题
- 如何与底层交互
- Bootstrap 与 ServerBootstreap 有什么异同
- NioSocketChannel 与 NioServerSocketChannel 有什么异同
- 为什么说 netty 是事件驱动,其事件是如何传播的
- 为什么 ServerBootstrap 需要两个 EventLoopGroup,分别有什么用
Netty中的ByteBuf-Pooled与内存管理
Demo
1 | ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(); |
直接看 PooledByteBufAllocator.newHeapBuffer(int initialCapacity, int maxCapacity)
1 | protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { |
Netty-UnpooledByteBuf 源码剖析
ByteBuffer 源码剖析
要点:
- JDK 原生支持,主要用于 NIO
- 包含两个实现
- HeapByteBuffer: 基于 Java 堆实现
- DirectByteBuffer: 使用 unsafe 的 API 进行堆外操作
- 核心方法为
put(byte)
与get()
。分别是往ByteBuffer里写一个字节,和读一个字节。 - 读写模式分离,正常的应用场景是:往ByteBuffer里写一些数据,然后
flip()
,然后再读出来。 - 在 JDK11 中 MappedByteBuffer 的创建实际是 DirectByteBuffer
- DirectByteBuffer 的垃圾回收利用了幻象引用进行回收,详见下面的
Cleaner
关于 asyncio 的喃喃自语
大多数东西都是根据之前学习中的印象而来,这篇文章更多的是喃喃自语吧。
协程
在 Python 3.5 之前,协程在我眼中应该就是 yield
这个语法的同义,通过 send
、throw
等方法来作为其交互,用于多进程中以提升效率,然而就 Python 的使用环境来说,其实接触到的机会并不是太多。
在 Python 3.5 之后,给了一种新的,现在看起来还是比较友好的选择
1 | async def hello(): |
在 I/O
密集的地方,填入 await
,在 def
前面加上 async
,不需要去适应至今让我还是十分不适的 yield
/ yield from
Kafka生产者分析——RecordAccumulator
前文介绍过,KafkaProducer 可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用 KafkaProducer#send()
方法发送消息的时候,先将消息放到 RecordAccumulator
中暂存,然后主线程就可以从 send()
方法中返回了,此时消息并没有真正地发送给 Kafka,而是缓存在了 RecordAccumulator
中。之后,业务线程通过 KafkaProducer#send()
方法不断向 RecordAccumulator
追加消息,当达到一定的条件,会唤醒 Sender
线程发送 RecordAccumulator
中的消息。
下面我们就来介绍 RecordAccumulator
的结构。首先需要注意的是,**RecordAccumulator
至少有一个业务线程和一个 Sender
线程并发操作,所以必须是线程安全的**。