bitchat 是一個(gè)基于 Netty 的 IM 即時(shí)通訊框架
創(chuàng)新互聯(lián)建站主要從事網(wǎng)站建設(shè)、成都網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)山海關(guān),十余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專(zhuān)業(yè),歡迎來(lái)電咨詢(xún)建站服務(wù):13518219792
項(xiàng)目地址:https://github.com/all4you/bitchat
bitchat-example模塊提供了一個(gè)服務(wù)端與客戶(hù)端的實(shí)現(xiàn)示例,可以參照該示例進(jìn)行自己的業(yè)務(wù)實(shí)現(xiàn)。
要啟動(dòng)服務(wù)端,需要獲取一個(gè) Server 的實(shí)例,可以通過(guò) ServerFactory 來(lái)獲取。
目前只實(shí)現(xiàn)了單機(jī)模式下的 Server ,通過(guò) SimpleServerFactory 只需要定義一個(gè)端口即可獲取一個(gè)單機(jī)的 Server 實(shí)例,如下所示:
public class StandaloneServerApplication {
public static void main(String[] args) {
Server server = SimpleServerFactory.getInstance()
.newServer(8864);
server.start();
}
}
服務(wù)端啟動(dòng)成功后,將顯示如下信息:
目前只實(shí)現(xiàn)了直連服務(wù)器的客戶(hù)端,通過(guò) SimpleClientFactory 只需要指定一個(gè) ServerAttr 即可獲取一個(gè)客戶(hù)端,然后進(jìn)行客戶(hù)端與服務(wù)端的連接,如下所示:
public class DirectConnectServerClientApplication {
public static void main(String[] args) {
Client client = SimpleClientFactory.getInstance()
.newClient(ServerAttr.getLocalServer(8864));
client.connect();
doClientBiz(client);
}
}
客戶(hù)端連接上服務(wù)端后,將顯示如下信息:
目前客戶(hù)端提供了三種 Func,分別是:登錄,查看在線(xiàn)用戶(hù)列表,發(fā)送單聊消息,每種 Func 有不同的命令格式。
通過(guò)在客戶(hù)端中執(zhí)行以下命令?-lo houyi 123456
?即可實(shí)現(xiàn)登錄,目前用戶(hù)中心還未實(shí)現(xiàn),通過(guò) Mock 的方式實(shí)現(xiàn)一個(gè)假的用戶(hù)服務(wù),所以輸入任何的用戶(hù)名密碼都會(huì)登錄成功,并且會(huì)為用戶(hù)創(chuàng)建一個(gè)用戶(hù)id。
登錄成功后,顯示如下:
再啟動(dòng)一個(gè)客戶(hù)端,并且也執(zhí)行登錄,登錄成功后,可以執(zhí)行?-lu
?命令,獲取在線(xiàn)用戶(hù)列表,目前用戶(hù)是保存在內(nèi)存中,獲取的結(jié)果如下所示:
用 gris 這個(gè)用戶(hù)向 houyi 這個(gè)用戶(hù)發(fā)送單聊信息,只要執(zhí)行?-pc 1 hello,houyi
?命令即可
其中第二個(gè)參數(shù)數(shù)要發(fā)送消息給那個(gè)用戶(hù)的用戶(hù)id,第三個(gè)參數(shù)是消息內(nèi)容
消息發(fā)送方,發(fā)送完消息:
消息接收方,接收到消息:
客戶(hù)端和服務(wù)端之間維持著心跳,雙方都會(huì)檢查連接是否可用,客戶(hù)端每隔5s會(huì)向服務(wù)端發(fā)送一個(gè) PingPacket,而服務(wù)端接收到這個(gè) PingPacket 之后,會(huì)回復(fù)一個(gè) PongPacket,這樣表示雙方都是健康的。
當(dāng)因?yàn)槟撤N原因,服務(wù)端沒(méi)有收到客戶(hù)端發(fā)送的消息,服務(wù)端將會(huì)把該客戶(hù)端的連接斷開(kāi),同樣的客戶(hù)端也會(huì)做這樣的檢查。
當(dāng)客戶(hù)端與服務(wù)端之間的連接斷開(kāi)之后,將會(huì)觸發(fā)客戶(hù)端 HealthyChecker 的 channelInactive 方法,從而進(jìn)行客戶(hù)端的斷線(xiàn)重連。
單機(jī)版的架構(gòu)只涉及到服務(wù)端、客戶(hù)端,另外有兩者之間的協(xié)議層,如下圖所示:
除了服務(wù)端和客戶(hù)端之外,還有三大中心:消息中心,用戶(hù)中心,鏈接中心。
消息中心:主要負(fù)責(zé)消息的存儲(chǔ)與歷史、離線(xiàn)消息的查詢(xún)
用戶(hù)中心:主要負(fù)責(zé)用戶(hù)和群組相關(guān)的服務(wù)
單機(jī)版無(wú)法做到高可用,性能與可服務(wù)的用戶(hù)數(shù)也有一定的限制,所以需要有可擴(kuò)展的集群版,集群版在單機(jī)版的基礎(chǔ)上增加了一個(gè)路由層,客戶(hù)端通過(guò)路由層來(lái)獲得可用的服務(wù)端地址,然后與服務(wù)端進(jìn)行通訊,如下圖所示:
客戶(hù)端發(fā)送消息給另一個(gè)用戶(hù),服務(wù)端接收到這個(gè)請(qǐng)求后,從 Connection中心中獲取目標(biāo)用戶(hù)“掛”在哪個(gè)服務(wù)端下,如果在自己名下,那最簡(jiǎn)單直接將消息推送給目標(biāo)用戶(hù)即可,如果在其他服務(wù)端,則需要將該請(qǐng)求轉(zhuǎn)交給目標(biāo)服務(wù)端,讓目標(biāo)服務(wù)端將消息推送給目標(biāo)用戶(hù)。
通過(guò)一個(gè)自定義協(xié)議來(lái)實(shí)現(xiàn)服務(wù)端與客戶(hù)端之間的通訊,協(xié)議中有如下幾個(gè)字段:
*
*
* The structure of a Packet is like blow:
* +----------+----------+----------------------------+
* | size | value | intro |
* +----------+----------+----------------------------+
* | 1 bytes | 0xBC | magic number |
* | 1 bytes | | serialize algorithm |
* | 4 bytes | | packet symbol |
* | 4 bytes | | content length |
* | ? bytes | | the content |
* +----------+----------+----------------------------+
*
*
每個(gè)字段的含義
所占字節(jié) | 用途 |
---|---|
1 | 魔數(shù),默認(rèn)為 0xBC |
1 | 序列化的算法 |
4 | Packet 的類(lèi)型 |
4 | Packet 的內(nèi)容長(zhǎng)度 |
? | Packet 的內(nèi)容 |
序列化算法將會(huì)決定該 Packet 在編解碼時(shí),使用何種序列化方式。
Packet 的類(lèi)型將會(huì)決定到達(dá)服務(wù)端的字節(jié)流將被反序列化為何種 Packet,也決定了該 Packet 將會(huì)被哪個(gè) PacketHandler 進(jìn)行處理。
內(nèi)容長(zhǎng)度將會(huì)解決 Packet 的拆包與粘包問(wèn)題,服務(wù)端在解析字節(jié)流時(shí),將會(huì)等到字節(jié)的長(zhǎng)度達(dá)到內(nèi)容的長(zhǎng)度時(shí),才進(jìn)行字節(jié)的讀取。
除此之外,Packet 中還會(huì)存儲(chǔ)一個(gè) sync 字段,該字段將指定服務(wù)端在處理該 Packet 的數(shù)據(jù)時(shí)是否需要使用異步的業(yè)務(wù)線(xiàn)程池來(lái)處理。
服務(wù)端與客戶(hù)端各自維護(hù)了一個(gè)健康檢查的服務(wù),即 Netty 為我們提供的 IdleStateHandler,通過(guò)繼承該類(lèi),并且實(shí)現(xiàn) channelIdle 方法即可實(shí)現(xiàn)連接 “空閑” 時(shí)的邏輯處理,當(dāng)出現(xiàn)空閑時(shí),目前我們只關(guān)心讀空閑,我們既可以認(rèn)為這條鏈接出現(xiàn)問(wèn)題了。
那么只需要在鏈接出現(xiàn)問(wèn)題時(shí),將這條鏈接關(guān)閉即可,如下所示:
public class IdleStateChecker extends IdleStateHandler {
private static final int DEFAULT_READER_IDLE_TIME = 15;
private int readerTime;
public IdleStateChecker(int readerIdleTime) {
super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS);
readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime;
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}",
IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel());
ctx.channel().close();
}
}
另外,客戶(hù)端需要額外再維護(hù)一個(gè)健康檢查器,正常情況下他負(fù)責(zé)定時(shí)向服務(wù)端發(fā)送心跳,當(dāng)鏈接的狀態(tài)變成 inActive 時(shí),該檢查器將負(fù)責(zé)進(jìn)行重連,如下所示:
public class HealthyChecker extends ChannelInboundHandlerAdapter {
private static final int DEFAULT_PING_INTERVAL = 5;
private Client client;
private int pingInterval;
public HealthyChecker(Client client, int pingInterval) {
Assert.notNull(client, "client can not be null");
this.client = client;
this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
schedulePing(ctx);
}
private void schedulePing(ChannelHandlerContext ctx) {
ctx.executor().schedule(() -> {
Channel channel = ctx.channel();
if (channel.isActive()) {
log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName());
channel.writeAndFlush(new PingPacket());
schedulePing(ctx);
}
}, pingInterval, TimeUnit.SECONDS);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> {
log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName());
client.connect();
}, 5, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
}
我們知道,Netty 中維護(hù)著兩個(gè) IO 線(xiàn)程池,一個(gè) boss 主要負(fù)責(zé)鏈接的建立,另外一個(gè) worker 主要負(fù)責(zé)鏈接上的數(shù)據(jù)讀寫(xiě),我們不應(yīng)該使用 IO 線(xiàn)程來(lái)處理我們的業(yè)務(wù),因?yàn)檫@樣很可能會(huì)對(duì) IO 線(xiàn)程造成阻塞,導(dǎo)致新鏈接無(wú)法及時(shí)建立或者數(shù)據(jù)無(wú)法及時(shí)讀寫(xiě)。
為了解決這個(gè)問(wèn)題,我們需要在業(yè)務(wù)線(xiàn)程池中來(lái)處理我們的業(yè)務(wù)邏輯,但是這并不是絕對(duì)的,如果我們要執(zhí)行的邏輯很簡(jiǎn)單,不會(huì)造成太大的阻塞,則可以直接在 IO 線(xiàn)程中處理,比如客戶(hù)端發(fā)送一個(gè) Ping 服務(wù)端回復(fù)一個(gè) Pong,這種情況是沒(méi)有必要在業(yè)務(wù)線(xiàn)程池中進(jìn)行處理的,因?yàn)樘幚硗炅俗罱K還是要交給 IO 線(xiàn)程去寫(xiě)數(shù)據(jù)。但是如果一個(gè)業(yè)務(wù)邏輯需要查詢(xún)數(shù)據(jù)庫(kù)或者讀取文件,這種操作往往比較耗時(shí)間,所以就需要將這些操作封裝起來(lái)交給業(yè)務(wù)線(xiàn)程池去處理。
服務(wù)端允許客戶(hù)端在傳輸?shù)?Packet 中指定采用何種方式進(jìn)行業(yè)務(wù)的處理,服務(wù)端在將字節(jié)流解碼成 Packet 之后,會(huì)根據(jù) Packet 中的 sync 字段的值,確定怎樣對(duì)該 Packet 進(jìn)行處理,如下所示:
public class ServerPacketDispatcher extends
SimpleChannelInboundHandler {
@Override
public void channelRead0(ChannelHandlerContext ctx, Packet request) {
// if the packet should be handled async
if (request.getAsync() == AsyncHandle.ASYNC) {
EventExecutor channelExecutor = ctx.executor();
// create a promise
Promise promise = new DefaultPromise<>(channelExecutor);
// async execute and get a future
Future future = executor.asyncExecute(promise, ctx, request);
future.addListener(new GenericFutureListener>() {
@Override
public void operationComplete(Future f) throws Exception {
if (f.isSuccess()) {
Packet response = f.get();
writeResponse(ctx, response);
}
}
});
} else {
// sync execute and get the response packet
Packet response = executor.execute(ctx, request);
writeResponse(ctx, response);
}
}
}
bitchat?除了可以作為 IM 框架之外,還可以作為一個(gè)通用的通訊框架。
Packet 作為通訊的載體,通過(guò)繼承 AbstractPacket 即可快速實(shí)現(xiàn)自己的業(yè)務(wù),搭配 PacketHandler 作為數(shù)據(jù)處理器即可實(shí)現(xiàn)客戶(hù)端與服務(wù)端的通訊。