真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

node中如何實(shí)現(xiàn)RPC通信

本篇內(nèi)容主要講解“node中如何實(shí)現(xiàn)RPC通信”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“node中如何實(shí)現(xiàn)RPC通信”吧!

創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的游仙網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

什么是RPC?

RPC:Remote Procedure Call(遠(yuǎn)程過程調(diào)用)是指遠(yuǎn)程過程調(diào)用,也就是說兩臺(tái)服務(wù)器A,B,一個(gè)應(yīng)用部署在A服務(wù)器上,想要調(diào)用B服務(wù)器上應(yīng)用提供的函數(shù)/方法,由于不在一個(gè)內(nèi)存空間,不能直接調(diào)用,需要通過網(wǎng)絡(luò)來表達(dá)調(diào)用的語義和傳達(dá)調(diào)用的數(shù)據(jù)。

服務(wù)器和服務(wù)器之間的通信

RPC vs HTTP

相同點(diǎn)

  • 都是兩臺(tái)計(jì)算機(jī)之間的網(wǎng)絡(luò)通信。ajax是瀏覽器和服務(wù)器之間的通行,RPC是服務(wù)器與服務(wù)器之間的通行

  • 需要雙方約定一個(gè)數(shù)據(jù)格式

不同點(diǎn)

  • 尋址服務(wù)器不同

ajax 是使用 DNS作為尋址服務(wù)獲取域名所對(duì)應(yīng)的ip地址,瀏覽器拿到ip地址之后發(fā)送請(qǐng)求獲取數(shù)據(jù)。

RPC一般是在內(nèi)網(wǎng)里面相互請(qǐng)求,所以它一般不用DNS做尋址服務(wù)。因?yàn)樵趦?nèi)網(wǎng),所以可以使用規(guī)定的id或者一個(gè)虛擬vip,比如v5:8001,然后到尋址服務(wù)器獲取v5所對(duì)應(yīng)的ip地址。

  • 應(yīng)用層協(xié)議不同

ajax使用http協(xié)議,它是一個(gè)文本協(xié)議,我們交互數(shù)據(jù)的時(shí)候文件格式要么是html,要么是json對(duì)象,使用json的時(shí)候就是key-value的形式。

RPC采用二進(jìn)制協(xié)議。采用二進(jìn)制傳輸,它傳輸?shù)陌沁@樣子的[0001 0001 0111 0110 0010],里面都是二進(jìn)制,一般采用那幾位表示一個(gè)字段,比如前6位是一個(gè)字段,依次類推。

這樣就不需要http傳輸json對(duì)象里面的key,所以有更小的數(shù)據(jù)體積。

因?yàn)閭鬏數(shù)氖嵌M(jìn)制,更適合于計(jì)算機(jī)來理解,文本協(xié)議更適合人類理解,所以計(jì)算機(jī)去解讀各個(gè)字段的耗時(shí)是比文本協(xié)議少很多的。

RPC采用二進(jìn)制有更小的數(shù)據(jù)體積,及更快的解讀速度。

  • TCP通訊方式

  • 單工通信:只能客戶端給服務(wù)端發(fā)消息,或者只能服務(wù)端給客戶端發(fā)消息

  • 半雙工通信:在某個(gè)時(shí)間段內(nèi)只能客戶端給服務(wù)端發(fā)消息,過了這個(gè)時(shí)間段服務(wù)端可以給客戶端發(fā)消息。如果把時(shí)間分成很多時(shí)間片,在一個(gè)時(shí)間片內(nèi)就屬于單工通信

  • 全雙工通信:客戶端和服務(wù)端能相互通信

選擇這三種通信方式的哪一種主要考慮的因素是:實(shí)現(xiàn)難度和成本。全雙工通信是要比半雙工通信的成本要高的,在某些場(chǎng)景下還是可以考慮使用半雙工通信。

ajax是一種半雙工通信。http是文本協(xié)議,但是它底層是tcp協(xié)議,http文本在tcp這一層會(huì)經(jīng)歷從二進(jìn)制數(shù)據(jù)流到文本的轉(zhuǎn)換過程。

理解RPC只是在更深入地理解前端技術(shù)。

buffer編解碼二進(jìn)制數(shù)據(jù)包

創(chuàng)建buffer

buffer.from: 從已有的數(shù)據(jù)創(chuàng)建二進(jìn)制

const buffer1 = Buffer.from('geekbang')
const buffer2 = Buffer.from([0, 1, 2, 3, 4])



buffer.alloc: 創(chuàng)建一個(gè)空的二進(jìn)制

const buffer3 = Buffer.alloc(20)

往buffer里面寫東西

  • buffer.write(string, offset): 寫入字符串

  • buffer.writeInt8(value, offset): int8表示二進(jìn)制8位(8位表示一個(gè)字節(jié))所能表示的整數(shù),offset開始寫入之前要跳過的字節(jié)數(shù)。

  • buffer.writeInt16BE(value, offset): int16(兩個(gè)字節(jié)數(shù)),表示16個(gè)二進(jìn)制位所能表示的整數(shù),即32767。超過這個(gè)數(shù)程序會(huì)報(bào)錯(cuò)。

const buffer = Buffer.from([1, 2, 3, 4]) // 

// 往第二個(gè)字節(jié)里面寫入12
buffer.writeInt8(12, 1) // 

大端BE與小端LE:主要是對(duì)于2個(gè)以上字節(jié)的數(shù)據(jù)排列方式不同(writeInt8因?yàn)橹挥幸粋€(gè)字節(jié),所以沒有大端和小端),大端的話就是低位地址放高位,小端就是低位地址放低位。如下:

const buffer = Buffer.from([1, 2, 3, 4])

buffer.writeInt16BE(512, 2) // 
buffer.writeInt16LE(512, 2) // 

RPC傳輸?shù)亩M(jìn)制如何表示傳遞的字段

PC傳輸?shù)亩M(jìn)制是如何表示字段的呢?現(xiàn)在有個(gè)二進(jìn)制包[00, 00, 00, 00, 00, 00, 00],我們假定前三個(gè)字節(jié)表示一個(gè)字段值,后面兩個(gè)表示一個(gè)字段的值,最后兩個(gè)也表示一個(gè)字段的值。那寫法如下:

writeInt16BE(value, 0)
writeInt16BE(value, 2)
writeInt16BE(value, 4)

發(fā)現(xiàn)像這樣寫,不僅要知道寫入的值,還要知道值的數(shù)據(jù)類型,這樣就很麻煩。不如json格式那么方便。針對(duì)這種情況業(yè)界也有解決方案。npm有個(gè)庫(kù)protocol-buffers,把我們寫的參數(shù)轉(zhuǎn)化為buffer

// test.proto 定義的協(xié)議文件
message Column {
  required float num  = 1;
  required string payload = 2;
}
// index.js
const fs = require('fs')
var protobuf = require('protocol-buffers')
var messages = protobuf(fs.readFileSync('test.proto'))

var buf = messages.Column.encode({
	num: 42,
	payload: 'hello world'
})
console.log(buf)
// 

var obj = messages.Column.decode(buf)
console.log(obj)
// { num: 42, payload: 'hello world' }

net建立RPC通道

半雙工通信

服務(wù)端代碼:

const net = require('net')

const LESSON_DATA = {
  136797: '01 | 課程介紹',
  136798: '02 | 內(nèi)容綜述',
  136799: '03 | Node.js是什么?',
  136800: '04 | Node.js可以用來做什么?',
  136801: '05 | 課程實(shí)戰(zhàn)項(xiàng)目介紹',
  136803: '06 | 什么是技術(shù)預(yù)研?',
  136804: '07 | Node.js開發(fā)環(huán)境安裝',
  136806: '08 | 第一個(gè)Node.js程序:石頭剪刀布游戲',
  136807: '09 | 模塊:CommonJS規(guī)范',
  136808: '10 | 模塊:使用模塊規(guī)范改造石頭剪刀布游戲',
  136809: '11 | 模塊:npm',
  141994: '12 | 模塊:Node.js內(nèi)置模塊',
  143517: '13 | 異步:非阻塞I/O',
  143557: '14 | 異步:異步編程之callback',
  143564: '15 | 異步:事件循環(huán)',
  143644: '16 | 異步:異步編程之Promise',
  146470: '17 | 異步:異步編程之a(chǎn)sync/await',
  146569: '18 | HTTP:什么是HTTP服務(wù)器?',
  146582: '19 | HTTP:簡(jiǎn)單實(shí)現(xiàn)一個(gè)HTTP服務(wù)器'
}

const server = net.createServer(socket => {
  // 監(jiān)聽客戶端發(fā)送的消息
  socket.on('data', buffer => {
    const lessonId = buffer.readInt32BE()
    setTimeout(() => {
      // 往客戶端發(fā)送消息
      socket.write(LESSON_DATA[lessonId])
    }, 1000)
  })
})

server.listen(4000)

客戶端代碼:

const net = require('net')

const socket = new net.Socket({})

const LESSON_IDS = [
  '136797',
  '136798',
  '136799',
  '136800',
  '136801',
  '136803',
  '136804',
  '136806',
  '136807',
  '136808',
  '136809',
  '141994',
  '143517',
  '143557',
  '143564',
  '143644',
  '146470',
  '146569',
  '146582'
]

socket.connect({
  host: '127.0.0.1',
  port: 4000
})

let buffer = Buffer.alloc(4)
buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

// 往服務(wù)端發(fā)送消息
socket.write(buffer)

// 監(jiān)聽從服務(wù)端傳回的消息
socket.on('data', buffer => {
  console.log(buffer.toString())

  // 獲取到數(shù)據(jù)之后再次發(fā)送消息
  buffer = Buffer.alloc(4)
  buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

  socket.write(buffer)
})

以上半雙工通信步驟如下:

  • 客戶端發(fā)送消息 socket.write(buffer)

  • 服務(wù)端接受消息后往客戶端發(fā)送消息 socket.write(buffer)

  • 客戶端接受消息后再次發(fā)送消息

這樣在一個(gè)時(shí)間端之內(nèi),只有一個(gè)端往另一個(gè)端發(fā)送消息,這樣就實(shí)現(xiàn)了半雙工通信。那如何實(shí)現(xiàn)全雙工通信呢,也就是在客戶端往服務(wù)端發(fā)送消息的同時(shí),服務(wù)端還沒有消息返回給客戶端之前,客戶端又發(fā)送了一個(gè)消息給服務(wù)端。

全雙工通信

先來看一個(gè)場(chǎng)景:

node中如何實(shí)現(xiàn)RPC通信

客戶端發(fā)送了一個(gè)id1的請(qǐng)求,但是服務(wù)端還來不及返回,接著客戶端又發(fā)送了一個(gè)id2的請(qǐng)求。

等了一個(gè)之后,服務(wù)端先把id2的結(jié)果返回了,然后再把id1的結(jié)果返回。

那如何結(jié)果匹配到對(duì)應(yīng)的請(qǐng)求上呢?

如果按照時(shí)間順序,那么id1的請(qǐng)求對(duì)應(yīng)了id2的結(jié)果,因?yàn)閕d2是先返回的;id2的請(qǐng)求對(duì)應(yīng)了id1的結(jié)果,這樣就導(dǎo)致請(qǐng)求包和返回包錯(cuò)位的情況。

怎么辦呢?

我們可以給請(qǐng)求包和返回包都帶上序號(hào),這樣就能對(duì)應(yīng)上。

錯(cuò)位處理

客戶端代碼:

socket.on('data', buffer => {
  // 包序號(hào)
  const seqBuffer = buffer.slice(0, 2)
  // 服務(wù)端返回的內(nèi)容
  const titleBuffer = buffer.slice(2)
    
  console.log(seqBuffer.readInt16BE(), titleBuffer.toString())
})

// 包序號(hào)
let seq = 0
function encode(index) {
  // 請(qǐng)求包的長(zhǎng)度現(xiàn)在是6 = 2(包序號(hào)) + 4(課程id)
  buffer = Buffer.alloc(6)
  buffer.writeInt16BE(seq)
  buffer.writeInt32BE(LESSON_IDS[index], 2)

  seq++
  return buffer
}

// 每50ms發(fā)送一次請(qǐng)求
setInterval(() => {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}, 50)

服務(wù)端代碼:

const server = net.createServer(socket => {
  socket.on('data', buffer => {
    // 把包序號(hào)取出
    const seqBuffer = buffer.slice(0, 2)
    // 從第2個(gè)字節(jié)開始讀取
    const lessonId = buffer.readInt32BE(2)
    setTimeout(() => {
      const buffer = Buffer.concat([
        seqBuffer,
        Buffer.from(LESSON_DATA[lessonId])
      ])
      socket.write(buffer)
      // 這里返回時(shí)間采用隨機(jī)的,這樣就不會(huì)按順序返回,就可以測(cè)試錯(cuò)位的情況
    }, 10 + Math.random() * 1000)
  })
})

  • 客戶端把包序號(hào)和對(duì)應(yīng)的id給服務(wù)端

  • 服務(wù)端取出包序號(hào)和對(duì)應(yīng)的id,然后把包序號(hào)和id對(duì)應(yīng)的內(nèi)容返回給客戶端,同時(shí)設(shè)置返回的時(shí)間是隨機(jī)的,這樣就不會(huì)按照順序返回。

粘包處理

如果我們這樣發(fā)送請(qǐng)求:

for (let i = 0; i < 100; i++) {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}

我們發(fā)現(xiàn)服務(wù)端接收到的信息如下:

這是因?yàn)?code>TCP自己做的一個(gè)優(yōu)化,它會(huì)把所有的請(qǐng)求包拼接在一起,這樣就會(huì)產(chǎn)生粘包的現(xiàn)象。

服務(wù)端需要把包進(jìn)行拆分,拆分成100個(gè)小包。

那如何拆分呢?

首先客戶端發(fā)送的數(shù)據(jù)包包括兩部分:定長(zhǎng)的包頭和不定長(zhǎng)的包體

包頭又分為兩部分:包序號(hào)及包體的長(zhǎng)度。只有知道包體的長(zhǎng)度,才能知道從哪里進(jìn)行分割。

let seq = 0
function encode(data) {
    // 正常情況下,這里應(yīng)該是使用 protocol-buffers 來encode一段代表業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)包
    // 為了不要混淆重點(diǎn),這個(gè)例子比較簡(jiǎn)單,就直接把課程id轉(zhuǎn)buffer發(fā)送
    const body = Buffer.alloc(4);
    body.writeInt32BE(LESSON_IDS[data.id]);

    // 一般來說,一個(gè)rpc調(diào)用的數(shù)據(jù)包會(huì)分為定長(zhǎng)的包頭和不定長(zhǎng)的包體兩部分
    // 包頭的作用就是用來記載包的序號(hào)和包的長(zhǎng)度,以實(shí)現(xiàn)全雙工通信
    const header = Buffer.alloc(6); // 包序號(hào)占2個(gè)字節(jié),包體長(zhǎng)度占4個(gè)字節(jié),共6個(gè)字節(jié)
    header.writeInt16BE(seq)
    header.writeInt32BE(body.length, 2);

    // 包頭和包體拼起來發(fā)送
    const buffer = Buffer.concat([header, body])

    console.log(`包${seq}傳輸?shù)恼n程id為${LESSON_IDS[data.id]}`);
    seq++;
    return buffer;
}

// 并發(fā)
for (let i = 0; i < 100; i++) {
    id = Math.floor(Math.random() * LESSON_IDS.length)
    socket.write(encode({ id }))
}

服務(wù)端進(jìn)行拆包

const server = net.createServer(socket => {
  let oldBuffer = null
  socket.on('data', buffer => {
    // 把上一次data事件使用殘余的buffer接上來
    if (oldBuffer) {
      buffer = Buffer.concat([oldBuffer, buffer])
    }
    let packageLength = 0
    // 只要還存在可以解成完整包的包長(zhǎng)
    while ((packageLength = checkComplete(buffer))) {
      // 確定包的長(zhǎng)度后進(jìn)行slice分割
      const package = buffer.slice(0, packageLength)
      // 剩余的包利用循環(huán)繼續(xù)分割
      buffer = buffer.slice(packageLength)

      // 把這個(gè)包解成數(shù)據(jù)和seq
      const result = decode(package)

      // 計(jì)算得到要返回的結(jié)果,并write返回
      socket.write(encode(LESSON_DATA[result.data], result.seq))
    }

    // 把殘余的buffer記下來
    oldBuffer = buffer
  })
})

checkComplete 函數(shù)的作用來確定一個(gè)數(shù)據(jù)包的長(zhǎng)度,然后進(jìn)行分割:

function checkComplete(buffer) {
  // 如果包的長(zhǎng)度小于6個(gè)字節(jié)說明只有包頭,沒有包體,那么直接返回0
  if (buffer.length <= 6) {
    return 0
  }
  // 讀取包頭的第二個(gè)字節(jié),取出包體的長(zhǎng)度
  const bodyLength = buffer.readInt32BE(2)
  // 請(qǐng)求包包括包頭(6個(gè)字節(jié))和包體body
  return 6 + bodyLength
}

decode對(duì)包進(jìn)行解密:

function decode(buffer) {
  // 讀取包頭
  const header = buffer.slice(0, 6)
  const seq = header.readInt16BE()
    
  // 讀取包體  
  // 正常情況下,這里應(yīng)該是使用 protobuf 來decode一段代表業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)包
  // 為了不要混淆重點(diǎn),這個(gè)例子比較簡(jiǎn)單,就直接讀一個(gè)Int32即可
  const body = buffer.slice(6).readInt32BE()

  // 這里把seq和數(shù)據(jù)返回出去
  return {
    seq,
    data: body
  }
}

encode把客戶端想要的數(shù)據(jù)轉(zhuǎn)化為二進(jìn)制返回,這個(gè)包同樣包括包頭和包體,包頭又包括包需要包序號(hào)和包體的長(zhǎng)度。

function encode(data, seq) {
  // 正常情況下,這里應(yīng)該是使用 protobuf 來encode一段代表業(yè)務(wù)數(shù)據(jù)的數(shù)據(jù)包
  // 為了不要混淆重點(diǎn),這個(gè)例子比較簡(jiǎn)單,就直接把課程標(biāo)題轉(zhuǎn)buffer返回
  const body = Buffer.from(data)

  // 一般來說,一個(gè)rpc調(diào)用的數(shù)據(jù)包會(huì)分為定長(zhǎng)的包頭和不定長(zhǎng)的包體兩部分
  // 包頭的作用就是用來記載包的序號(hào)和包的長(zhǎng)度,以實(shí)現(xiàn)全雙工通信
  const header = Buffer.alloc(6)
  header.writeInt16BE(seq)
  header.writeInt32BE(body.length, 2)

  const buffer = Buffer.concat([header, body])

  return buffer
}

當(dāng)客戶端收到服務(wù)端發(fā)送的包之后,同樣也要進(jìn)行拆包,因?yàn)樗械陌瑯佣颊吃谝黄鹆?

 

因此,客戶端也需要拆包,拆包策略與服務(wù)端的拆包策略是一致的:

let oldBuffer = null
socket.on('data', buffer => {
  // 把上一次data事件使用殘余的buffer接上來
  if (oldBuffer) {
    buffer = Buffer.concat([oldBuffer, buffer])
  }
  let completeLength = 0

  // 只要還存在可以解成完整包的包長(zhǎng)
  while ((completeLength = checkComplete(buffer))) {
    const package = buffer.slice(0, completeLength)
    buffer = buffer.slice(completeLength)

    // 把這個(gè)包解成數(shù)據(jù)和seq
    const result = decode(package)
    console.log(`包${result.seq},返回值是${result.data}`)
  }

  // 把殘余的buffer記下來
  oldBuffer = buffer
})

到這里就實(shí)現(xiàn)了雙全工通行,這樣客戶端和服務(wù)端隨時(shí)都可以往對(duì)方發(fā)小消息了。

到此,相信大家對(duì)“node中如何實(shí)現(xiàn)RPC通信”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


名稱欄目:node中如何實(shí)現(xiàn)RPC通信
文章起源:http://weahome.cn/article/pojsjg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部