這篇文章主要介紹了Go如何實(shí)現(xiàn)百萬(wàn)WebSocket連接,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
成都創(chuàng)新互聯(lián)公司是一家專注于成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設(shè)與策劃設(shè)計(jì),昌江網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:昌江等地區(qū)。昌江做網(wǎng)站價(jià)格咨詢:028-86922220
go是golang的簡(jiǎn)稱,而golang可以做服務(wù)器端開發(fā),且golang很適合做日志處理、數(shù)據(jù)打包、虛擬機(jī)處理、數(shù)據(jù)庫(kù)代理等工作。在網(wǎng)絡(luò)編程方面,它還廣泛應(yīng)用于web應(yīng)用、API應(yīng)用等領(lǐng)域。
1. 簡(jiǎn)介
為了定義本文的討論范圍,有必要說(shuō)明我們?yōu)槭裁葱枰@個(gè)服務(wù)。
Mail.Ru 有很多有狀態(tài)系統(tǒng)。用戶的電子郵件存儲(chǔ)就是其中之一。我們有幾種方法可以跟蹤該系統(tǒng)的狀態(tài)變化以及系統(tǒng)事件,主要是通過(guò)定期系統(tǒng)輪詢或者狀態(tài)變化時(shí)的系統(tǒng)通知來(lái)實(shí)現(xiàn)。
兩種方式各有利弊。但是對(duì)于郵件而言,用戶收到新郵件的速度越快越好。
郵件輪詢大約每秒 50,000 個(gè) HTTP 查詢,其中 60% 返回 304 狀態(tài),這意味著郵箱中沒(méi)有任何更改。
因此,為了減少服務(wù)器的負(fù)載并加快向用戶發(fā)送郵件的速度,我們決定通過(guò)用發(fā)布 - 訂閱服務(wù)(也稱為消息總線,消息代理或事件管道)的模式來(lái)造一個(gè)輪子。一端接收有關(guān)狀態(tài)更改的通知,另一端訂閱此類通知。
之前的架構(gòu):
現(xiàn)在的架構(gòu):
第一個(gè)方案是之前的架構(gòu)。瀏覽器定期輪詢 API 并查詢存儲(chǔ)(郵箱服務(wù))是否有更改。
第二種方案是現(xiàn)在的架構(gòu)。瀏覽器與通知 API 建立了 WebSocket 連接,通知 API 是總線服務(wù)的消費(fèi)者。一旦接收到新郵件后,Storage 會(huì)將有關(guān)它的通知發(fā)送到總線(1),總線將其發(fā)送給訂閱者(2)。 API 通過(guò)連接發(fā)送這個(gè)收到的通知,將其發(fā)送到用戶的瀏覽器(3)。
所以現(xiàn)在我們將討論這個(gè) API 或者這個(gè) WebSocket 服務(wù)。展望一下未來(lái),我們的服務(wù)將來(lái)可能會(huì)有 300 萬(wàn)個(gè)在線連接。
2. 常用的方式
我們來(lái)看看如何在沒(méi)有任何優(yōu)化的情況下使用 Go 實(shí)現(xiàn)服務(wù)器的某些部分。
在我們繼續(xù)使用 net/http
之前,來(lái)談?wù)勅绾伟l(fā)送和接收數(shù)據(jù)。這個(gè)數(shù)據(jù)位于 WebSocket 協(xié)議上(例如 JSON 對(duì)象),我們?cè)谙挛闹袑⑵浞Q為包。
我們先來(lái)實(shí)現(xiàn) Channel
結(jié)構(gòu)體,該結(jié)構(gòu)體將包含在 WebSocket 連接上發(fā)送和接收數(shù)據(jù)包的邏輯。
2.1 Channel 結(jié)構(gòu)體
// WebSocket Channel 的實(shí)現(xiàn) // Packet 結(jié)構(gòu)體表示應(yīng)用程序級(jí)數(shù)據(jù) type Packet struct { ... } // Channel 裝飾用戶連接 type Channel struct { conn net.Conn // WebSocket 連接 send chan Packet // 傳出的 packets 隊(duì)列 } func NewChannel(conn net.Conn) *Channel { c := &Channel{ conn: conn, send: make(chan Packet, N), } go c.reader() go c.writer() return c }
我想讓你注意的是 reader
和 writer
goroutines。每個(gè) goroutine 都需要內(nèi)存棧,初始大小可能為 2 到 8 KB,具體 取決于操作系統(tǒng) 和 Go 版本。
關(guān)于上面提到的 300 萬(wàn)個(gè)線上連接,為此我們需要消耗 24 GB 的內(nèi)存(假設(shè)單個(gè) goroutine 消耗 4 KB 棧內(nèi)存)用于所有的連接。并且這還沒(méi)包括為 Channel
結(jié)構(gòu)體分配的內(nèi)存, ch.send
傳出的數(shù)據(jù)包占用的內(nèi)存以及其他內(nèi)部字段的內(nèi)存。
2.2 I/O goroutines
讓我們來(lái)看看 reader
的實(shí)現(xiàn):
// Channel's reading goroutine. func (c *Channel) reader() { // 創(chuàng)建一個(gè)緩沖 read 來(lái)減少 read 的系統(tǒng)調(diào)用 buf := bufio.NewReader(c.conn) for { pkt, _ := readPacket(buf) c.handle(pkt) } }
這里我們使用了 bufio.Reader
來(lái)減少 read()
系統(tǒng)調(diào)用的次數(shù),并盡可能多地讀取 buf
中緩沖區(qū)大小所允許的數(shù)量。在這個(gè)無(wú)限循環(huán)中,我們等待新數(shù)據(jù)的到來(lái)。請(qǐng)先記住這句話: 等待新數(shù)據(jù)的到來(lái) 。我們稍后會(huì)回顧。
我們先不考慮傳入的數(shù)據(jù)包的解析和處理,因?yàn)樗鼘?duì)我們討論的優(yōu)化并不重要。但是, buf
值得我們關(guān)注:默認(rèn)情況下,它是 4 KB,這意味著連接還需要 12 GB 的內(nèi)存。 writer
也有類似的情況:
// Channel's writing goroutine. func (c *Channel) writer() { // 創(chuàng)建一個(gè)緩沖 write 來(lái)減少 write 的系統(tǒng)調(diào)用 buf := bufio.NewWriter(c.conn) for pkt := range c.send { _ := writePacket(buf, pkt) buf.Flush() } }
我們通過(guò) Channel 的 c.send
遍歷將數(shù)據(jù)包傳出 并將它們寫入緩沖區(qū)。細(xì)心的讀者可能猜到了,這是我們 300 萬(wàn)個(gè)連接的另外 12 GB 的內(nèi)存消耗。
2.3 HTTP
已經(jīng)實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 Channel
,現(xiàn)在我們需要使用 WebSocket 連接。由于仍然處于常用的方式的標(biāo)題下,所以我們以常用的方式繼續(xù)。
注意:如果你不知道 WebSocket 的運(yùn)行原理,需要記住客戶端會(huì)通過(guò)名為 Upgrade 的特殊 HTTP 機(jī)制轉(zhuǎn)換到 WebSocket 協(xié)議。在成功處理 Upgrade 請(qǐng)求后,服務(wù)端和客戶端將使用 TCP 連接來(lái)傳輸二進(jìn)制的 WebSocket 幀。 這里 是連接的內(nèi)部結(jié)構(gòu)的說(shuō)明。
// 常用的轉(zhuǎn)換為 WebSocket 的方法 import ( "net/http" "some/websocket" ) http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) { conn, _ := websocket.Upgrade(r, w) ch := NewChannel(conn) //... })
需要注意的是, http.ResponseWriter
為 bufio.Reader
和 bufio.Writer
(均為 4 KB 的緩沖區(qū))分配了內(nèi)存,用于對(duì) *http.Request
的初始化和進(jìn)一步的響應(yīng)寫入。
無(wú)論使用哪種 WebSocket 庫(kù),在 Upgrade 成功后, 服務(wù)端在調(diào)用 responseWriter.Hijack()
之后都會(huì)收到 I/O 緩沖區(qū)和 TCP 連接。
提示:在某些情況下, go:linkname
可被用于通過(guò)調(diào)用 net/http.putBufio {Reader, Writer}
將緩沖區(qū)返回給 net/http
內(nèi)的 sync.Pool
。
因此,我們還需要 24 GB 的內(nèi)存用于 300 萬(wàn)個(gè)連接。
那么,現(xiàn)在為了一個(gè)什么功能都沒(méi)有的應(yīng)用程序,一共需要消耗 72 GB 的內(nèi)存!
3. 優(yōu)化
我們回顧一下在簡(jiǎn)介部分中談到的內(nèi)容,并記住用戶連接的方式。在切換到 WebSocket 后,客戶端會(huì)通過(guò)連接發(fā)送包含相關(guān)事件的數(shù)據(jù)包。然后(不考慮 ping/pong
等消息),客戶端可能在整個(gè)連接的生命周期中不會(huì)發(fā)送任何其他內(nèi)容。
連接的生命周期可能持續(xù)幾秒到幾天。
因此,大部分時(shí)間 Channel.reader()
和 Channel.writer()
都在等待接收或發(fā)送數(shù)據(jù)。與它們一起等待的還有每個(gè)大小為 4 KB 的 I/O 緩沖區(qū)。
現(xiàn)在我們對(duì)哪些地方可以做優(yōu)化應(yīng)該比較清晰了。
3.1 Netpoll
Channel.reader()
通過(guò)給 bufio.Reader.Read()
內(nèi)的 conn.Read()
加鎖來(lái) 等待新數(shù)據(jù)的到來(lái)(譯者注:上文中的伏筆),一旦連接中有數(shù)據(jù),Go runtime(譯者注:runtime 包含 Go 運(yùn)行時(shí)的系統(tǒng)交互的操作,這里保留原文)“喚醒” goroutine 并允許它讀取下一個(gè)數(shù)據(jù)包。在此之后,goroutine 再次被鎖定,同時(shí)等待新的數(shù)據(jù)。讓我們看看 Go runtime 來(lái)理解 goroutine 為什么必須“被喚醒”。
如果我們查看 conn.Read()
的實(shí)現(xiàn) ,將會(huì)在其中看到 net.netFD.Read()
調(diào)用 :
// Go 內(nèi)部的非阻塞讀. // net/fd_unix.go func (fd *netFD) Read(p []byte) (n int, err error) { //... for { n, err = syscall.Read(fd.sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN { if err = fd.pd.waitRead(); err == nil { continue } } } //... break } //... }
Go 在非阻塞模式下使用套接字。 EAGAIN 表示套接字中沒(méi)有數(shù)據(jù),并且讀取空套接字時(shí)不會(huì)被鎖定,操作系統(tǒng)將返回控制權(quán)給我們。(譯者注:EAGAIN 表示目前沒(méi)有可用數(shù)據(jù),請(qǐng)稍后再試)
我們從連接文件描述符中看到一個(gè) read()
系統(tǒng)調(diào)用函數(shù)。如果 read 返回 EAGAIN 錯(cuò)誤 ,則 runtime 調(diào)用 pollDesc.waitRead() :
// Go 內(nèi)部關(guān)于 netpoll 的使用 // net/fd_poll_runtime.go func (pd *pollDesc) waitRead() error { return pd.wait('r') } func (pd *pollDesc) wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) //... }
如果 深入挖掘 ,我們將看到 netpoll 在 Linux 中是使用 epoll 實(shí)現(xiàn)的,而在 BSD 中是使用 kqueue 實(shí)現(xiàn)的。為什么不對(duì)連接使用相同的方法?我們可以分配一個(gè) read 緩沖區(qū)并僅在真正需要時(shí)啟動(dòng) read goroutine:當(dāng)套接字中有可讀的數(shù)據(jù)時(shí)。
在 github.com/golang/go 上,有一個(gè)導(dǎo)出 netpoll 函數(shù)的 issue 。
3.2 去除 goroutines 的內(nèi)存消耗
假設(shè)我們有 Go 的 netpoll 實(shí)現(xiàn) ?,F(xiàn)在我們可以避免在內(nèi)部緩沖區(qū)啟動(dòng) Channel.reader()
goroutine,而是在連接中訂閱可讀數(shù)據(jù)的事件:
// 使用 netpoll ch := NewChannel(conn) // 通過(guò) netpoll 實(shí)例觀察 conn poller.Start(conn, netpoll.EventRead, func() { // 我們?cè)谶@里產(chǎn)生 goroutine 以防止在輪詢從 ch 接收數(shù)據(jù)包時(shí)被鎖。 go Receive(ch) }) // Receive 從 conn 讀取數(shù)據(jù)包并以某種方式處理它。 func (ch *Channel) Receive() { buf := bufio.NewReader(ch.conn) pkt := readPacket(buf) c.handle(pkt) }
Channel.writer()
更簡(jiǎn)單,因?yàn)槲覀冎荒茉诎l(fā)送數(shù)據(jù)包時(shí)運(yùn)行 goroutine 并分配緩沖區(qū):
// 當(dāng)我們需要時(shí)啟動(dòng) writer goroutine func (ch *Channel) Send(p Packet) { if c.noWriterYet() { go ch.writer() } ch.send <- p }
需要注意的是,當(dāng)操作系統(tǒng)在 write()
調(diào)用上返回 EAGAIN
時(shí),我們不處理這種情況。我們依靠 Go runtime 來(lái)處理這種情況,因?yàn)檫@種情況在服務(wù)器上很少見。然而,如果有必要,它可以以與 reader()
相同的方式處理。
當(dāng)從 ch.send
(一個(gè)或幾個(gè))讀取傳出數(shù)據(jù)包后,writer 將完成其操作并釋放 goroutine 的內(nèi)存和發(fā)送緩沖區(qū)的內(nèi)存。
完美!我們通過(guò)去除兩個(gè)運(yùn)行的 goroutine 中的內(nèi)存消耗和 I/O 緩沖區(qū)的內(nèi)存消耗節(jié)省了 48 GB。
3.3 資源控制
大量連接不僅僅涉及到內(nèi)存消耗高的問(wèn)題。在開發(fā)服務(wù)時(shí),我們遇到了反復(fù)出現(xiàn)的競(jìng)態(tài)條件和 self-DDoS 造成的死鎖。
例如,如果由于某種原因我們突然無(wú)法處理 ping/pong
消息,但是空閑連接的處理程序繼續(xù)關(guān)閉這樣的連接(假設(shè)連接被破壞,沒(méi)有提供數(shù)據(jù)),客戶端每隔 N 秒失去連接并嘗試再次連接而不是等待事件。
被鎖或超載的服務(wù)器停止服務(wù),如果它之前的負(fù)載均衡器(例如,nginx)將請(qǐng)求傳遞給下一個(gè)服務(wù)器實(shí)例,這將是不錯(cuò)的。
此外,無(wú)論服務(wù)器負(fù)載如何,如果所有客戶端突然(可能是由于錯(cuò)誤原因)向我們發(fā)送數(shù)據(jù)包,之前的 48 GB 內(nèi)存的消耗將不可避免,因?yàn)樾枰獮槊總€(gè)連接分配 goroutine 和緩沖區(qū)。
Goroutine 池
上面的情況,我們可以使用 goroutine 池限制同時(shí)處理的數(shù)據(jù)包數(shù)量。下面是這種池的簡(jiǎn)單實(shí)現(xiàn):
// goroutine 池的簡(jiǎn)單實(shí)現(xiàn) package gopool func New(size int) *Pool { return &Pool{ work: make(chan func()), sem: make(chan struct{}, size), } } func (p *Pool) Schedule(task func()) error { select { case p.work <- task: case p.sem <- struct{}{}: go p.worker(task) } } func (p *Pool) worker(task func()) { defer func() { <-p.sem } for { task() task = <-p.work } }
現(xiàn)在我們的 netpoll 代碼如下:
// 處理 goroutine 池中的輪詢事件。 pool := gopool.New(128) poller.Start(conn, netpoll.EventRead, func() { // 我們?cè)谒?nbsp;worker 被占用時(shí)阻塞 poller pool.Schedule(func() { Receive(ch) }) })
現(xiàn)在我們不僅在套接字中有可讀數(shù)據(jù)時(shí)讀取,而且還在第一次機(jī)會(huì)獲取池中的空閑 goroutine。??
同樣,我們修改 Send()
:
// 復(fù)用 writing goroutine pool := gopool.New(128) func (ch *Channel) Send(p Packet) { if c.noWriterYet() { pool.Schedule(ch.writer) } ch.send <- p }
取代 go ch.writer()
,我們想寫一個(gè)復(fù)用的 goroutines。因此,對(duì)于擁有 N
個(gè) goroutines 的池,我們可以保證同時(shí)處理 N
個(gè)請(qǐng)求并且在 N + 1
的時(shí)候, 我們不會(huì)分配 N + 1
個(gè)緩沖區(qū)。 goroutine 池還允許我們限制新連接的 Accept()
和 Upgrade()
,并避免大多數(shù)的 DDoS 攻擊。
3.4 upgrade 零拷貝
如前所述,客戶端使用 HTTP Upgrade 切換到 WebSocket 協(xié)議。這就是 WebSocket 協(xié)議的樣子:
## HTTP Upgrade 示例 GET /ws HTTP/1.1 Host: mail.ru Connection: Upgrade Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA== Sec-Websocket-Version: 13 Upgrade: websocket HTTP/1.1 101 Switching Protocols Connection: Upgrade Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4= Upgrade: websocket
也就是說(shuō),在我們的例子中,需要 HTTP 請(qǐng)求及其 Header 用于切換到 WebSocket 協(xié)議。這些知識(shí)以及 http.Request
中存儲(chǔ)的內(nèi)容 表明,為了優(yōu)化,我們需要在處理 HTTP 請(qǐng)求時(shí)放棄不必要的內(nèi)存分配和內(nèi)存復(fù)制,并棄用 net/http
庫(kù)。
例如, http.Request
有一個(gè)與 Header 具有相同名稱的字段 ,這個(gè)字段用于將數(shù)據(jù)從連接中復(fù)制出來(lái)填充請(qǐng)求頭。想象一下,該字段需要消耗多少額外內(nèi)存,例如碰到比較大的 Cookie 頭。
WebSocket 的實(shí)現(xiàn)
不幸的是,在我們優(yōu)化的時(shí)候所有存在的庫(kù)都是使用標(biāo)準(zhǔn)的 net/http
庫(kù)進(jìn)行升級(jí)。而且,(兩個(gè))庫(kù)都不能使用上述的讀寫優(yōu)化方案。為了采用這些優(yōu)化方案,我們需要用一個(gè)比較低級(jí)的 API 來(lái)處理 WebSocket。要重用緩沖區(qū),我們需要把協(xié)議函數(shù)變成這樣:
func ReadFrame(io.Reader) (Frame, error) func WriteFrame(io.Writer, Frame) error
如果有一個(gè)這種 API 的庫(kù),我們可以按下面的方式從連接中讀取數(shù)據(jù)包(數(shù)據(jù)包的寫入也一樣):
// 預(yù)期的 WebSocket 實(shí)現(xiàn)API // getReadBuf, putReadBuf 用來(lái)復(fù)用 *bufio.Reader (with sync.Pool for example). func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader) // 當(dāng) conn 中的數(shù)據(jù)可讀取時(shí),readPacket 被調(diào)用 func readPacket(conn io.Reader) error { buf := getReadBuf() defer putReadBuf(buf) buf.Reset(conn) frame, _ := ReadFrame(buf) parsePacket(frame.Payload) //... }
簡(jiǎn)單來(lái)說(shuō),我們需要自己的 WebSocket 庫(kù)。
github.com/gobwas/ws
在意識(shí)形態(tài)上,編寫 ws
庫(kù)是為了不將其協(xié)議操作邏輯強(qiáng)加給用戶。所有讀寫方法都實(shí)現(xiàn)了標(biāo)準(zhǔn)的 io.Reader 和 io.Writer 接口,這樣就可以使用或不使用緩沖或任何其他 I/O 包裝器。
除了來(lái)自標(biāo)準(zhǔn)庫(kù) net/http
的升級(jí)請(qǐng)求之外, ws
還支持零拷貝升級(jí),升級(jí)請(qǐng)求的處理以及切換到 WebSocket 無(wú)需分配內(nèi)存或復(fù)制內(nèi)存。 ws.Upgrade()
接受 io.ReadWriter
( net.Conn
實(shí)現(xiàn)了此接口)。換句話說(shuō),我們可以使用標(biāo)準(zhǔn)的 net.Listen()
將接收到的連接從 ln.Accept()
轉(zhuǎn)移給 ws.Upgrade()
。該庫(kù)使得可以復(fù)制任何請(qǐng)求數(shù)據(jù)以供應(yīng)用程序使用(例如, Cookie
用來(lái)驗(yàn)證會(huì)話)。
下面是升級(jí)請(qǐng)求的 基準(zhǔn)測(cè)試 結(jié)果:標(biāo)準(zhǔn)庫(kù) net/http
的服務(wù)與用零拷貝升級(jí)的 net.Listen()
:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
切換到 ws
和 零拷貝升級(jí)為我們節(jié)省了另外的 24 GB 內(nèi)存 - 在 net/http
處理請(qǐng)求時(shí)為 I/O 緩沖區(qū)分配的空間。
3.5 摘要
我們總結(jié)一下這些優(yōu)化。
內(nèi)部有緩沖區(qū)的 read goroutine 是代價(jià)比較大的。解決方案:netpoll(epoll,kqueue); 重用緩沖區(qū)。
內(nèi)部有緩沖區(qū)的 write goroutine 是代價(jià)比較大的。解決方案:需要的時(shí)候才啟動(dòng) goroutine; 重用緩沖區(qū)。
如果有大量的連接,netpoll 將無(wú)法正常工作。解決方案:使用 goroutines 池并限制池的 worker 數(shù)。
net/http 不是處理升級(jí)到 WebSocket 的最快方法。解決方案:在裸 TCP 連接上使用內(nèi)存零拷貝升級(jí)。
服務(wù)的代碼看起來(lái)如下所示:
// WebSocket 服務(wù)器示例,包含 netpoll,goroutine 池和內(nèi)存零拷貝的升級(jí)。 import ( "net" "github.com/gobwas/ws" ) ln, _ := net.Listen("tcp", ":8080") for { // 嘗試在空閑池的 worker 內(nèi)的接收傳入的連接。如果超過(guò) 1ms 沒(méi)有空閑 worker,則稍后再試。這有助于防止 self-ddos 或耗盡服務(wù)器資源的情況。 err := pool.ScheduleTimeout(time.Millisecond, func() { conn := ln.Accept() _ = ws.Upgrade(conn) // 使用 Channel 結(jié)構(gòu)體包裝 WebSocket 連接 // 將幫助我們處理應(yīng)用包 ch := NewChannel(conn) // 等待連接傳入字節(jié) poller.Start(conn, netpoll.EventRead, func() { // 不要超過(guò)資源限制 pool.Schedule(func() { // 讀取并處理傳入的包 ch.Recevie() }) }) }) if err != nil { time.Sleep(time.Millisecond) } }
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Go如何實(shí)現(xiàn)百萬(wàn)WebSocket連接”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!