Netty解析-Echo的bind&connect
借助 Netty 官方 Echo 实例: Echo Demo
问题
- 如何与底层交互
- Bootstrap 与 ServerBootstreap 有什么异同
- NioSocketChannel 与 NioServerSocketChannel 有什么异同
- 为什么说 netty 是事件驱动,其事件是如何传播的
- 为什么 ServerBootstrap 需要两个 EventLoopGroup,分别有什么用
尝试回答
如何与底层交互
从 bind 的角度来看
Netty 中与底层网络交互的实际是 io.netty.channel.Channel.Unsafe 类,在各个主要 IO 模式(包括但不限于 NIO、Epoll、KQueue)下均有对其的实现。其中包含了 Server bind 的接口。
1 | /** |
怎么调用到的?
在 Echo Demo 中我们选用了 NioServerSocketChannel,在其创建事会一直调用其父类继承链构造器,直到 AbstractChannel,而在其中会创建 Unsafe(NioMessageUnsafe) 与 DefaultChannelPipeline。
在 DefaultChannelPipeline 中存在两个特殊的 ChannelHandlerContext 分别是 head 与 tail,而在 Boostrap 中调用 bind 将会通过 bossGroup 将调用传递到 ChannelPipeline.bind 方法中,这样在经过 head.bind (因为 bind 是一个 outBound 事件,所以会是最后触发 head 的)时候会调用 unsafe.bind
。这会产生什么效果呢?
在调用 unsafe.bind 实际是在调用 AbstractChannel.AbstractUnsafe.bind(NioMessageUnsafe 父类) 方法,最终又回传到了 NioServerSocketChannel 中的 doBind 方法中,在这里使用了 ServerSocketChannel 的 bind 逻辑,则是 JDK 的方法了。
这么设计有什么优势呢?从 Unsafe 这个名称猜测,是其作为 Channel 辅助类专门管理 Socket 相关接口调用并防止 Netty 上层应用所使用。
从 connect 的角度来看
其实同 bind 一样,当我们追踪 connect 方法时,最终会走到 channel.connect,而在 Client 中我们所选用的是 NioSocketChannel,在其中会创建 Unsafe (NioByteUnsafe),当链路同样通过 DefaultChannelPipeline 传递到 head.connect 的时候回去调用 NioByteUnsafe.connect 的方法,此时同 bind 一样,最终起到效果的依然是 NioSocketChannel.doConnect 方法,其主要逻辑在于
1 | // ... |
最后依然调用了 SocketChannel.connect 的方法来建立连接。
Bootstrap 与 ServerBootstrap 有什么异同
ServerBootstrap 比较明显的注意点在,它需要传入两个 EventLoopGroup,我们通常会将第一个称为 bossGroup,第二个称为 workGroup,其中,workGroup 其流程其实与 Bootstrap 的 EventLoop 所起到的作用是一致的,都是充当了 IO 操作的处理现成。这里会涉及到问题五
ServerBootstrap 需要两个 EventLoopGroup,分别有什么用
在 Demo 中第一个 EventLoopGroup 也是 bossGroup 用于处理客户端连接请求,而第二个 EventLoopGroup 也是 workerGroup 用于处理与各个客户端连接的 IO 操作。
bossGroup 处理 OP_ACCEPT
- 在我们创建 NioServerSocketChannel 时候会在最后的构造器中传入一个参数,SelectionKey.OP_ACCEPT
1 | /** |
- 在 initAndRegister 时候,通过 SingleThreadEventLoop.register 中对于 channel().unsafe() 的 register 调用将 bossGroup 设置到 Channel 的 EventLoop 中.
- 同样是在 register 逻辑中,当设置完 EventLoop 之后首先在 EventLoop 中注册事件监听到 NioEventLoop 的 selector 中,即 NioServerSocketChannel 中的
SelectorProvider.openSelector
中 - 通知 ChannelPipeline 相关事件
- 准备 read 事件(AbstractChannel.beginRead 开始)在其中,可以看到若当前 selector 监听的事件无对应 Channel.readInterestOp(即第一步的 OP_ACCEPT) 则对其进行注册
- 在上述 initAndRegister 结束之后会有调用 bossGroup.execute(AbstractBootstrap.doBind0) 方法执行之前说到与底层交互的 Bind 逻辑,同时还会在 bossGroup 中开启无限循环(NioEventLoop.this.run()) 处理 IO 事件(processSelectedKeys)这里可以看到,我们会对 OP_ACCEPT 调用 unsafe.read() 方法。
- 调用了 read 事件自然是需要传递写消息进去的,此时的 unsafe 是 NioMessageUnsafe,有个调用
doReadMessages
方法最终被 NioServerSocketChannel 重载
1 | protected int doReadMessages(List<Object> buf) throws Exception { |
最终将包装的 NioSocketChannel 传入到 ChannelPipeline 事件流中
接下来就是说明如何处理 OP_ACCEPT 事件到 workerGroup 中了。
workerGroup 处理其他网络 IO
- 回到 initAndRegister 中对于 channel 的 init 方法,该出被 ServerBootstrap 进行了重载在这里我们可以看到在 ChannelPipeline 中添加了一个 ServerBootstrapAcceptor 的 ChannelInboundHandlerAdapter,在上述监听到 OP_ACCEPT 事件时会通过 ChannelPipeline 调用其 channelRead 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34void init(Channel channel) throws Exception {
... // 对于 attr 与 option 的设置
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
} - 在 channelRead 方法中会将 ServerBootstrap 设置的 channelHandler 给加入 ChannelPipeline 中,然后通过 childGroup(即 workerGroup)来注册到接收的 NioSocketChannel 中等待网络 IO 事件并处理(让注册的 ChannelHandler 处理)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
Bootstrap 其实就是其上面的 worker 的处理逻辑(都是 NioSocketChannel 处理)。
NioSocketChannel 与 NioServerSocketChannel 有什么异同
它们之前最大的差异体现在其握有的 SocketChannel / ServerSocketChannel。这二者在 Java Nio 中一个是连接到 TCP 网络套接字的通道,另一个是可以监听新进来的 TCP 连接的通道。Netty 中 NioSocketChannel 与 NioServerSocketChannel 其实都是对其二者操作的包装及兼容到 Netty 本身对于协议的操作抽象中来。
为什么说 netty 是事件驱动,其事件是如何传播的
首先说说事件驱动,netty 的事件主要是指 inBound 事件与 outBound 事件,inBound 事件通常是 Socket 接收到网络事件然后进行逐层 ChannelHandler 传播,而 outBound 事件则是由用户发起如 bind/connect/read/write 等事件然后在 netty 的 ChannelHandler 中逐层传播。
然后我们可以先看一下 ChannelPipeline 中的一张图
1 | * I/O Request |
在 DefaultChannelPipeline 中,有两个特殊的 ChannelHandler,其中一个是 HeadContext 主要用于一些网络操作的处理,同时作为 ChannelOutboundHandler/ChannelInboundHandler,而另一个则是 TailContext 主要用途在于将最后传入到此处若为得到释放的消息给释放掉,作为一个 ChannelInboundHandler。
若是 Inbound 事件则是由 head 开始触发,直到最后触发到 tail 中,期间经历过各种 inbound 类型的 context,执行该 context 的 handler 的对应处理方法,具体体现就是各种在 ChannelInboundInvoker 中的 fireChannel* 方法。而若发生 Outbound 事件则是由 tail 开始触发最终触发到 head 中,期间经历过各种outbound类型的context,执行该context的handler的对应处理方法,具体体现就在各种在 ChannelOutboundInvoker 中的方法。
参考文档
官方文档
Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
Netty源码之Channel初始化和注册过程
深入浅出Netty:NioEventLoop
《Netty 权威指南》
《Netty 实战》