創(chuàng)新互聯(lián)建站主要從事成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)寶山,10年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
文章已獲得作者授權(quán)
點(diǎn)擊文末左下角“閱讀原文”即可跳轉(zhuǎn)到原文地址
在TCP連接開始到結(jié)束連接,之間可能會(huì)多次傳輸數(shù)據(jù),也就是服務(wù)器和客戶端之間可能會(huì)在連接過程中互相傳輸多條消息。理想狀況是一方每發(fā)送一條消息,另一方就立即接收到一條,也就是一次write對(duì)應(yīng)一次read。但是,現(xiàn)實(shí)不總是按照劇本來走。
MINA官方文檔節(jié)選:
TCP guarantess delivery of all packets in the correct order. But there is no guarantee that one write operation on the sender-side will result in one read event on the receiving side. One call of IoSession.write(Object message) by the sender can result in multiple messageReceived(IoSession session, Object message) events on the receiver; and multiple calls of IoSession.write(Object message) can lead to a single messageReceived event.
Netty官方文檔節(jié)選:
In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote.
上面兩段話表達(dá)的意思相同:TCP是基于字節(jié)流的協(xié)議,它只能保證一方發(fā)送和另一方接收到的數(shù)據(jù)的字節(jié)順序一致,但是,并不能保證一方每發(fā)送一條消息,另一方就能完整的接收到一條信息。有可能發(fā)送了兩條對(duì)方將其合并成一條,也有可能發(fā)送了一條對(duì)方將其拆分成兩條。所以在上一篇文章(Netty、MINA、Twisted一起學(xué)系列01:實(shí)現(xiàn)簡單的TCP服務(wù)器)中的Demo,可以說是一個(gè)錯(cuò)誤的示范。不過服務(wù)器和客戶端在同一臺(tái)機(jī)器上或者在局域網(wǎng)等網(wǎng)速很好的情況下,這種問題還是很難測(cè)試出來。
舉個(gè)簡單了例子(這個(gè)例子來源于Netty官方文檔):消息發(fā)送方發(fā)送了三個(gè)字符串:
但是接收方收到的可能是這樣的:
那么問題就很嚴(yán)重了,接收方?jīng)]法分開這三條信息了,也就沒法解析了。對(duì)此,MINA的官方文檔提供了以下幾種解決方案:
1、use fixed length messages
使用固定長度的消息。比如每個(gè)長度4字節(jié),那么接收的時(shí)候按每條4字節(jié)拆分就可以了。
2、use a fixed length header that indicates the length of the body
使用固定長度的Header,Header中指定Body的長度(字節(jié)數(shù)),將信息的內(nèi)容放在Body中。例如Header中指定的Body長度是100字節(jié),那么Header之后的100字節(jié)就是Body,也就是信息的內(nèi)容,100字節(jié)的Body后面就是下一條信息的Header了。
3、using a delimiter; for example many text-based protocols append a newline (or CR LF pair) after every message
使用分隔符。例如許多文本內(nèi)容的協(xié)議會(huì)在每條消息后面加上換行符(CR LF,即”\r\n”),也就是一行一條消息。當(dāng)然也可以用其他特殊符號(hào)作為分隔符,例如逗號(hào)、分號(hào)等等。
當(dāng)然除了上面說到的3種方案,還有其他方案。有的協(xié)議也可能會(huì)同時(shí)用到上面多種方案。例如HTTP協(xié)議,Header部分用的是CR LF換行來區(qū)分每一條Header,而Header中用Content-Length來指定Body字節(jié)數(shù)。
下面,分別用MINA、Netty、Twisted自帶的相關(guān)API實(shí)現(xiàn)按換行符CR LF來分割消息。
MINA
MINA可以使用ProtocolCodecFilter來對(duì)發(fā)送和接收的二進(jìn)制數(shù)據(jù)進(jìn)行加工,如何加工取決于ProtocolCodecFactory或ProtocolEncoder、ProtocolDecoder,加工后在IoHandler中messageReceived事件函數(shù)獲取的message就不再是IoBuffer了,而是你想要的其他類型,可以是字符串,Java對(duì)象。這里可以使用TextLineCodecFactory(ProtocolCodecFactory的一個(gè)實(shí)現(xiàn)類)實(shí)現(xiàn)CR LF分割消息。
public class TcpServer{
public static void main(String[] args) throws IOException{
IoAcceptor acceptor = new NioSocketAcceptor();
// 添加一個(gè)Filter,用于接收、發(fā)送的內(nèi)容按照"\r\n"分割
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));
acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}
}
class TcpServerHandle extends IoHandlerAdapter{
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception{
cause.printStackTrace();
}
// 接收到新的數(shù)據(jù)
@Override
public void messageReceived(IoSession session, Object message)
throws Exception{
// 接收客戶端的數(shù)據(jù),這里接收到的不再是IoBuffer類型,而是字符串
String line = (String) message;
System.out.println("messageReceived:" + line);
}
@Override
public void sessionCreated(IoSession session) throws Exception{
System.out.println("sessionCreated");
}
@Override
public void sessionClosed(IoSession session) throws Exception{
System.out.println("sessionClosed");
}
}
Netty
Netty設(shè)計(jì)上和MINA類似,需要在ChannelPipeline加上一些ChannelHandler用來對(duì)原始數(shù)據(jù)進(jìn)行處理。這里用LineBasedFrameDecoder將接收到的數(shù)據(jù)按行分割,StringDecoder再將數(shù)據(jù)由字節(jié)碼轉(zhuǎn)成字符串。同樣,接收到的數(shù)據(jù)進(jìn)過加工后,在channelRead事件函數(shù)中,msg參數(shù)不再是ByteBuf而是String。
public class TcpServer{
public static void main(String[] args) throws InterruptedException{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch)
throws Exception{
ChannelPipeline pipeline = ch.pipeline();
// LineBasedFrameDecoder按行分割消息
pipeline.addLast(new LineBasedFrameDecoder(80));
// 再按UTF-8編碼轉(zhuǎn)成字符串
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class TcpServerHandler extends ChannelInboundHandlerAdapter{
// 接收到新的數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
// msg經(jīng)過StringDecoder后類型不再是ByteBuf而是String
String line = (String) msg;
System.out.println("channelRead:" + line);
}
@Override
public void channelActive(ChannelHandlerContext ctx){
System.out.println("channelActive");
}
@Override
public void channelInactive(ChannelHandlerContext ctx){
System.out.println("channelInactive");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
Twisted
Twisted的設(shè)計(jì)和上面兩者的設(shè)計(jì)不太一樣,所以實(shí)現(xiàn)消息分割也不太一樣。處理事件的類TcpServerHandle不再繼承Protocol,而是繼承Protocol的子類LineOnlyReceiver。接收到新數(shù)據(jù)的事件方法也不再是dataReceived,而是LineOnlyReceiver提供的lineReceived。看Twisted源碼的話可以發(fā)現(xiàn)LineOnlyReceiver的內(nèi)部實(shí)際上自己實(shí)現(xiàn)了dataReceived,然后將其按行分割,有新的一行數(shù)據(jù)就調(diào)用lineReceived。
# -*- coding:utf-8 –*-
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor
class TcpServerHandle(LineOnlyReceiver):
# 新的連接建立
def connectionMade(self):
print 'connectionMade'
# 連接斷開
def connectionLost(self, reason):
print 'connectionLost'
# 接收到新的一行數(shù)據(jù)
def lineReceived(self, data):
print 'lineReceived:' + data
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
下面用一個(gè)Java客戶端對(duì)三個(gè)服務(wù)器進(jìn)行測(cè)試。
public class TcpClient{
public static void main(String[] args) throws IOException{
Socket socket = null;
OutputStream out = null;
try {
socket = new Socket("localhost", 8080);
out = socket.getOutputStream();
// 請(qǐng)求服務(wù)器
String lines = "床前明月光\r\n疑是地上霜\r\n舉頭望明月\r\n低頭思故鄉(xiāng)\r\n";
byte[] outputBytes = lines.getBytes("UTF-8");
out.write(outputBytes);
out.flush();
} finally {
// 關(guān)閉連接
out.close();
socket.close();
}
}
}
MINA服務(wù)器輸出結(jié)果:
sessionCreated
messageReceived:床前明月光
messageReceived:疑是地上霜
messageReceived:舉頭望明月
messageReceived:低頭思故鄉(xiāng)
sessionClosed
Netty服務(wù)器輸出結(jié)果:
channelActive
channelRead:床前明月光
channelRead:疑是地上霜
channelRead:舉頭望明月
channelRead:低頭思故鄉(xiāng)
channelInactive
Twisted服務(wù)器輸出結(jié)果:
connectionMade
lineReceived:床前明月光
lineReceived:疑是地上霜
lineReceived:舉頭望明月
lineReceived:低頭思故鄉(xiāng)
connectionLost
當(dāng)然,測(cè)試的時(shí)候也可以將發(fā)送的數(shù)據(jù)模擬成不按規(guī)則分割的情況,下面用一個(gè)更變態(tài)的客戶端來測(cè)試。
public class TcpClient{
public static void main(String[] args) throws IOException, InterruptedException{
Socket socket = null;
OutputStream out = null;
try{
socket = new Socket("localhost", 8080);
out = socket.getOutputStream();
String lines = "床前";
byte[] outputBytes = lines.getBytes("UTF-8");
out.write(outputBytes);
out.flush();
Thread.sleep(1000);
lines = "明月";
outputBytes = lines.getBytes("UTF-8");
out.write(outputBytes);
out.flush();
Thread.sleep(1000);
lines = "光\r\n疑是地上霜\r\n舉頭";
outputBytes = lines.getBytes("UTF-8");
out.write(outputBytes);
out.flush();
Thread.sleep(1000);
lines = "望明月\r\n低頭思故鄉(xiāng)\r\n";
outputBytes = lines.getBytes("UTF-8");
out.write(outputBytes);
out.flush();
} finally {
// 關(guān)閉連接
out.close();
socket.close();
}
}
}
再次分別測(cè)試上面三個(gè)服務(wù)器,結(jié)果和上面的輸出結(jié)果一樣,沒有任何問題。