【Netty 专栏】Netty源码分析之ChannelPipeline

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 【Netty 专栏】Netty源码分析之ChannelPipeline

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

本章节分析Netty中的ChannelPipeline模块。

每个channel内部都会持有一个ChannelPipeline对象pipeline.
pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。

ChannelPipeline

当channel完成register、active、read等操作时,会触发pipeline的相应方法。
1、当channel注册到selector时,触发pipeline的fireChannelRegistered方法。
2、当channel的socket绑定完成时,触发pipeline的fireChannelActive方法。
3、当有客户端请求时,触发pipeline的fireChannelRead方法。
4、当本次客户端请求,pipeline执行完fireChannelRead,触发pipeline的fireChannelReadComplete方法。

接下去看看pipeline是如何组织并运行handler对应的方法。

DefaultChannelPipeline

其中DefaultChannelHandlerContext保存了当前handler的上下文,如channel、pipeline等信息,默认实现了head和tail。


class DefaultChannelPipeline implements ChannelPipeline {
    final Channel channel; // pipeline所属的channel
    //head和tail都是handler上下文
    final DefaultChannelHandlerContext head;
    final DefaultChannelHandlerContext tail;
    ...
    public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }  
}

1、TailContext实现了ChannelOutboundHandler接口。
2、HeadContext实现了ChannelInboundHandler接口。
3、head和tail形成了一个链表。

对于Inbound的操作,当channel注册到selector时,触发pipeline的fireChannelRegistered,从head开始遍历,找到实现了ChannelInboundHandler接口的handler,并执行其fireChannelRegistered方法。


@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
    final DefaultChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}
private DefaultChannelHandlerContext findContextInbound() {
    DefaultChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!(ctx.handler() instanceof ChannelInboundHandler));
    return ctx;
}
private void invokeChannelRegistered() {
    try {
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

假如我们通过pipeline的addLast方法添加一个inboundHandler实现。


public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override  
    public void channelRegistered(ChannelHandlerContext ctx)  
            throws Exception {  
        super.channelRegistered(ctx);  
        System.out.println(" ClientHandler  registered channel ");  
    }  
}  

当channel注册完成时会触发pipeline的channelRegistered方法,从head开始遍历,找到ClientHandler,并执行channelRegistered方法。

对于Outbound的操作,则从tail向前遍历,找到实现ChannelOutboundHandler接口的handler,具体实现和Inbound一样。

服务启动过程中,ServerBootstrap在init方法中,会给ServerSocketChannel的pipeline添加ChannelInitializer对象,其中ChannelInitializer继承ChannelInboundHandlerAdapter,并实现了ChannelInboundHandler接口,所以当ServerSocketChannel注册到selector之后,会触发其channelRegistered方法。


public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}
public void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    ChannelHandler handler = handler();
    if (handler != null) {
        pipeline.addLast(handler);
    }
    pipeline.addLast(new ServerBootstrapAcceptor(
            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}

在initChannel实现中,添加ServerBootstrapAcceptor实例到pipeline中。

ServerBootstrapAcceptor继承自ChannelInboundHandlerAdapter,负责把接收到的客户端socketChannel注册到childGroup中,由childGroup中的eventLoop负责数据处理。


public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    for (EntryChannelOption?, Object e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOptionObject) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }
    for (EntryAttributeKey?, Object e: childAttrs) {
        child.attr((AttributeKeyObject) e.getKey()).set(e.getValue());
    }
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

 

【Netty 专栏】Netty源码分析之ChannelPipeline

目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

01. 调试环境搭建

02. 项目结构一览

03. API 配置(一)之应用

04. API 配置(二)之服务提供者

05. API 配置(三)之服务消费者

06. 属性配置

07. XML 配置

08. 核心流程一览

09. 拓展机制 SPI

  1. 线程池

一共 60 篇++

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 【Netty 专栏】Netty源码分析之ChannelPipeline


 上一篇
【Netty 专栏】Netty 源码分析之 accept 过程 【Netty 专栏】Netty 源码分析之 accept 过程
点击上方“芋道源码”,选择“置顶公众号” 技术文章第一时间送达! 摘要: 原创出处 https://www.jianshu.com/p/ffc6fd82e32b 「占小狼」欢迎转载,保留摘要,谢谢! 本章节分析服务端如何accept客户
2021-04-05
下一篇 
【Netty 专栏】Netty源码分析之NioEventLoop 【Netty 专栏】Netty源码分析之NioEventLoop
点击上方“芋道源码”,选择“置顶公众号” 技术文章第一时间送达! 上一章节中,我们分析了Netty服务的启动过程,本章节分析Netty的NioEventLoop是如工作的。 NioEventLoop中维护了一个线程,线程启动时会调用NioE
2021-04-05