前言
我们在前面的文章中也对ChannelPipeline
接口做了初步的介绍。
[io.netty学习使用汇总][io.netty]
ChannelPipeline 接口
ChannelPipeline
接口采用了责任链设计模式,底层采用双向链表的数据结构,将链上的各个处理器串联起来。客户端每一个请求的到来,ChannelPipeline
中所有的处理器都有机会处理它。
每一个新创建的
Channel
都将会被分配一个新的ChannelPipeline
。这项关联是永久性的;Channel
既不能附加另一个ChannelPipeline
,也不能分离其当前的。
创建 ChannelPipeline
ChannelPipeline
数据管道是与Channel
管道绑定的,一个Channel
通道对应一个ChannelPipeline
,ChannelPipeline
是在Channel
初始化时被创建。
观察下面这个实例:
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 添加ChannelHandler到ChannelPipeline
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync(); // (7)
System.out.println("DiscardServer已启动,端口:" + port);
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
从上述代码中可以看到,当ServerBootstrap
初始化后,直接就可以获取到SocketChannel
上的ChannelPipeline
,而无需手动实例化,因为 Netty 会为每个Channel
连接创建一个ChannelPipeline
。
Channel
的大部分子类都继承了AbstractChannel
,在创建实例时也会调用AbstractChannel
构造器。在AbstractChannel
构造器中会创建ChannelPipeline
管道实例,核心代码如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
this.unsafe = this.newUnsafe();
this.pipeline = this.newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
从上述代码中可以看出,在创建Channel
时,会由Channel
创建DefaultChannelPipeline
类的实例。DefaultChannelPipeline
是ChannelPipeline
的默认实现。
pipeline
是AbstractChannel
的属性,内部维护着一个以AbstractChannelHandlerContext
为节点的双向链表,创建的head
和tail
节点分别指向链表头尾,源码如下:
public class DefaultChannelPipeline implements ChannelPipeline {
protected DefaultChannelPipeline(Channel channel) {
this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
this.voidPromise = new VoidChannelPromise(channel, true);
this.tail = new DefaultChannelPipeline.TailContext(this);
this.head = new DefaultChannelPipeline.HeadContext(this);
this.head.next = this.tail;
this.tail.prev = this.head;
}
...
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, DefaultChannelPipeline.TailContext.class);
this.setAddComplete();
}
...
}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, DefaultChannelPipeline.HeadContext.class);
this.unsafe = pipeline.channel().unsafe();
this.setAddComplete();
}
...
}
...
}
从上述源码可以看到,TailContext
和HeadContext
都继承了AbstractChannelHandlerContext
,并实现了ChannelHandler
接口。AbstractChannelHandlerContext
内部维护着next
、prev
链表指针和入站、出站节点方向等。其中TailContext
实现了ChannelInboundHandler
,HeadContext
实现了ChannelOutboundHandler
和ChannelInboundHandler
。
ChannelPipeline 事件传输机制
通过ChannelPipeline
的addFirst()
方法来添加ChannelHandler
,并为这个ChannelHandler
创建一个对应的DefaultChannelHandlerContext
实例。
public class DefaultChannelPipeline implements ChannelPipeline {
//...
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
AbstractChannelHandlerContext newCtx;
synchronized(this) {
checkMultiplicity(handler);
name = this.filterName(name, handler);
newCtx = this.newContext(group, name, handler);
this.addFirst0(newCtx);
if (!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
this.callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
this.callHandlerAdded0(newCtx);
return this;
}
//...
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
}
//...
}
处理出站事件
当处理出站事件时,channelRead()
方法的示例如下:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);
// 写消息到管道
ctx.write(msg);// 写消息
}
//...
}
上述代码中的write()
方法会触发一个出站事件,该方法会调用DefaultChannelPipeline
上的write()
方法。
public final ChannelFuture write(Object msg) {
return this.tail.write(msg);
}
从上述源码可以看到,调用的是DefaultChannelPipeline
上尾部节点(tail)的write
方法。
上述方法最终会调用到DefaultChannelHandlerContext
的write()
方法。
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (this.isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
return;
}
} catch (RuntimeException var8) {
ReferenceCountUtil.release(msg);
throw var8;
}
AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');
Object m = this.pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
上述的write()
方法会查找下一个出站的节点,也就是当前ChannelHandler
后的一个出站类型的ChannelHandler
,并调用下一个节点的invokeWrite()
方法。
void invokeWrite(Object msg, ChannelPromise promise) {
if (this.invokeHandler()) {
this.invokeWrite0(msg, promise);
} else {
this.write(msg, promise);
}
}
接着调用invokeWrite0()
方法,该方法最终调用ChannelOutboundHandler
的write
方法。
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
} catch (Throwable var4) {
notifyOutboundHandlerException(var4, promise);
}
}
至此,处理完成了第一个节点的处理,开始执行下一个节点并不断循环。
所以,处理出站事件时,数据传输的方向是从尾部节点tail
到头部节点head
。
处理入站事件
入站事件处理的起点是触发ChannelPipeline fire
方法,例如fireChannelActive()
方法的示例如下:
public class DefaultChannelPipeline implements ChannelPipeline {
//...
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(this.head);
return this;
}
//...
}
从上述源码可以看到,处理的节点是头部节点head
。AbstractChannelHandlerContext.invokeChannelActive
方法定义如下:
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelActive();
}
});
}
}
该方法最终调用ChannelInboundHandler
的channelActive
方法。
private void invokeChannelActive() {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelActive(this);
} catch (Throwable var2) {
this.invokeExceptionCaught(var2);
}
} else {
this.fireChannelActive();
}
}
至此完成了第一个节点的处理,开始执行下一个节点的不断循环。
所以,处理入站事件时,数据传输的方向是从头部节点head
到尾部节点tail
。
ChannelPipeline 中的 ChannelHandler
从上述的ChannelPipeline
接口源码可以看出,ChannelPipeline
是通过addXxx
或者removeXxx
方法来将ChannelHandler
动态的添加到ChannelPipeline
中,或者从ChannelPipeline
移除ChannelHandler
的。那么ChannelPipeline
是如何保障并发访问时的安全呢?
以addLast
方法为例,DefaultChannelPipeline
的源码如下:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
AbstractChannelHandlerContext newCtx;
//synchronized 保障线程安全
synchronized(this) {
checkMultiplicity(handler);
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
if (!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
this.callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
this.callHandlerAdded0(newCtx);
return this;
}
从上述源码可以看到,使用synchronized
关键字保障了线程的安全访问。其他方法的实现方式也是类似。
ChannelHandlerContext 接口
ChannelHandlerContext
接口是联系ChannelHandler
和ChannelPipeline
之间的纽带。
每当有ChannelHandler
添加到ChannelPipeline
中时,都会创建ChannelHandlerContext
。ChannelHandlerContext
的主要功能是管理它所关联的ChannelHandler
和在同一个ChannelPipeline
中的其他ChannelHandler
之间的交互。
例如,ChannelHandlerContext
可以通知ChannelPipeline
中的下一个ChannelHandler
开始执行及动态修改其所属的ChannelPipeline
。
ChannelHandlerContext
中包含了许多方法,其中一些方法也出现在Channel
和ChannelPipeline
中。如果通过Channel
或ChannelPipeline
的实例来调用这些方法,它们就会在整个ChannelPipeline
中传播。相比之下,一样的方法在ChannelHandlerContext
的实例上调用,就只会从当前的ChannelHandler
开始并传播到相关管道中的下一个有处理事件能力的ChannelHandler
中。因此,ChannelHandlerContext
所包含的事件流比其他类中同样的方法都要短,利用这一点可以尽可能提高性能。
ChannelHandlerContext 与其他组件的关系
下图展示了ChannelPipeline
、Channel
、ChannelHandler
和ChannelHandlerContext
之间的关系做了如下说明:
(1)Channel
被绑定到ChannelPipeline
上。
(2)和Channel
绑定的ChannelPipeline
包含了所有的ChannelHandler
。
(3)ChannelHandler
。
(4)当添加ChannelHandler
到ChannelPipeline
时,ChannelHandlerContext
被创建。
跳过某些 ChannelHandler
下面的代码,展示了从ChannelHandlerContext
获取到Channel
的引用,并通过调用Channel
上的write()
方法来触发一个写事件到流中。
ChannelHandlerContext ctx = context;
Channel channel = ctx.channel(); //获取ChannelHandlerContext上的Channel
channel.write(msg);
以下代码展示了从ChannelHandlerContext
获取到ChannelPipeline
。
ChannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); //获取ChannelHandlerContext上的ChannelPipeline
pipeline.write(msg);
上述的两个示例,事件流是一样的。虽然被调用的Channel
和ChannelPipeline
上的write()
方法将一直传播事件通过整个ChannelPipeline
,但是在ChannelHandler
的级别上,事件从一个ChannelHandler
到下一个ChannelHandler
的移动是由ChannelHandlerContext
上的调用完成的。
下图展示了Channel
或者ChannelPipeline
进行的事件传播机制。
在上图中可以看出:
(1)事件传递给ChannelPipeline
的第一个ChannelHandler
;
(2)ChannelHandler
通过关联的ChannelHandlerContext
传递事件给ChannelPipeline
中的下一个ChannelHandler
。
(3)ChannelHandler
通过关联的ChannelHandlerContext
传递事件给ChannelPipeline
中的下一个ChannelHandler
。
从上面的流程可以看出,如果通过Channel
或ChannelPipeline
的实例来调用这些方法,它们肯定会在整个ChannelPipeline
中传播。
那么是否可以跳过某些处理器呢?答案是肯定的。
通过减少ChannelHandler
不感兴趣的事件的传递减少开销,并排除掉特定的对此事件感兴趣的处理器的处理以提升性能。想要实现从一个特定的ChannelHandler
开始处理,必须引用与此ChannelHandler
的前一个ChannelHandler
关联的ChannelHandlerContext
。这个ChannelHandlerContext
将会调用与自身关联的ChannelHandler
的下一个ChannelHandler
,代码如下:
ChannelHandlerContext ctx = context;
ctx.write(msg);
直接调用ChannelHandlerContext
的write()
方法,将会把缓冲区发送到下一个ChannelHandler
。
如下图,消息会将从下一个ChannelHandler
开始流过ChannelPipeline
,绕过所有在它之前的ChannelHandler
。
(1)执行ChannelHandlerContext
方法调用。
(2)事件发送到了下一个ChannelHandler
。
(3)经过最后一个ChannelHandler
后,事件从ChannelPipeline
中移除。
当调用某个特定的ChannelHandler
操作时,它尤为有用。
例如:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);
// 写消息到管道
ctx.write(msg);// 写消息
ctx.flush(); // 冲刷消息
// 上面两个方法等同于 ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
总结
以上就是关于ChannelPipeline
的源码分析,相信认真看完了,你就明白ChannelPipeline
、Channel
、ChannelHandler
和ChannelHandlerContext
之间的关系。下节我们继续来剖析 Netty 的源码。