真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Netty服務(wù)端啟動源碼是什么

這篇文章主要介紹“Netty服務(wù)端啟動源碼是什么”,在日常操作中,相信很多人在Netty服務(wù)端啟動源碼是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Netty服務(wù)端啟動源碼是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

創(chuàng)新互聯(lián)建站從2013年成立,先為石河子等服務(wù)建站,石河子等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為石河子企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。

一、從EchoServer示例入手

Netty服務(wù)端啟動源碼是什么

示例從哪里來?任何開源框架都會有自己的示例代碼,Netty源碼也不例外,如模塊netty-example中就包括了最常見的EchoServer示例,下面通過這個示例進(jìn)入服務(wù)端啟動流程篇章。

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // 1. 聲明Main-Sub Reactor模式線程池:EventLoopGroup
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 創(chuàng)建 EchoServerHandler 對象
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 2. 聲明服務(wù)端啟動引導(dǎo)器,并設(shè)置相關(guān)屬性
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // 3. 綁定端口即啟動服務(wù)端,并同步等待
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // 4. 監(jiān)聽服務(wù)端關(guān)閉,并阻塞等待
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // 5. 優(yōu)雅地關(guān)閉兩個EventLoopGroup線程池 
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  1. [代碼行18、19]聲明Main-Sub Reactor模式線程池:EventLoopGroup

創(chuàng)建兩個 EventLoopGroup 對象。其中,bossGroup用于服務(wù)端接受客戶端的連接,workerGroup用于進(jìn)行客戶端的 SocketChannel 的數(shù)據(jù)讀寫。

(關(guān)于EventLoopGroup不是本文重點(diǎn)所以在后續(xù)文章中進(jìn)行分析)

  1. [代碼行23-39]聲明服務(wù)端啟動引導(dǎo)器,并設(shè)置相關(guān)屬性

AbstractBootstrap是一個幫助類,通過方法鏈(method chaining)的方式,提供了一個簡單易用的方式來配置啟動一個Channel。io.netty.bootstrap.ServerBootstrap ,實(shí)現(xiàn) AbstractBootstrap 抽象類,用于 Server 的啟動器實(shí)現(xiàn)類。io.netty.bootstrap.Bootstrap ,實(shí)現(xiàn) AbstractBootstrap 抽象類,用于 Client 的啟動器實(shí)現(xiàn)類。如下類圖所示:

![AbstractBootstrap類繼承.png](//p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/d877706356d4427a9afd4b13d7177142~tplv-k3u1fbpfcp-zoom-1.image)

(EchoServer示例代碼中,我們看到 ServerBootstrap groupchannel、option、childHandler 等屬性鏈?zhǔn)皆O(shè)置都放到關(guān)于AbstractBootstrap體系代碼中詳細(xì)介紹。)

  1. [代碼行43]綁定端口即啟動服務(wù)端,并同步等待

先調(diào)用 #bind(int port) 方法,綁定端口,后調(diào)用 ChannelFuture#sync() 方法,阻塞等待成功。對于bind操作就是本文要詳細(xì)介紹的"服務(wù)端啟動流程"。

  1. [代碼行47]監(jiān)聽服務(wù)端關(guān)閉,并阻塞等待

先調(diào)用 #closeFuture() 方法,監(jiān)聽服務(wù)器關(guān)閉,后調(diào)用 ChannelFuture#sync() 方法,阻塞等待成功。 注意,此處不是關(guān)閉服務(wù)器,而是channel的監(jiān)聽關(guān)閉。

  1. [代碼行51、52]優(yōu)雅地關(guān)閉兩個EventLoopGroup線程池

finally代碼塊中執(zhí)行說明服務(wù)端將最終關(guān)閉,所以調(diào)用 EventLoopGroup#shutdownGracefully() 方法,分別關(guān)閉兩個EventLoopGroup對象,終止所有線程。

二、服務(wù)啟動過程

在服務(wù)啟動過程的源碼分析之前,這里回顧一下我們在通過JDK NIO編程在服務(wù)端啟動初始的代碼:

 serverSocketChannel = ServerSocketChannel.open();
 serverSocketChannel.configureBlocking(false);
 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
 selector = Selector.open();
 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

這5行代碼標(biāo)示一個最為熟悉的過程:

  • 打開serverSocketChannel

  • 配置非阻塞模式

  • channelsocket綁定監(jiān)聽端口

  • 創(chuàng)建Selector

  • serverSocketChannel注冊到 selector

后面等分析完Netty的啟動過程后,會對這些步驟有一個新的認(rèn)識。在EchoServer示例中,進(jìn)入 #bind(int port) 方法,AbstractBootstrap#bind()其實(shí)有多個方法,方便不同地址參數(shù)的傳遞,實(shí)際調(diào)用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress) 方法,代碼如下:

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
}
  • [代碼行2] :調(diào)用 #initAndRegister() 方法,初始化并注冊一個 Channel 對象。因?yàn)樽允钱惒降倪^程,所以返回一個 ChannelFuture 對象。詳細(xì)解析,見 「initAndRegister()」。

  • [代碼行4-6]]:若發(fā)生異常,直接進(jìn)行返回。

  • [代碼行9-34]:因?yàn)樽允钱惒降倪^程,有可能已完成,有可能未完成。所以實(shí)現(xiàn)代碼分成了【第 10 至 14 行】和【第 15 至 36 行】分別處理已完成和未完成的情況。

    • 核心在[第 11 、29行],調(diào)用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) 方法,綁定 Channel 的端口,并注冊 Channel 到 SelectionKey 中。

    • 如果異步注冊對應(yīng)的 ChanelFuture 未完成,則調(diào)用 ChannelFuture#addListener(ChannelFutureListener) 方法,添加監(jiān)聽器,在注冊完成后,進(jìn)行回調(diào)執(zhí)行 #doBind0(...) 方法的邏輯。

通過doBind方法可以知道服務(wù)端啟動流程大致如下幾個步驟:

Netty服務(wù)端啟動源碼是什么

1. 創(chuàng)建Channel

Netty服務(wù)端啟動源碼是什么

#doBind(final SocketAddress localAddress)進(jìn)入到initAndRegister():

 final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {
         channel = channelFactory.newChannel();
         init(channel);
     } catch (Throwable t) {
         if (channel != null) {
             // channel can be null if newChannel crashed (eg SocketException("too many open files"))
             channel.unsafe().closeForcibly();
             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
         }
         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
     }

     ChannelFuture regFuture = config().group().register(channel);
     if (regFuture.cause() != null) {
         if (channel.isRegistered()) {
             channel.close();
         } else {
             channel.unsafe().closeForcibly();
         }
     }

     return regFuture;
}

[代碼行4]調(diào)用 ChannelFactory#newChannel() 方法,創(chuàng)建Channel對象。 ChannelFactory類繼承如下:

Netty服務(wù)端啟動源碼是什么

可以在ChannelFactory注釋看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.,這里只是包名的調(diào)整,對于繼承結(jié)構(gòu)不變。netty默認(rèn)使用ReflectiveChannelFactory,我們可以看到重載方法:

@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

很明顯,正如其名是通過反射機(jī)制構(gòu)造Channel對象實(shí)例的。constructor是在其構(gòu)造方法初始化的:this.constructor = clazz.getConstructor();這個clazz按理說應(yīng)該是我們要創(chuàng)建的Channel的Class對象。那Class對象是什么呢?我們接著看channelFactory是怎么初始化的。

首先在AbstractBootstrap找到如下代碼:

@Deprecated
public B channelFactory(ChannelFactory channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

調(diào)用這個方法的遞推向上看到:

public B channel(Class channelClass) {
    return channelFactory(new ReflectiveChannelFactory(
        ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

這個方法正是在EchoServerServerBootstrap鏈?zhǔn)皆O(shè)置時調(diào)用.channel(NioServerSocketChannel.class)的方法。我們看到,channelClass就是NioServerSocketChannel.classchannelFactory也是以ReflectiveChannelFactory作為具體實(shí)例,并且將NioServerSocketChannel.class作為構(gòu)造參數(shù)傳遞初始化的,所以這回答了反射機(jī)制構(gòu)造的是io.netty.channel.socket.nio.NioServerSocketChannel對象。

繼續(xù)看NioServerSocketChannel構(gòu)造方法邏輯做了什么事情,看之前先給出NioServerSocketChannel類繼承關(guān)系:

Netty服務(wù)端啟動源碼是什么

NioServerSocketChannelNioSocketChannel分別對應(yīng)服務(wù)端和客戶端,公共父類都是AbstractNioChannelAbstractChannel,下面介紹創(chuàng)建過程可以參照這個Channel類繼承圖。進(jìn)入NioServerSocketChannel構(gòu)造方法:

/**
  * Create a new instance
  */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

點(diǎn)擊newSocket進(jìn)去:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
          *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
          *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
          *
          *  See #2308.
          */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
            "Failed to open a server socket.", e);
    }
}

以上傳進(jìn)來的providerDEFAULT_SELECTOR_PROVIDER即默認(rèn)的java.nio.channels.spi.SelectorProvider,[代碼行9]就是熟悉的jdk nio創(chuàng)建ServerSocketChannel。這樣newSocket(DEFAULT_SELECTOR_PROVIDER)就返回了結(jié)果ServerSocketChannel,回到NioServerSocketChannel()#this()點(diǎn)進(jìn)去:

/**
  * Create a new instance using the given {@link ServerSocketChannel}.
  */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

以上super代表父類AbstractNioMessageChannel構(gòu)造方法,點(diǎn)進(jìn)去看到:

 /**
   * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
   */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

以上super代表父類AbstractNioChannel構(gòu)造方法,點(diǎn)進(jìn)去看到:

 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
     super(parent);
     this.ch = ch;
     this.readInterestOp = readInterestOp;
     try {
         ch.configureBlocking(false);
     } catch (IOException e) {
         try {
             ch.close();
         } catch (IOException e2) {
             if (logger.isWarnEnabled()) {
                 logger.warn("Failed to close a partially initialized socket.", e2);
             }
         }

         throw new ChannelException("Failed to enter non-blocking mode.", e);
     }
}

以上[代碼行3]將ServerSocketChannel保存到了AbstractNioChannel#ch成員變量,在上面提到的NioServerSocketChannel構(gòu)造方法的[代碼行6]javaChannel()拿到的就是ch保存的ServerSocketChannel變量。

以上[代碼行6]就是熟悉的jdk nio編程設(shè)置ServerSocketChannel非阻塞方式。這里還有super父類構(gòu)造方法,點(diǎn)擊進(jìn)去看到:

 protected AbstractChannel(Channel parent) {
     this.parent = parent;
     id = newId();
     unsafe = newUnsafe();
     pipeline = newChannelPipeline();
}

以上構(gòu)造方法中:

  • parent 屬性,代表父 Channel 對象。對于NioServerSocketChannelparentnull。

  • id 屬性,Channel 編號對象。在構(gòu)造方法中,通過調(diào)用 #newId() 方法進(jìn)行創(chuàng)建。(這里不細(xì)展開Problem-1

  • unsafe 屬性,Unsafe 對象。因?yàn)?code>Channel 真正的具體操作,是通過調(diào)用對應(yīng)的 Unsafe 對象實(shí)施。所以需要在構(gòu)造方法中,通過調(diào)用 #newUnsafe() 方法進(jìn)行創(chuàng)建。這里的 Unsafe 并不是我們常說的 jdk自帶的sun.misc.Unsafe ,而是 io.netty.channel.Channel#Unsafe。(這里不細(xì)展開Problem-2

  • pipeline屬性默認(rèn)是DefaultChannelPipeline對象,賦值后在后面為channel綁定端口的時候會用到

通過以上創(chuàng)建channel源碼過程分析,總結(jié)的流程時序圖如下:

Netty服務(wù)端啟動源碼是什么

2. 初始化Channel

Netty服務(wù)端啟動源碼是什么

回到一開始創(chuàng)建ChannelinitAndRegister()入口方法,在創(chuàng)建Channel后緊接著init(channel)進(jìn)入初始化流程,因?yàn)槭欠?wù)端初始化,所以是ServerBootstrap#init(Channel channel),代碼如下:

@Override
void init(Channel channel) throws Exception {
    final Map, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey key = (AttributeKey) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry, Object>[] currentChildOptions;
    final Entry, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelInitializer() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
  • [代碼 3 - 6 行]: options0()方法返回的options保存了用戶在EchoServer中設(shè)置自定義的可選項(xiàng)集合,這樣ServerBootstrap將配置的選項(xiàng)集合,設(shè)置到了 Channel 的可選項(xiàng)集合中。

  • [代碼 8 - 15 行]: attrs0()方法返回的attrs保存了用戶在EchoServer中設(shè)置自定義的屬性集合,這樣ServerBootstrap將配置的屬性集合,設(shè)置到了 Channel 的屬性集合中。

  • [代碼21-28行]:通過局部變量currentChildOptionscurrentChildAttrs保存了用戶自定義的childOptionschildAttrs,用于[代碼43行] ServerBootstrapAcceptor 構(gòu)造方法。

  • [代碼30-47]]:創(chuàng)建ChannelInitializer 對象,添加到 pipeline 中,用于后續(xù)初始化 ChannelHandler pipeline 中,包括用戶在EchoServer配置的LoggingHandler和創(chuàng)建的創(chuàng)建 ServerBootstrapAcceptor 對象。

    • [代碼行34-37]:添加啟動器配置的 LoggingHandlerpipeline 中。

    • [代碼行39-45]:創(chuàng)建 ServerBootstrapAcceptor 對象,添加到 pipeline 中。從名字上就可以看出來,ServerBootstrapAcceptor 也是一個 ChannelHandler 實(shí)現(xiàn)類,專門用于接受客戶端的新連接請求,把新的請求扔給某個事件循環(huán)器,我們先不做過多分析。我們發(fā)現(xiàn)是使用EventLoop.execute 執(zhí)行添加的過程,這是為什么呢?同樣記錄問題(Problem-3)

    • 需要說明的是pipeline 在之前介紹Netty核心組件的時候提到是一個包含ChannelHandlerContext的雙向鏈表,每一個context對于唯一一個ChannelHandler,這里初始化后,ChannelPipeline里就是如下一個結(jié)構(gòu):

      Netty服務(wù)端啟動源碼是什么

3. 注冊Channel

Netty服務(wù)端啟動源碼是什么

初始化Channel一些基本配置和屬性完畢后,回到一開始創(chuàng)建ChannelinitAndRegister()入口方法,在初始化Channel后緊接著[代碼行17] ChannelFuture regFuture = config().group().register(channel);明顯這里是通過EventLoopGroup進(jìn)入注冊流程(EventLoopGroup體系將在后續(xù)文章講解)

EchoServer中啟動器同樣通過ServerBootstrap#group()設(shè)置了NioEventLoopGroup,它繼承自MultithreadEventLoopGroup,所以注冊流程會進(jìn)入MultithreadEventLoopGroup重載的register(Channel channel)方法,代碼如下:

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

這里會調(diào)用 next() 方法選擇出來一個 EventLoop 來注冊 Channel,里面實(shí)際上使用的是一個叫做 EventExecutorChooser 的東西來選擇,它實(shí)際上又有兩種實(shí)現(xiàn)方式 ——PowerOfTwoEventExecutorChooserGenericEventExecutorChooser,本質(zhì)上就是從 EventExecutor 數(shù)組中選擇一個 EventExecutor,我們這里就是 NioEventLoop,那么,它們有什么區(qū)別呢?(Problem-4:在介紹EventLoopGroup體系的后續(xù)文章中將會詳細(xì)講解,這里簡單地提一下,本質(zhì)都是按數(shù)組長度取余數(shù) ,不過,2 的 N 次方的形式更高效。

接著,來到 NioEventLoopregister(channel) 方法,你會不會問找不到該方法?提示NioEventLoop 繼承SingleThreadEventLoop,所以父類方法:

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

可以看到,先創(chuàng)建了一個叫做 ChannelPromise 的東西,它是 ChannelFuture 的子類。[代碼行9]又調(diào)回了 ChannelUnsaferegister () 方法,這里第一個參數(shù)是 this,也就是 NioEventLoop,第二個參數(shù)是剛創(chuàng)建的 ChannelPromise

點(diǎn)擊 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise) 方法進(jìn)去,代碼如下:

 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     if (eventLoop == null) {
         throw new NullPointerException("eventLoop");
     }
     if (isRegistered()) {
         promise.setFailure(new IllegalStateException("registered to an event loop already"));
         return;
     }
     if (!isCompatible(eventLoop)) {
         promise.setFailure(
             new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
         return;
     }

     AbstractChannel.this.eventLoop = eventLoop;

     if (eventLoop.inEventLoop()) {
         register0(promise);
     } else {
         try {
             eventLoop.execute(new Runnable() {
                 @Override
                 public void run() {
                     register0(promise);
                 }
             });
         } catch (Throwable t) {
             logger.warn(
                 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                 AbstractChannel.this, t);
             closeForcibly();
             closeFuture.setClosed();
             safeSetFailure(promise, t);
         }
     }
}

[代碼行15]這行代碼是設(shè)置 ChanneleventLoop 屬性。這行前面的代碼主要是在校驗(yàn)傳入的 eventLoop 參數(shù)非空,校驗(yàn)是否有注冊過以及校驗(yàn) ChanneleventLoop 類型是否匹配。

[代碼18、24]接著,跟蹤到 AbstractUnsafe#register0(ChannelPromise promise) 方法中:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

[代碼行9]進(jìn)入 AbstractNioChannel#doRegister() 方法:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

[代碼行5]關(guān)鍵一行代碼,將 Java 原生NIO Selector與 Java 原生 NIOChannel 對象(ServerSocketChannel) 綁定在一起,并將當(dāng)前 Netty 的Channel通過 attachment 的形式綁定到 SelectionKey 上:

  • 調(diào)用 #unwrappedSelector() 方法,返回 Java 原生 NIO Selector 對象,而且每個NioEventLoopSelector唯一一對應(yīng)。

  • 調(diào)用 SelectableChannel#register(Selector sel, int ops, Object att) 方法,注冊 Java 原生NIOChannel 對象到 NIO Selector對象上。

通過以上注冊channel源碼分析,總結(jié)流程的時序圖如下:

Netty服務(wù)端啟動源碼是什么

4. 綁定端口

Netty服務(wù)端啟動源碼是什么

注冊完Channel最后回到AbstractBootstrap#doBind() 方法,分析 Channel 的端口綁定邏輯。進(jìn)入doBind0代碼如下:

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
  • [代碼行7]:在前面Channel 注冊成功的條件下,調(diào)用 EventLoop執(zhí)行 Channel 的端口綁定邏輯。但是,實(shí)際上當(dāng)前線程已經(jīng)是 EventLoop所在的線程了,為何還要這樣操作呢?答案在【第 5 至 6 行】的英語注釋,這里作為一個問題記著(Problem-5)。

  • [代碼行11]:進(jìn)入AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise),同樣立即異步返回并添加ChannelFutureListener.CLOSE_ON_FAILURE監(jiān)聽事件。

  • [代碼行13]:如果綁定端口之前的操作并沒有成功,自然也就不能進(jìn)行端口綁定操作了,通過promise記錄異常原因。

AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)方法如下:

 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
 }

pipeline是之前創(chuàng)建channel的時候創(chuàng)建的DefaultChannelPipeline,進(jìn)入該方法:

 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
 }

[在分析初始化流程的時候最后畫一個DefaultChannelPipeline內(nèi)部的結(jié)構(gòu),能夠便于分析后面進(jìn)入DefaultChannelPipeline一系列bind方法。]

首先,tail代表TailContext,進(jìn)入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法:

 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     //省略部分代碼
     final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {
         next.invokeBind(localAddress, promise);
     } else {
         safeExecute(executor, new Runnable() {
             @Override
             public void run() {
                 next.invokeBind(localAddress, promise);
             }
         }, promise, null);
     }
     return promise;
}

[代碼行3]:findContextOutbound方法里主要是執(zhí)行ctx = ctx.prev;那么得到的next就是綁定LoggingHandlercontext

[代碼行6]:進(jìn)入invokeBind(localAddress, promise)方法并直接執(zhí)行LoggingHandler#bind(this, localAddress, promise),進(jìn)入后的方法如下:

 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
     if (logger.isEnabled(internalLevel)) {
         logger.log(internalLevel, format(ctx, "BIND", localAddress));
     }
     ctx.bind(localAddress, promise);
 }

設(shè)置了LoggingHandler的日志基本級別為默認(rèn)的INFO后,進(jìn)行綁定操作的信息打印。接著,繼續(xù)循環(huán)到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法執(zhí)行ctx = ctx.prev取出HeadContext進(jìn)入到bind方法:

 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
     unsafe.bind(localAddress, promise);
 }

兜兜轉(zhuǎn)轉(zhuǎn),最終跳出了pipeline輪回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise) 方法,Channel 的端口綁定邏輯。代碼如下:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    //此處有省略...
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    //此處有省略...
}

做實(shí)事方法doBind進(jìn)入后如下:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

到了此處,服務(wù)端的 Java 原生 NIO ServerSocketChannel 終于綁定上了端口。

三、問題歸納

  • Problem-1: 創(chuàng)建Channel流程中AbstractChannel構(gòu)造函數(shù)中為channel分配ID的算法如何實(shí)現(xiàn)?

  • Problem-2: AbstractChannel內(nèi)部類AbstractUnsafe的作用?

  • Problem-3: 初始化channel流程中pipeline 添加ServerBootstrapAcceptor 是通過EventLoop.execute 執(zhí)行添加的過程,這是為什么呢?

  • Problem-4:注冊channel流程中PowerOfTwoEventExecutorChooserGenericEventExecutorChooser的區(qū)別和優(yōu)化原理?

  • Problem-5:綁定端口流程中調(diào)用 EventLoop執(zhí)行 Channel 的端口綁定邏輯。但是,實(shí)際上當(dāng)前線程已經(jīng)是 EventLoop所在的線程了,為何還要這樣操作呢?

到此,關(guān)于“Netty服務(wù)端啟動源碼是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!


文章標(biāo)題:Netty服務(wù)端啟動源碼是什么
文章鏈接:http://weahome.cn/article/gijhgi.html

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部