本篇內(nèi)容介紹了“Websocket庫Ws的原理是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
公司主營業(yè)務(wù):成都做網(wǎng)站、成都網(wǎng)站制作、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)建站是一支青春激揚、勤奮敬業(yè)、活力青春激揚、勤奮敬業(yè)、活力澎湃、和諧高效的團隊。公司秉承以“開放、自由、嚴謹、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)建站推出滁州免費做網(wǎng)站回饋大家。
ws服務(wù)器邏輯由websocket-server.js的WebSocketServer類實現(xiàn)。該類初始化了一些參數(shù)后就執(zhí)行以下代碼
if (this._server) { // 給server注冊下面事件,返回一個注銷函數(shù)(用于注銷下面注冊的事件) this._removeListeners = addListeners(this._server, { // listen成功的回調(diào) listening: this.emit.bind(this, 'listening'), error: this.emit.bind(this, 'error'), // 收到協(xié)議升級請求的回調(diào) upgrade: (req, socket, head) => { this.handleUpgrade(req, socket, head, (ws) => { // 處理成功,觸發(fā)鏈接成功事件 this.emit('connection', ws, req); }); } });
我們看到ws監(jiān)聽了upgrade事件,當有websocket請求到來時就會執(zhí)行handleUpgrade處理升級請求,升級成功后觸發(fā)connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗升級請求的一些http頭。ws提供了一個校驗的鉤子。處理完http頭后,會調(diào)verifyClient校驗是否允許升級請求。如果成功則執(zhí)行completeUpgrade。顧名思義,completeUpgrade是完成升級請求的函數(shù),該函數(shù)返回同意協(xié)議升級并且設(shè)置一些http響應(yīng)頭。另外還有一些重要的邏輯處理。
const ws = new WebSocket(null); // 設(shè)置管理socket的數(shù)據(jù) ws.setSocket(socket, head, this.options.maxPayload); // cb就是this.emit('connection', ws, req); cb(ws);
我們看到這里新建了一個WebSocket對象并且調(diào)用了他的setSocket函數(shù)。我們來看看他做了什么。setSocket的邏輯非常多,我們慢慢分析。
class Receiver extends Writable {}
我們看到數(shù)據(jù)接收者是一個可寫流。這就意味著我們可以往里面寫數(shù)據(jù)。
const receiver = new Receiver(); receiver.write('hello');
我們看一下這時候Receiver的邏輯。
_write(chunk, encoding, cb) { if (this._opcode === 0x08 && this._state == GET_INFO) return cb(); this._bufferedBytes += chunk.length; this._buffers.push(chunk); this.startLoop(cb); }
首先記錄當前數(shù)據(jù)的大小,然后把數(shù)據(jù)存起來,最后執(zhí)行startLoop。
startLoop(cb) { let err; this._loop = true; do { switch (this._state) { // 忽略其他case case GET_DATA: err = this.getData(cb); break; default: // `INFLATING` this._loop = false; return; } } while (this._loop); cb(err); }
我們知道websocket是基于tcp上層的應(yīng)用層協(xié)議,所以我們收到數(shù)據(jù)時,需要解析出一個個數(shù)據(jù)包(粘包問題),所以Receiver其實就是一個狀態(tài)機,每次收到數(shù)據(jù)的時候,都會根據(jù)當前的狀態(tài)進行狀態(tài)流轉(zhuǎn)。比如當前處于GET_DATA狀態(tài),那么就會進行數(shù)據(jù)的處理。我們接著看一下數(shù)據(jù)處理的邏輯。
getData(cb) { let data = EMPTY_BUFFER; // 提取數(shù)據(jù)部分 if (this._payloadLength) { data = this.consume(this._payloadLength); if (this._masked) unmask(data, this._mask); } // 是控制報文則執(zhí)行controlMessage if (this._opcode > 0x07) return this.controlMessage(data); // 做了壓縮,則先解壓 if (this._compressed) { this._state = INFLATING; this.decompress(data, cb); return; } // 沒有壓縮則直接處理(先存到_fragments,然后執(zhí)行dataMessage) if (data.length) { this._messageLength = this._totalPayloadLength; this._fragments.push(data); } return this.dataMessage(); }
我們執(zhí)行websocket協(xié)議定義了報文的類型,比如控制報文,數(shù)據(jù)報文。我們分別看一下這兩個的邏輯。
controlMessage(data) { // 連接關(guān)閉 if (this._opcode === 0x08) { this._loop = false; if (data.length === 0) { this.emit('conclude', 1005, ''); this.end(); } } else if (this._opcode === 0x09) { this.emit('ping', data); } else { this.emit('pong', data); } this._state = GET_INFO; }
我們看到控制報文包括三種(conclude、ping、pong)。而數(shù)據(jù)報文只有this.emit('message', data);一種。這個就是接收者的整體邏輯。
數(shù)據(jù)發(fā)送者是對websocket協(xié)議的封裝,當用戶調(diào)研數(shù)據(jù)發(fā)送者的send接口發(fā)送數(shù)據(jù)時,數(shù)據(jù)發(fā)送者會組裝成一個websocket協(xié)議的包再發(fā)送出去。
send(data, options, cb) { const buf = toBuffer(data); const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; let opcode = options.binary ? 2 : 1; let rsv1 = options.compress; if (this._firstFragment) { this._firstFragment = false; if (rsv1 && perMessageDeflate) { rsv1 = buf.length >= perMessageDeflate._threshold; } this._compress = rsv1; } else { rsv1 = false; opcode = 0; } if (options.fin) this._firstFragment = true; // 需要壓縮 if (perMessageDeflate) { const opts = { fin: options.fin, rsv1, opcode, mask: options.mask, readOnly: toBuffer.readOnly }; // 正在壓縮,則排隊等待,否則執(zhí)行壓縮 if (this._deflating) { this.enqueue([this.dispatch, buf, this._compress, opts, cb]); } else { this.dispatch(buf, this._compress, opts, cb); } } else { // 不需要壓縮,直接發(fā)送 this.sendFrame( Sender.frame(buf, { fin: options.fin, rsv1: false, opcode, mask: options.mask, readOnly: toBuffer.readOnly }), cb ); } }
send函數(shù)做了一些參數(shù)的處理后發(fā)送數(shù)據(jù),但是如果需要壓縮的話,要壓縮后才能發(fā)送。數(shù)據(jù)處理完成后調(diào)用真正的發(fā)送函數(shù)
sendFrame(list, cb) { if (list.length === 2) { this._socket.cork(); this._socket.write(list[0]); this._socket.write(list[1], cb); this._socket.uncork(); } else { this._socket.write(list[0], cb); } }
了解了數(shù)據(jù)接收者和發(fā)送者的邏輯后,我們看一下websocket對象和setSocket函數(shù)做了什么事情,websocket對象本質(zhì)是對TCP socket的封裝。它接收來自底層的數(shù)據(jù),然后透傳給數(shù)據(jù)接收者,數(shù)據(jù)接收者處理完后,觸發(fā)websocket對應(yīng)的對應(yīng)的事件,比如message事件。發(fā)送數(shù)據(jù)的時候,websocket會調(diào)用數(shù)據(jù)發(fā)送者的接口,數(shù)據(jù)發(fā)送者組裝成websocket協(xié)議的數(shù)據(jù)包后再發(fā)送出去,架構(gòu)如下圖所示。
接下來我們看看setSocket的邏輯
setSocket(socket, head, maxPayload) { // 數(shù)據(jù)接收者,負責處理tcp上收到的數(shù)據(jù)(socket是tcp層的socket) const receiver = new Receiver(...); // 數(shù)據(jù)發(fā)送者,負責發(fā)送數(shù)據(jù)給對端 this._sender = new Sender(socket, this._extensions); // 數(shù)據(jù)接收者,負責解析數(shù)據(jù) this._receiver = receiver; // net模塊的tcp socket this._socket = socket; // 關(guān)聯(lián)起來 receiver[kWebSocket] = this; socket[kWebSocket] = this; // 監(jiān)聽接收者的事件,解析數(shù)據(jù)的時候會回調(diào) receiver.on('conclude', receiverOnConclude); // 下面兩個事件由Writable觸發(fā) receiver.on('drain', receiverOnDrain); receiver.on('error', receiverOnError); receiver.on('message', receiverOnMessage); receiver.on('ping', receiverOnPing); receiver.on('pong', receiverOnPong); // 清除定時器 socket.setTimeout(0); // 關(guān)閉nagle算法 socket.setNoDelay(); // 升級請求中,攜帶的http body,通常是空 if (head.length > 0) socket.unshift(head); // 監(jiān)聽tcp底層的事件 socket.on('close', socketOnClose); socket.on('data', socketOnData); socket.on('end', socketOnEnd); socket.on('error', socketOnError); this.readyState = WebSocket.OPEN; this.emit('open'); }
我們看到里面監(jiān)聽了各種事件,下面以data事件為例,看一下處理過程。當tcp socket收到數(shù)據(jù)的時候會執(zhí)行socketOnData函數(shù)。
function socketOnData(chunk) { // 會調(diào)用receiver里的_write函數(shù),其實就是換成到receiver對象上,如果數(shù)據(jù)解析出錯,會觸發(fā)socket error事件 if (!this[kWebSocket]._receiver.write(chunk)) { this.pause(); } }
socketOnData通過接收者的接口把數(shù)據(jù)傳給接收者,接收者會解析數(shù)據(jù),然后觸發(fā)對應(yīng)的事件,比如message。
receiver.on('message', receiverOnMessage); function receiverOnMessage(data) { this[kWebSocket].emit('message', data); }
然后ws的socket對象繼續(xù)往上層觸發(fā)message事件。this[kWebSocket]的值是ws提供的socket對象本身。架構(gòu)圖如下。
這就是ws實現(xiàn)websocket協(xié)議的基本原理,具體細節(jié)可以參考源碼。
“Websocket庫Ws的原理是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!