【Netty 专栏】Netty入门简介

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 【Netty 专栏】Netty入门简介

前言

Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty构建,比如RPC框架、zookeeper等。

不熟悉NIO的可以先看看下面两篇文章

1、深入浅出NIO之Channel、Buffer

2、深入浅出NIO之Selector实现原理

那么,Netty性能为啥这么高?主要是因为其内部Reactor模型的实现。

Reactor模型

Netty中的Reactor模型主要由多路复用器(Acceptor)、事件分发器(Dispatcher)、事件处理器(Handler)组成,可以分为三种。

1、单线程模型:所有I/O操作都由一个线程完成,即多路复用、事件分发和处理都是在一个Reactor线程上完成的。

【Netty 专栏】Netty入门简介

对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用却不合适,主要原因如下:

  • 一个线程同时处理成百上千的链路,性能上无法支撑,即便CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
  • 当负载过重后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;
  • 一旦单线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障,可靠性不高。

2、多线程模型:为了解决单线程模型存在的一些问题,演化而来的Reactor线程模型。

【Netty 专栏】Netty入门简介

多线程模型的特点:

  • 有专门一个Acceptor线程用于监听服务端,接收客户端的TCP连接请求;
  • 网络IO的读写操作由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
  • 一个NIO线程可以同时处理多条链路,但是一个链路只能对应一个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是,在极特殊应用场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如百万客户端并发连接,或者服务端需要对客户端的握手消息进行安全认证,认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

3、主从多线程模型:采用多个reactor,每个reactor都在自己单独的线程里执行。如果是多核,则可以同时响应多个客户端的请求,一旦链路建立成功就将链路注册到负责I/O读写的SubReactor线程池上。

【Netty 专栏】Netty入门简介

事实上,Netty的线程模型并非固定不变,在启动辅助类中创建不同的EventLoopGroup实例并通过适当的参数配置,就可以支持上述三种Reactor线程模型。正是因为Netty对Reactor线程模型的支持提供了灵活的定制能力,所以可以满足不同业务场景的性能需求。

示例代码

以下是server和client的示例代码,其中使用的是 Netty 4.x,先看看如何实现,后续会针对各个模块进行深入分析。

server 代码实现

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
public class EchoServer {    private final int port;    public EchoServer(int port) {        this.port = port;    }    public void run() throws Exception {        // Configure the server.        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup();          try {            ServerBootstrap b = new ServerBootstrap(); // (2)            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class) // (3)             .option(ChannelOption.SO_BACKLOG, 100)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ChannelInitializerSocketChannel() { // (4)                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ch.pipeline().addLast(                             //new LoggingHandler(LogLevel.INFO),                             new EchoServerHandler());                 }             });            // Start the server.            ChannelFuture f = b.bind(port).sync(); // (5)            // Wait until the server socket is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        int port;        if (args.length  0) {            port = Integer.parseInt(args[0]);        } else {            port = 8080;        }        new EchoServer(port).run();    } }

public class EchoServer {

   private final int port;

   public EchoServer(int port) {

       this.port = port;

   }

   public void run() throws Exception {

       // Configure the server.

       EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)

       EventLoopGroup workerGroup = new NioEventLoopGroup();  

       try {

           ServerBootstrap b = new ServerBootstrap(); // (2)

           b.group(bossGroup, workerGroup)

            .channel(NioServerSocketChannel.class) // (3)

            .option(ChannelOption.SO_BACKLOG, 100)

            .handler(new LoggingHandler(LogLevel.INFO))

            .childHandler(new ChannelInitializerSocketChannel() { // (4)

                @Override

                public void initChannel(SocketChannel ch) throws Exception {

                    ch.pipeline().addLast(

                            //new LoggingHandler(LogLevel.INFO),

                            new EchoServerHandler());

                }

            });

           // Start the server.

           ChannelFuture f = b.bind(port).sync(); // (5)

           // Wait until the server socket is closed.

           f.channel().closeFuture().sync();

       } finally {

           // Shut down all event loops to terminate all threads.

           bossGroup.shutdownGracefully();

           workerGroup.shutdownGracefully();

       }

   }

   public static void main(String[] args) throws Exception {

       int port;

       if (args.length 0) {

           port = Integer.parseInt(args[0]);

       } else {

           port = 8080;

       }

       new EchoServer(port).run();

   }

}

EchoServerHandler 实现

1234567891011121314151617181920212223242526272829303132333435
public class EchoServerHandler extends ChannelInboundHandlerAdapter {      private static final Logger logger = Logger.getLogger(              EchoServerHandler.class.getName());      @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ctx.write(msg);      }      @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {          ctx.flush();      }      @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          // Close the connection when an exception is raised.          logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);          ctx.close();      }   }

public class EchoServerHandler extends ChannelInboundHandlerAdapter {  

   private static final Logger logger = Logger.getLogger(  

           EchoServerHandler.class.getName());  

   @Override  

   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  

       ctx.write(msg);  

   }  

   @Override  

   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  

       ctx.flush();  

   }  

   @Override  

   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  

       // Close the connection when an exception is raised.  

       logger.log(Level.WARNING, “Unexpected exception from downstream.”, cause);  

       ctx.close();  

   }  

}

1、NioEventLoopGroup 是用来处理I/O操作的线程池,Netty对 EventLoopGroup 接口针对不同的传输协议提供了不同的实现。在本例子中,需要实例化两个NioEventLoopGroup,通常第一个称为“boss”,用来accept客户端连接,另一个称为“worker”,处理客户端数据的读写操作。

2、ServerBootstrap 是启动服务的辅助类,有关socket的参数可以通过ServerBootstrap进行设置。

3、这里指定NioServerSocketChannel类初始化channel用来接受客户端请求。

4、通常会为新SocketChannel通过添加一些handler,来设置ChannelPipeline。ChannelInitializer 是一个特殊的handler,其中initChannel方法可以为SocketChannel 的pipeline添加指定handler。

5、通过绑定端口8080,就可以对外提供服务了。

client 代码实现

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
public class EchoClient {          private final String host;       private final int port;       private final int firstMessageSize;          public EchoClient(String host, int port, int firstMessageSize) {           this.host = host;           this.port = port;           this.firstMessageSize = firstMessageSize;       }          public void run() throws Exception {           // Configure the client.           EventLoopGroup group = new NioEventLoopGroup();           try {               Bootstrap b = new Bootstrap();               b.group(group)                .channel(NioSocketChannel.class)                .option(ChannelOption.TCP_NODELAY, true)                .handler(new ChannelInitializerSocketChannel() {                    @Override                    public void initChannel(SocketChannel ch) throws Exception {                        ch.pipeline().addLast(                                //new LoggingHandler(LogLevel.INFO),                                new EchoClientHandler(firstMessageSize));                    }                });                  // Start the client.               ChannelFuture f = b.connect(host, port).sync();                  // Wait until the connection is closed.               f.channel().closeFuture().sync();           } finally {               // Shut down the event loop to terminate all threads.               group.shutdownGracefully();           }       }          public static void main(String[] args) throws Exception {           final String host = args[0];           final int port = Integer.parseInt(args[1]);           final int firstMessageSize;           if (args.length == 3) {               firstMessageSize = Integer.parseInt(args[2]);           } else {               firstMessageSize = 256;           }              new EchoClient(host, port, firstMessageSize).run();       }   }

public class EchoClient {  

  

    private final String host;  

    private final int port;  

    private final int firstMessageSize;  

  

    public EchoClient(String host, int port, int firstMessageSize) {  

        this.host = host;  

        this.port = port;  

        this.firstMessageSize = firstMessageSize;  

    }  

  

    public void run() throws Exception {  

        // Configure the client.  

        EventLoopGroup group = new NioEventLoopGroup();  

        try {  

            Bootstrap b = new Bootstrap();  

            b.group(group)  

             .channel(NioSocketChannel.class)  

             .option(ChannelOption.TCP_NODELAY, true)  

             .handler(new ChannelInitializerSocketChannel() {  

                 @Override  

                 public void initChannel(SocketChannel ch) throws Exception {  

                     ch.pipeline().addLast(  

                             //new LoggingHandler(LogLevel.INFO),  

                             new EchoClientHandler(firstMessageSize));  

                 }  

             });  

  

            // Start the client.  

            ChannelFuture f = b.connect(host, port).sync();  

  

            // Wait until the connection is closed.  

            f.channel().closeFuture().sync();  

        } finally {  

            // Shut down the event loop to terminate all threads.  

            group.shutdownGracefully();  

        }  

    }  

  

    public static void main(String[] args) throws Exception {  

        final String host = args[0];  

        final int port = Integer.parseInt(args[1]);  

        final int firstMessageSize;  

        if (args.length == 3) {  

            firstMessageSize = Integer.parseInt(args[2]);  

        } else {  

            firstMessageSize = 256;  

        }  

  

        new EchoClient(host, port, firstMessageSize).run();  

    }  

}

EchoClientHandler 实现

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
public class EchoClientHandler extends ChannelInboundHandlerAdapter {          private static final Logger logger = Logger.getLogger(               EchoClientHandler.class.getName());          private final ByteBuf firstMessage;          /**       * Creates a client-side handler.       */       public EchoClientHandler(int firstMessageSize) {           if (firstMessageSize = 0) {               throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);           }           firstMessage = Unpooled.buffer(firstMessageSize);           for (int i = 0; i  firstMessage.capacity(); i ++) {               firstMessage.writeByte((byte) i);           }       }          @Override       public void channelActive(ChannelHandlerContext ctx) {           ctx.writeAndFlush(firstMessage);       }          @Override       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {           ctx.write(msg);       }          @Override       public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {          ctx.flush();       }          @Override       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {           // Close the connection when an exception is raised.           logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);           ctx.close();       }

public class EchoClientHandler extends ChannelInboundHandlerAdapter {  

  

    private static final Logger logger = Logger.getLogger(  

            EchoClientHandler.class.getName());  

  

    private final ByteBuf firstMessage;  

  

    /** 

     * Creates a client-side handler. 

     */  

    public EchoClientHandler(int firstMessageSize) {  

        if (firstMessageSize = 0) {  

            throw new IllegalArgumentException(“firstMessageSize: “ + firstMessageSize);  

        }  

        firstMessage = Unpooled.buffer(firstMessageSize);  

        for (int i = 0; i  firstMessage.capacity(); i ++) {  

            firstMessage.writeByte((byte) i);  

        }  

    }  

  

    @Override  

    public void channelActive(ChannelHandlerContext ctx) {  

        ctx.writeAndFlush(firstMessage);  

    }  

  

    @Override  

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  

        ctx.write(msg);  

    }  

  

    @Override  

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  

       ctx.flush();  

    }  

  

    @Override  

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  

        // Close the connection when an exception is raised.  

        logger.log(Level.WARNING, “Unexpected exception from downstream.”, cause);  

        ctx.close();  

    }

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 【Netty 专栏】Netty入门简介


 上一篇
【Netty 专栏】Netty源码分析之服务启动 【Netty 专栏】Netty源码分析之服务启动
Netty是基于Nio实现的,所以也离不开selector、serverSocketChannel、socketChannel和selectKey等,只不过Netty把这些实现都封装在了底层。 从示例可以看出,一切从ServerBootst
2021-04-05
下一篇 
精尽 Netty 原理与源码专栏( 已经完成 61+ 篇,预计总共 70+ 篇 ) 精尽 Netty 原理与源码专栏( 已经完成 61+ 篇,预计总共 70+ 篇 )
**只更新在笔者的知识星球,欢迎加入一起讨论 Netty 源码与实现**。 目前已经有 **1000+** 位球友加入… 进度:已经完成 **60+** 篇,预计总共 70+ 篇,完成度 **90%** 。 目前已经有 1
2021-04-05