介绍

本文的起因是在工作中到 Kafka 的使用最广泛、性能要求最高、问题最多的组件。而其中性能问题的调优思路往往与 Kafka 的攒批息息相关。所以这篇文章中会一起学习 Kafka 攒批原理及如何使用 Kafka 的攒批。

Read more »

文章简介

  • [正常]叙述 Kafka 从 Producer 发送消息到 Server 日志落盘全过程
  • [提升]详解 Kafka 日志格式,并通过 Java / Rust 两类解析方式
  • [深入]Rust-Kafka-Client Producer 到 Kafka Server
  • 需要说明的是这篇文章的局限性
    • 仅介绍两个 topic 各有两个 partition 为基础进行介绍,会调用 flush 强行将消息刷新进入 topic 内。发送消息每个 partition 各 5 条
    • 不涉及鉴权部分
    • 不涉及事务消息及有序消息部分
    • Kafka 版本 3.1.0,如无必要不会涉及老版本的历史包袱说明
    • 流程图中只记录关键路径,关键信息,部分细节信息可能需要细看代码才行,但是不妨碍原理理解
    • 这其中会涉及到 ReplicaNetwork 的部分知识,但是在这篇文章中只会涉及到比较浅显的部分,我们默认网络层和主副本是对我们透明的,对其中细节及设计部分我们会在另外的文章中讲解。
Read more »

借助 Netty 官方 Echo 实例: Echo Demo

问题

  1. 如何与底层交互
  2. Bootstrap 与 ServerBootstreap 有什么异同
  3. NioSocketChannel 与 NioServerSocketChannel 有什么异同
  4. 为什么说 netty 是事件驱动,其事件是如何传播的
  5. 为什么 ServerBootstrap 需要两个 EventLoopGroup,分别有什么用
Read more »

Demo

1
2
3
4
5
6
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
buf.writeBytes("test".getBytes());

byte[] readBytes = new byte[buf.readableBytes()];
buf.readBytes(readBytes);
System.out.println("read content: " + new String(readBytes));

直接看 PooledByteBufAllocator.newHeapBuffer(int initialCapacity, int maxCapacity)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;

final ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

return toLeakAwareBuffer(buf);
}
Read more »

要点

  1. UnpooledByteBufAllocator 的 Heap 与 Direct 的实现
  2. Heap 内部实现使用了 byte[]
  3. Direct 内部依托于 PlatformDependent0 的各种 native 方法
  4. toLeakAwareBuffer 部分主要讨论了 Netty 是如何应对内存泄露,以及检测内存泄露跟踪的四个级别是如何实现的
Read more »

要点:

  1. JDK 原生支持,主要用于 NIO
  2. 包含两个实现
    1. HeapByteBuffer: 基于 Java 堆实现
    2. DirectByteBuffer: 使用 unsafe 的 API 进行堆外操作
  3. 核心方法为 put(byte)get()。分别是往ByteBuffer里写一个字节,和读一个字节。
  4. 读写模式分离,正常的应用场景是:往ByteBuffer里写一些数据,然后 flip(),然后再读出来。
  5. 在 JDK11 中 MappedByteBuffer 的创建实际是 DirectByteBuffer
  6. DirectByteBuffer 的垃圾回收利用了幻象引用进行回收,详见下面的 Cleaner
Read more »

大多数东西都是根据之前学习中的印象而来,这篇文章更多的是喃喃自语吧。

协程

在 Python 3.5 之前,协程在我眼中应该就是 yield 这个语法的同义,通过 sendthrow 等方法来作为其交互,用于多进程中以提升效率,然而就 Python 的使用环境来说,其实接触到的机会并不是太多。

在 Python 3.5 之后,给了一种新的,现在看起来还是比较友好的选择

1
2
3
4
5
6
7
async def hello():
print("Hello world!")
# asyncio.sleep 也是一个 coroutine
# 异步调用asyncio.sleep(1):
# 线程也可以从这儿拿到返回值(asyncio.sleep(1) 返回值为 None)
r = await asyncio.sleep(1)
print("Hello again!")

I/O 密集的地方,填入 await ,在 def 前面加上 async,不需要去适应至今让我还是十分不适的 yield / yield from

Read more »

紧接 Kafka生产者分析——KafkaProducer

前文介绍过,KafkaProducer 可以有同步和异步两种方式发送消息,其实两者的底层实现相同,都是通过异步方式实现的。主线程调用 KafkaProducer#send() 方法发送消息的时候,先将消息放到 RecordAccumulator 中暂存,然后主线程就可以从 send() 方法中返回了,此时消息并没有真正地发送给 Kafka,而是缓存在了 RecordAccumulator 中。之后,业务线程通过 KafkaProducer#send() 方法不断向 RecordAccumulator 追加消息,当达到一定的条件,会唤醒 Sender 线程发送 RecordAccumulator 中的消息。

下面我们就来介绍 RecordAccumulator 的结构。首先需要注意的是,**RecordAccumulator 至少有一个业务线程和一个 Sender 线程并发操作,所以必须是线程安全的**。

Read more »
0%