這篇文章將為大家詳細(xì)講解有關(guān)netty中pipeline如何添加handler,小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,希望大家閱讀完這篇文章后可以有所收獲。
創(chuàng)新互聯(lián)公司是專業(yè)的霍爾果斯網(wǎng)站建設(shè)公司,霍爾果斯接單;提供做網(wǎng)站、成都網(wǎng)站設(shè)計(jì),網(wǎng)頁(yè)設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行霍爾果斯網(wǎng)站開發(fā)網(wǎng)頁(yè)制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛(ài)的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來(lái)合作!
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 綁定線程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 綁定監(jiān)聽端口 .localAddress(this.port) .handler(new LoggingHandler(LogLevel.INFO)) // 綁定客戶端連接時(shí)候觸發(fā)操作 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //字符串解碼 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); //字符串編碼 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); } }); // 服務(wù)器異步創(chuàng)建綁定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 啟動(dòng)正在監(jiān)聽: " + cf.channel().localAddress()); // 關(guān)閉服務(wù)器通道 cf.channel().closeFuture().sync(); } finally { // 釋放線程池資源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } }
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
void init(Channel channel) throws Exception { //省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry , Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看這里 添加了一個(gè)ChannelInitializer實(shí)例 實(shí)例中實(shí)現(xiàn)了initChannel 獲取配置的handler添加到pipeline p.addLast(new ChannelInitializer () { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //這方法很簡(jiǎn)單 添加到鏈表中 但是此時(shí)添加的是上文的ChannelInitializer 而不是我們定義的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //這個(gè)方法就是關(guān)鍵 將ChannelInitializer中的handler添加到pipeline中 以及刪除ChannelInitializer //但請(qǐng)注意 首次注冊(cè)不會(huì)走到這里 在前面的判斷就會(huì)返回 會(huì)在注冊(cè)的邏輯中觸發(fā) 最終調(diào)用的一致 callHandlerAdded0(newCtx); return this; }
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看這里刪除pipeline中的 ChannelInitializer ChannelInitializer 其實(shí)就是一個(gè)handler 只是它重寫了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
關(guān)于“netty中pipeline如何添加handler”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,使各位可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),請(qǐng)把它分享出去讓更多的人看到。