Kafka-Network-阅读
JavaNio 及 EventLoop
在 Java NIO 中最为重要的三个概念
- Selector:用于管理多个 Channel(通过
register
方法来注册event
来实现)。主要方法open
:static 构建函数,构建一个Selector
,用于屏蔽操作系统实现细节。SelectableChannel#register(Channel, ops)
:将Channel
注册到Selector
中,并返回SelectionKey
作为标识selectedKeys()
:给到所有注册过的Channel
的SelectionKey
select(timeout) 与 selectNow()
:block 请求,返回已经 ready 的事件数量wakeup()
:唤醒被select
阻塞的线程
- Channel:对于 IO 的抽象,包括
FileChannel
、ServerSocketChannel
、SocketChannel
read(Buffer)
:使用Buffer
读取Channel
的数据write(Buffer)
:将Buffer
的数据写入到Channel
中
- Buffer:用于与
Channel
读写交互数据使用
SimpleExample
1 | public static void main(String[] args) throws Exception { |
Kafka 中 NetworkClient 对于 JavaNio 的封装
其中也有对应的 Selector
、KafkaChannel
但是还多出了 TransportLayer
接口。
Selector
中的connect
方法会将每个node
都关联一个KafkaChannel
到一个SocketChannel
然后包装成TrnsportLayer
并注册到nioSelector
中KafkaChannel
用来持有TransportLayer
即SocketChannel
TransportLayer
是在传输协议上的封装,主要是为了鉴权使用。
Kafka 中 NetworkClient 是如何发送数据的?
在 NetworkClient
中对于 Request
的发送主要使用了三个方法
ready
用于建立与 node 的连接,创建一个KafkaChannel
send
用于将 data 数据缓存到KafkaChannel
中,并将SocketChannel
的 ops 增加为WRITE
poll
的方法就比较复杂- 先读:读
KafkaChannel
中等待读的数据,将其缓存到Selector
中的completedReceives
数据结构中 - 再写:将
KafkaChannel
中缓存的数据写入到网络中,然后将写入成功的数据放入completedSends
数据结构中 - 处理等待的请求,如果已经成功收到 broker 的返回结果对请求发起回调(如果存在)
- 先读:读
Ready
- 当
Sender
调用ready
的时候如果对应 node 没有与 client 建立连接,则会通过NetworkClient#initiateConnect
方法建连. - 其中建连最为重要的则是
Selector#connect
方法会新建一个OPEN
的SocketChannel(JDK)
并通过该SocketChannel
与node
的address
建连。 - 在建连之后会将该
SocketChannel
注册到Selector
对应的NioSelector(JDK)
中并得到一个SelectionKey
,这时候会创建一个KafkaChannel
(Selector#registerChannel
) ,让SelectionKey
attach 该KafkaChannel
,这样方便在存在 IO 通知的时候获取对应节点的KafkaChannel
,每个node
都将拥有一个KafkaChannel
用于管理网络事件。 TransportLayer
主要是处理 auth 之间的差异,同时提供了方法用来包装SocketChannel
以提供给KafkaChannel
使用
Send
需要注意,在 RequestBuilder
组成 ClientRequest
之后会生成 Send
接口,这个 Send
可以简单理解为对于要发送 bytes
的一个反向依赖。
同时在 NetworkClient#send
方法中会将 ClientRequest
加入到 inFlightRequests
中
在 NetworkClient
中将要发送的请求 Send
set 到 KafkaChannel
的成员变量中,然后通过 TransportLayer
将对应 node 的 SocketChannel
设置为 OPEN_WRITE
Poll
- 在调用
poll
之后会直接调用Selector#poll
然后通过nioSelector#select
获取所有已经准备就绪的KafkaChannel
- 针对每个
KafkaChannel
会走一段鉴权逻辑,如果鉴权已经通过KafkaChannel#ready()
方法会返回true
- 此时会尝试先读,如果
Channel
的状态是isReadable
则调用attemptRead
方法将得到的数据 通过NetworkReceived
存入Channel
中 - 然后尝试写,如果
Channel
的状态是isWritable
则会将数据通过TransportLayer
写入到SocketChannel
中 - 最后在
NetworkClient
中调用handlerXXX
方法处理各种从网络中得到的请求,对于请求完成的(读到 response)调用回调。