Kafka-Network-阅读
JavaNio 及 EventLoop

在 Java NIO 中最为重要的三个概念
- Selector:用于管理多个 Channel(通过
register方法来注册event来实现)。主要方法open:static 构建函数,构建一个Selector,用于屏蔽操作系统实现细节。SelectableChannel#register(Channel, ops):将Channel注册到Selector中,并返回SelectionKey作为标识selectedKeys():给到所有注册过的Channel的SelectionKeyselect(timeout) 与 selectNow():block 请求,返回已经 ready 的事件数量wakeup():唤醒被select阻塞的线程
- Channel:对于 IO 的抽象,包括
FileChannel、ServerSocketChannel、SocketChannelread(Buffer):使用Buffer读取Channel的数据write(Buffer):将Buffer的数据写入到Channel中
- Buffer:用于与
Channel读写交互数据使用

SimpleExample
1 | public static void main(String[] args) throws Exception { |
Kafka 中 NetworkClient 对于 JavaNio 的封装
其中也有对应的 Selector、KafkaChannel 但是还多出了 TransportLayer 接口。
Selector中的connect方法会将每个node都关联一个KafkaChannel到一个SocketChannel然后包装成TrnsportLayer并注册到nioSelector中KafkaChannel用来持有TransportLayer即SocketChannelTransportLayer是在传输协议上的封装,主要是为了鉴权使用。
Kafka 中 NetworkClient 是如何发送数据的?
在 NetworkClient中对于 Request 的发送主要使用了三个方法
ready用于建立与 node 的连接,创建一个KafkaChannelsend用于将 data 数据缓存到KafkaChannel中,并将SocketChannel的 ops 增加为WRITEpoll的方法就比较复杂- 先读:读
KafkaChannel中等待读的数据,将其缓存到Selector中的completedReceives数据结构中 - 再写:将
KafkaChannel中缓存的数据写入到网络中,然后将写入成功的数据放入completedSends数据结构中 - 处理等待的请求,如果已经成功收到 broker 的返回结果对请求发起回调(如果存在)
- 先读:读
Ready

- 当
Sender调用ready的时候如果对应 node 没有与 client 建立连接,则会通过NetworkClient#initiateConnect方法建连. - 其中建连最为重要的则是
Selector#connect方法会新建一个OPEN的SocketChannel(JDK)并通过该SocketChannel与node的address建连。 - 在建连之后会将该
SocketChannel注册到Selector对应的NioSelector(JDK)中并得到一个SelectionKey,这时候会创建一个KafkaChannel(Selector#registerChannel) ,让SelectionKeyattach 该KafkaChannel,这样方便在存在 IO 通知的时候获取对应节点的KafkaChannel,每个node都将拥有一个KafkaChannel用于管理网络事件。 TransportLayer主要是处理 auth 之间的差异,同时提供了方法用来包装SocketChannel以提供给KafkaChannel使用
Send
需要注意,在 RequestBuilder 组成 ClientRequest之后会生成 Send接口,这个 Send 可以简单理解为对于要发送 bytes 的一个反向依赖。
同时在 NetworkClient#send 方法中会将 ClientRequest加入到 inFlightRequests中
在 NetworkClient中将要发送的请求 Sendset 到 KafkaChannel的成员变量中,然后通过 TransportLayer将对应 node 的 SocketChannel 设置为 OPEN_WRITE
Poll

- 在调用
poll之后会直接调用Selector#poll然后通过nioSelector#select获取所有已经准备就绪的KafkaChannel - 针对每个
KafkaChannel会走一段鉴权逻辑,如果鉴权已经通过KafkaChannel#ready()方法会返回true - 此时会尝试先读,如果
Channel的状态是isReadable则调用attemptRead方法将得到的数据 通过NetworkReceived存入Channel中 - 然后尝试写,如果
Channel的状态是isWritable则会将数据通过TransportLayer写入到SocketChannel中 - 最后在
NetworkClient中调用handlerXXX方法处理各种从网络中得到的请求,对于请求完成的(读到 response)调用回调。