可根據(jù)需要配置線程模型:單線程Reactor、多線程Reactor、多層線程Reactor
創(chuàng)新互聯(lián)建站專業(yè)為企業(yè)提供扎魯特旗網(wǎng)站建設(shè)、扎魯特旗做網(wǎng)站、扎魯特旗網(wǎng)站設(shè)計、扎魯特旗網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計與制作、扎魯特旗企業(yè)網(wǎng)站模板建站服務(wù),十余年扎魯特旗做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡(luò)服務(wù)。
無論幾個線程,都通過單一的Acceptor接收客戶端請求,可以創(chuàng)建更多的NioEventLoop來處理IO操作。
EventLoop和EventLoopGroup實際繼承了Java的ScheduledExecutorService,使其具備了線程池的特性,其線程數(shù)量可動態(tài)配置。例如配置單線程模型,設(shè)置線程數(shù)量為1即可。
Future即異步操作
future操作可以被close,但結(jié)果是未知的;調(diào)用get可以獲取操作結(jié)果,但是會被阻塞;isDone可判斷是否完成操作。
ChannelFuture是為了獲取異步返回結(jié)果而設(shè)計
可以通過ChannelFutureListener接口獲得回調(diào),無需等待get方法返回。
public interface ChannelFutureListener extends GenericFutureListener {
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().close();
}
}
};
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
}
連接超時和channel超時配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);
注意:
1、謹(jǐn)慎調(diào)用await,可能導(dǎo)致死鎖。
2、ChannelFuture超時后如果調(diào)用了業(yè)務(wù)代碼重連,而此時IO未超時,將可能導(dǎo)致多條連接并存,設(shè)置IO超時時間建議小于業(yè)務(wù)代碼超時時間。
升級版的future,可寫可操作(對回調(diào)過程)。future好比古代飛鴿傳書,只能等鴿子回來或者不回來,不可控;promise就像現(xiàn)代快遞員,送快遞送一半可以打電話給他叫他不要送了或者中途請他幫忙買個餅。
例如:
DefaultPromise類
awaitUninterruptibly()可手動打斷回調(diào),使進(jìn)程等待。
public Promise awaitUninterruptibly() {
if (this.isDone()) {
return this;
} else {
boolean interrupted = false;
synchronized(this) {
while(!this.isDone()) {
this.checkDeadLock();
this.incWaiters();
try {
this.wait();
} catch (InterruptedException var9) {
interrupted = true;
} finally {
this.decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
}
進(jìn)行了死鎖判斷,避免已存在相同任務(wù);并限制了最大等待數(shù)量32767
protected void checkDeadLock() {
EventExecutor e = this.executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(this.toString());
}
}
private void incWaiters() {
if (this.waiters == 32767) {
throw new IllegalStateException("too many waiters: " + this);
} else {
++this.waiters;
}
}
Channel負(fù)責(zé)對外提供操作IO的接口,而UnSafe是Channel的內(nèi)部接口類,如其名一樣是不安全的操作,所以封裝在接口內(nèi)部不讓外部調(diào)用,而實際的操作IO最終都是在Unsafe中執(zhí)行。
//Channel調(diào)用連接為例,跟蹤實現(xiàn)連接請求的過程
ChannelFuture connect(SocketAddress var1);
//DefaultChannelPipeline中執(zhí)行,實際是調(diào)用尾部的pipeline
public ChannelFuture connect(SocketAddress remoteAddress) {
return this.tail.connect(remoteAddress);
}
//AbstractChannelHandlerContext是Pipeline容器中的對象,
//持續(xù)尋找所有handler執(zhí)行對象,直到全部被調(diào)用
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
AbstractChannelHandlerContext next = this.findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while(!ctx.outbound);
return ctx;
}
//而真實的執(zhí)行是尋找到UnSafe的Invoker
public ChannelHandlerInvoker invoker() {
return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
}
public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
} else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
if (this.executor.inEventLoop()) {
ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
} else {
this.safeExecuteOutbound(new OneTimeTask() {
public void run() {
ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
}, promise);
}
}
}