前言
西峰網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)!從網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項(xiàng)目制作,到程序開發(fā),運(yùn)營維護(hù)。創(chuàng)新互聯(lián)自2013年起到現(xiàn)在10年的時(shí)間,我們擁有了豐富的建站經(jīng)驗(yàn)和運(yùn)維經(jīng)驗(yàn),來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)。
前段時(shí)間接到了一個(gè)支付中轉(zhuǎn)服務(wù)的需求,即支付數(shù)據(jù)通過http接口傳到中轉(zhuǎn)服務(wù)器,中轉(zhuǎn)服務(wù)器將支付數(shù)據(jù)發(fā)送到異構(gòu)后臺(Lua)的指定tcp socket。
一開始評估的時(shí)候感覺蠻簡單的,就是http server和tcp server間的通信,不是一個(gè)Event實(shí)例就能解決的狀態(tài)管理問題嗎?注冊一個(gè)事件A用于消息傳遞,在socket連接時(shí)注冊唯一的ID,然后在http接收到數(shù)據(jù)時(shí),emit事件A;在監(jiān)聽到事件A時(shí),在tcp server中尋找指定ID對應(yīng)的socket處理該數(shù)據(jù)即可。
盡管node.js在高并發(fā)方面有不錯(cuò)的性能,但是單個(gè)tcp server實(shí)例的承載能力有限,為避免服務(wù)器過載,node.js 單進(jìn)程的內(nèi)存有上限(默認(rèn)2G),能容納的長連接客戶端數(shù)不多。但隨著業(yè)務(wù)的擴(kuò)大,我們需要考慮多機(jī)集群部署,客戶端可以連接到任一節(jié)點(diǎn),并發(fā)送消息。如何做到多節(jié)點(diǎn)的同時(shí)推送,我們需要建立一套多節(jié)點(diǎn)之間的消息分發(fā)/訂閱架構(gòu)。常用的第三方消息管理庫有 RabbitMQ和redis等。在這里,我用的是Redis的訂閱發(fā)布服務(wù)。
redis.io有一個(gè)比較成熟的redis消息中轉(zhuǎn)庫socket.io-redis (本地下載)。但我們項(xiàng)目中異構(gòu)后臺用到的并非websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs實(shí)現(xiàn)并不難,就手寫了。
redis在該項(xiàng)目中主要起到一個(gè)消息分發(fā)中心(publish/subscribe)的作用。當(dāng)http請求的支付數(shù)據(jù)發(fā)送過來時(shí),則通過redis的publish功能往所有的channel推送消息,這樣所有訂閱該channel的socket server就能收到回調(diào),然后推送到指定客戶端。在應(yīng)用層看跟Event事件消息的處理差不多。
const redis = require("redis"), redisClient = redis.createClient, REDIS_CFG = { host: '127.0.0.1', port: 6379 }, sub = redisClient(REDIS_CFG), pub = redisClient(REDIS_CFG), PAY_MQ_CHANNEL = 'pay_mq_channel'; // 監(jiān)聽頻道的消息回調(diào) sub.on('message', function(channel, message) { switch (channle){ case PAY_MQ_CHANNEL: console.log('notification received:', message); // 廣播消息到指定socket break; } }); // 訂閱頻道 sub.subscribe(PAY_MQ_CHANNEL); // 當(dāng)接收到支付數(shù)據(jù)時(shí),推送頻道消息 pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});
由于redis的sub/pub的channel訂閱數(shù)有上限,所以建議一類消息使用一個(gè)channel,一個(gè)channel下使用map、set或數(shù)組來存儲訂閱時(shí)的回調(diào)函數(shù),在接收到訂閱消息時(shí)遍歷執(zhí)行回調(diào)函數(shù)。
下面是我封裝好的Redis組件(RedisMQProxy.js):
/* * redis 訂閱/發(fā)布 */ const _ = require('lodash'), redis = require("redis"), REDIS_CFG = { host: '127.0.0.1', port: 6379 }, sub = redisClient(REDIS_CFG), pub = redisClient(REDIS_CFG); let SubListenerFuns = {}; // channel的回調(diào)函數(shù)列表 let RedisMQProxy = { // 訂閱channel on(channel, cb, errorCb, once = false) { sub.subscribe(channel); // 訂閱channel消息 // 將回調(diào)函數(shù)存放數(shù)組中 SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel]; SubListenerFuns[channel].push({ once, cb, errorCb }); }, // 監(jiān)聽一次性的channel回調(diào)函數(shù) once(channel, cb, errorCb) { this.on(channel, cb, errorCb, true); }, // 發(fā)送channel消息 emit(channel, message) { if(!_.isString(message)) { message = JSON.stringify(message); } pub.publish(channel, message); }, // 移除channel上的監(jiān)聽函數(shù) removeListener(channel, func) { let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel]; for(let i = 0, l = channelHandlers.length; i < l; i++) { let handler = channelHandlers[i] || {}; let cb = handler.cb; if(func && func == cb) { channelHandlers.splice(i, 1); return false; } } } }; RedisMQProxy.SubListeners = SubListenerFuns; pub.on('error', onError); sub.on('error', onError); // 監(jiān)聽redis的訂閱消息 sub.on("message", function(channel, message) { // 遍歷執(zhí)行channel的回調(diào)函數(shù) try { message = JSON.parse(message); } catch(e) {} broadcastToChannel(channel, message); }); // 廣播消息到指定頻道 function broadcastToChannel(channel, message, isError) { let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel]; for(let i = 0, l = channelHandlers.length; i < l; i++) { let handler = channelHandlers[i] || {}; let isOnce = handler.once || false; let func = handler.cb; let errorFunc = handler.errorCb; _.isFunction(func) && func(message); isError && _.isFunction(errorFunc) && errorFunc(message); isOnce && channelHandlers.splice(i, 1); // 移除一次性監(jiān)聽的函數(shù) } } function broadcastToAllChannels(message, isError) { for(let channel in SubListenerFuns) { broadcastToChannel(channel, message, isError); } } function onError(err) { err = err || {}; err.msg = err.msg || 'redis sub/pub fail'; // 通知所有channel執(zhí)行錯(cuò)誤回調(diào)函數(shù) broadcastToAllChannels(err, true); } module.exports = RedisMQProxy;
在使用時(shí)就可以比較方便地調(diào)用了:
const RedisMQProxy = require('./RedisMQProxy'), PAY_MQ_CHANNEL = 'pay_mq_channel'; // 訂閱channel RedisMQ.on(PAY_MQ_CHANNEL, function(message) { console.log('notification received:', message); // 廣播消息到指定socket // ... }); // 訂閱一次性的channel RedisMQ.once(PAY_MQ_CHANNEL, function(message) { // ... }); // 當(dāng)接收到支付數(shù)據(jù)時(shí),推送頻道消息 RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});
目前該項(xiàng)目已經(jīng)健康運(yùn)行了一個(gè)多月。由于socket server的多進(jìn)程間消息推送依賴于redis的消息中轉(zhuǎn),而Redis使用的是單進(jìn)程,未能充分利用CPU。當(dāng)業(yè)務(wù)膨脹的時(shí)候,redis就要考慮分布集群了。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對創(chuàng)新互聯(lián)的支持。