真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

go語言異步庫,go 異步

golang使用Nsq

1. 介紹

創(chuàng)新互聯(lián)2013年開創(chuàng)至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目網(wǎng)站設(shè)計(jì)、成都網(wǎng)站設(shè)計(jì)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢想脫穎而出為使命,1280元石峰做網(wǎng)站,已為上家服務(wù),為石峰各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18980820575

最近在研究一些消息中間件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一個(gè)基于Go語言的分布式實(shí)時(shí)消息平臺(tái),它基于MIT開源協(xié)議發(fā)布,由bitly公司開源出來的一款簡單易用的消息中間件。

官方和第三方還為NSQ開發(fā)了眾多客戶端功能庫,如官方提供的基于HTTP的nsqd、Go客戶端go-nsq、Python客戶端pynsq、基于Node.js的JavaScript客戶端nsqjs、異步C客戶端libnsq、Java客戶端nsq-java以及基于各種語言的眾多第三方客戶端功能庫。

1.1 Features

1). Distributed

NSQ提供了分布式的,去中心化,且沒有單點(diǎn)故障的拓?fù)浣Y(jié)構(gòu),穩(wěn)定的消息傳輸發(fā)布保障,能夠具有高容錯(cuò)和HA(高可用)特性。

2). Scalable易于擴(kuò)展

NSQ支持水平擴(kuò)展,沒有中心化的brokers。內(nèi)置的發(fā)現(xiàn)服務(wù)簡化了在集群中增加節(jié)點(diǎn)。同時(shí)支持pub-sub和load-balanced 的消息分發(fā)。

3). Ops Friendly

NSQ非常容易配置和部署,生來就綁定了一個(gè)管理界面。二進(jìn)制包沒有運(yùn)行時(shí)依賴。官方有Docker image。

4.Integrated高度集成

官方的 Go 和 Python庫都有提供。而且為大多數(shù)語言提供了庫。

1.2 組件

1.3 拓?fù)浣Y(jié)構(gòu)

NSQ推薦通過他們相應(yīng)的nsqd實(shí)例使用協(xié)同定位發(fā)布者,這意味著即使面對網(wǎng)絡(luò)分區(qū),消息也會(huì)被保存在本地,直到它們被一個(gè)消費(fèi)者讀取。更重要的是,發(fā)布者不必去發(fā)現(xiàn)其他的nsqd節(jié)點(diǎn),他們總是可以向本地實(shí)例發(fā)布消息。

NSQ

首先,一個(gè)發(fā)布者向它的本地nsqd發(fā)送消息,要做到這點(diǎn),首先要先打開一個(gè)連接,然后發(fā)送一個(gè)包含topic和消息主體的發(fā)布命令,在這種情況下,我們將消息發(fā)布到事件topic上以分散到我們不同的worker中。

事件topic會(huì)復(fù)制這些消息并且在每一個(gè)連接topic的channel上進(jìn)行排隊(duì),在我們的案例中,有三個(gè)channel,它們其中之一作為檔案channel。消費(fèi)者會(huì)獲取這些消息并且上傳到S3。

nsqd

每個(gè)channel的消息都會(huì)進(jìn)行排隊(duì),直到一個(gè)worker把他們消費(fèi),如果此隊(duì)列超出了內(nèi)存限制,消息將會(huì)被寫入到磁盤中。Nsqd節(jié)點(diǎn)首先會(huì)向nsqlookup廣播他們的位置信息,一旦它們注冊成功,worker將會(huì)從nsqlookup服務(wù)器節(jié)點(diǎn)上發(fā)現(xiàn)所有包含事件topic的nsqd節(jié)點(diǎn)。

nsqlookupd

2. Internals

2.1 消息傳遞擔(dān)保

1)客戶表示已經(jīng)準(zhǔn)備好接收消息

2)NSQ 發(fā)送一條消息,并暫時(shí)將數(shù)據(jù)存儲(chǔ)在本地(在 re-queue 或 timeout)

3)客戶端回復(fù) FIN(結(jié)束)或 REQ(重新排隊(duì))分別指示成功或失敗。如果客戶端沒有回復(fù), NSQ 會(huì)在設(shè)定的時(shí)間超時(shí),自動(dòng)重新排隊(duì)消息

這確保了消息丟失唯一可能的情況是不正常結(jié)束 nsqd 進(jìn)程。在這種情況下,這是在內(nèi)存中的任何信息(或任何緩沖未刷新到磁盤)都將丟失。

如何防止消息丟失是最重要的,即使是這個(gè)意外情況可以得到緩解。一種解決方案是構(gòu)成冗余 nsqd對(在不同的主機(jī)上)接收消息的相同部分的副本。因?yàn)槟銓?shí)現(xiàn)的消費(fèi)者是冪等的,以兩倍時(shí)間處理這些消息不會(huì)對下游造成影響,并使得系統(tǒng)能夠承受任何單一節(jié)點(diǎn)故障而不會(huì)丟失信息。

2.2 簡化配置和管理

單個(gè) nsqd 實(shí)例被設(shè)計(jì)成可以同時(shí)處理多個(gè)數(shù)據(jù)流。流被稱為“話題”和話題有 1 個(gè)或多個(gè)“通道”。每個(gè)通道都接收到一個(gè)話題中所有消息的拷貝。在實(shí)踐中,一個(gè)通道映射到下行服務(wù)消費(fèi)一個(gè)話題。

在更底的層面,每個(gè) nsqd 有一個(gè)與 nsqlookupd 的長期 TCP 連接,定期推動(dòng)其狀態(tài)。這個(gè)數(shù)據(jù)被 nsqlookupd 用于給消費(fèi)者通知 nsqd 地址。對于消費(fèi)者來說,一個(gè)暴露的 HTTP /lookup 接口用于輪詢。為話題引入一個(gè)新的消費(fèi)者,只需啟動(dòng)一個(gè)配置了 nsqlookup 實(shí)例地址的 NSQ 客戶端。無需為添加任何新的消費(fèi)者或生產(chǎn)者更改配置,大大降低了開銷和復(fù)雜性。

2.3 消除單點(diǎn)故障

NSQ被設(shè)計(jì)以分布的方式被使用。nsqd 客戶端(通過 TCP )連接到指定話題的所有生產(chǎn)者實(shí)例。沒有中間人,沒有消息代理,也沒有單點(diǎn)故障。

這種拓?fù)浣Y(jié)構(gòu)消除單鏈,聚合,反饋。相反,你的消費(fèi)者直接訪問所有生產(chǎn)者。從技術(shù)上講,哪個(gè)客戶端連接到哪個(gè) NSQ 不重要,只要有足夠的消費(fèi)者連接到所有生產(chǎn)者,以滿足大量的消息,保證所有東西最終將被處理。對于 nsqlookupd,高可用性是通過運(yùn)行多個(gè)實(shí)例來實(shí)現(xiàn)。他們不直接相互通信和數(shù)據(jù)被認(rèn)為是最終一致。消費(fèi)者輪詢所有的配置的 nsqlookupd 實(shí)例和合并 response。失敗的,無法訪問的,或以其他方式故障的節(jié)點(diǎn)不會(huì)讓系統(tǒng)陷于停頓。

2.4 效率

對于數(shù)據(jù)的協(xié)議,通過推送數(shù)據(jù)到客戶端最大限度地提高性能和吞吐量的,而不是等待客戶端拉數(shù)據(jù)。這個(gè)概念,稱之為 RDY 狀態(tài),基本上是客戶端流量控制的一種形式。

efficiency

2.5 心跳和超時(shí)

組合應(yīng)用級別的心跳和 RDY 狀態(tài),避免頭阻塞現(xiàn)象,也可能使心跳無用(即,如果消費(fèi)者是在后面的處理消息流的接收緩沖區(qū)中,操作系統(tǒng)將被填滿,堵心跳)為了保證進(jìn)度,所有的網(wǎng)絡(luò) IO 時(shí)間上限勢必與配置的心跳間隔相關(guān)聯(lián)。這意味著,你可以從字面上拔掉之間的網(wǎng)絡(luò)連接 nsqd 和消費(fèi)者,它會(huì)檢測并正確處理錯(cuò)誤。當(dāng)檢測到一個(gè)致命錯(cuò)誤,客戶端連接被強(qiáng)制關(guān)閉。在傳輸中的消息會(huì)超時(shí)而重新排隊(duì)等待傳遞到另一個(gè)消費(fèi)者。最后,錯(cuò)誤會(huì)被記錄并累計(jì)到各種內(nèi)部指標(biāo)。

2.6 分布式

因?yàn)镹SQ沒有在守護(hù)程序之間共享信息,所以它從一開始就是為了分布式操作而生。個(gè)別的機(jī)器可以隨便宕機(jī)隨便啟動(dòng)而不會(huì)影響到系統(tǒng)的其余部分,消息發(fā)布者可以在本地發(fā)布,即使面對網(wǎng)絡(luò)分區(qū)。

這種“分布式優(yōu)先”的設(shè)計(jì)理念意味著NSQ基本上可以永遠(yuǎn)不斷地?cái)U(kuò)展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享狀態(tài)就是保存在lookup節(jié)點(diǎn)上,甚至它們不需要全局視圖,配置某些nsqd注冊到某些lookup節(jié)點(diǎn)上這是很簡單的配置,唯一關(guān)鍵的地方就是消費(fèi)者可以通過lookup節(jié)點(diǎn)獲取所有完整的節(jié)點(diǎn)集。清晰的故障事件——NSQ在組件內(nèi)建立了一套明確關(guān)于可能導(dǎo)致故障的的故障權(quán)衡機(jī)制,這對消息傳遞和恢復(fù)都有意義。雖然它們可能不像Kafka系統(tǒng)那樣提供嚴(yán)格的保證級別,但NSQ簡單的操作使故障情況非常明顯。

2.7 no replication

不像其他的隊(duì)列組件,NSQ并沒有提供任何形式的復(fù)制和集群,也正是這點(diǎn)讓它能夠如此簡單地運(yùn)行,但它確實(shí)對于一些高保證性高可靠性的消息發(fā)布沒有足夠的保證。我們可以通過降低文件同步的時(shí)間來部分避免,只需通過一個(gè)標(biāo)志配置,通過EBS支持我們的隊(duì)列。但是這樣仍然存在一個(gè)消息被發(fā)布后馬上死亡,丟失了有效的寫入的情況。

2.8 沒有嚴(yán)格的順序

雖然Kafka由一個(gè)有序的日志構(gòu)成,但NSQ不是。消息可以在任何時(shí)間以任何順序進(jìn)入隊(duì)列。在我們使用的案例中,這通常沒有關(guān)系,因?yàn)樗械臄?shù)據(jù)都被加上了時(shí)間戳,但它并不適合需要嚴(yán)格順序的情況。

2.9 無數(shù)據(jù)重復(fù)刪除功能

NSQ對于超時(shí)系統(tǒng),它使用了心跳檢測機(jī)制去測試消費(fèi)者是否存活還是死亡。很多原因會(huì)導(dǎo)致我們的consumer無法完成心跳檢測,所以在consumer中必須有一個(gè)單獨(dú)的步驟確保冪等性。

3. 實(shí)踐安裝過程

本文將nsq集群具體的安裝過程略去,大家可以自行參考官網(wǎng),比較簡單。這部分介紹下筆者實(shí)驗(yàn)的拓?fù)洌约皀sqadmin的相關(guān)信息。

3.1 拓?fù)浣Y(jié)構(gòu)

topology

實(shí)驗(yàn)采用3臺(tái)NSQD服務(wù),2臺(tái)LOOKUPD服務(wù)。

采用官方推薦的拓?fù)?,消息發(fā)布的服務(wù)和NSQD在一臺(tái)主機(jī)。一共5臺(tái)機(jī)器。

NSQ基本沒有配置文件,配置通過命令行指定參數(shù)。

主要命令如下:

LOOKUPD命令

NSQD命令

工具類,消費(fèi)后存儲(chǔ)到本地文件。

發(fā)布一條消息

3.2 nsqadmin

對Streams的詳細(xì)信息進(jìn)行查看,包括NSQD節(jié)點(diǎn),具體的channel,隊(duì)列中的消息數(shù),連接數(shù)等信息。

nsqadmin

channel

列出所有的NSQD節(jié)點(diǎn):

nodes

消息的統(tǒng)計(jì):

msgs

lookup主機(jī)的列表:

hosts

4. 總結(jié)

NSQ基本核心就是簡單性,是一個(gè)簡單的隊(duì)列,這意味著它很容易進(jìn)行故障推理和很容易發(fā)現(xiàn)bug。消費(fèi)者可以自行處理故障事件而不會(huì)影響系統(tǒng)剩下的其余部分。

事實(shí)上,簡單性是我們決定使用NSQ的首要因素,這方便與我們的許多其他軟件一起維護(hù),通過引入隊(duì)列使我們得到了堪稱完美的表現(xiàn),通過隊(duì)列甚至讓我們增加了幾個(gè)數(shù)量級的吞吐量。越來越多的consumer需要一套嚴(yán)格可靠性和順序性保障,這已經(jīng)超過了NSQ提供的簡單功能。

結(jié)合我們的業(yè)務(wù)系統(tǒng)來看,對于我們所需要傳輸?shù)陌l(fā)票消息,相對比較敏感,無法容忍某個(gè)nsqd宕機(jī),或者磁盤無法使用的情況,該節(jié)點(diǎn)堆積的消息無法找回。這是我們沒有選擇該消息中間件的主要原因。簡單性和可靠性似乎并不能完全滿足。相比Kafka,ops肩負(fù)起更多負(fù)責(zé)的運(yùn)營。另一方面,它擁有一個(gè)可復(fù)制的、有序的日志可以提供給我們更好的服務(wù)。但對于其他適合NSQ的consumer,它為我們服務(wù)的相當(dāng)好,我們期待著繼續(xù)鞏固它的堅(jiān)實(shí)的基礎(chǔ)。

GO語言商業(yè)案例(十八):stream

切換到新語言始終是一大步,尤其是當(dāng)您的團(tuán)隊(duì)成員只有一個(gè)時(shí)有該語言的先前經(jīng)驗(yàn)?,F(xiàn)在,Stream 的主要編程語言從 Python 切換到了 Go。這篇文章將解釋stream決定放棄 Python 并轉(zhuǎn)向 Go 的一些原因。

Go 非常快。性能類似于 Java 或 C++。對于用例,Go 通常比 Python 快 40 倍。

對于許多應(yīng)用程序來說,編程語言只是應(yīng)用程序和數(shù)據(jù)庫之間的粘合劑。語言本身的性能通常并不重要。然而,Stream 是一個(gè)API 提供商,為 700 家公司和超過 5 億最終用戶提供提要和聊天平臺(tái)。多年來,我們一直在優(yōu)化 Cassandra、PostgreSQL、Redis 等,但最終,您會(huì)達(dá)到所使用語言的極限。Python 是一門很棒的語言,但對于序列化/反序列化、排名和聚合等用例,它的性能相當(dāng)緩慢。我們經(jīng)常遇到性能問題,Cassandra 需要 1 毫秒來檢索數(shù)據(jù),而 Python 會(huì)花費(fèi)接下來的 10 毫秒將其轉(zhuǎn)換為對象。

看看我如何開始 Go 教程中的一小段 Go 代碼。(這是一個(gè)很棒的教程,也是學(xué)習(xí) Go 的一個(gè)很好的起點(diǎn)。)

如果您是 Go 新手,那么在閱讀那個(gè)小代碼片段時(shí)不會(huì)有太多讓您感到驚訝的事情。它展示了多個(gè)賦值、數(shù)據(jù)結(jié)構(gòu)、指針、格式和一個(gè)內(nèi)置的 HTTP 庫。當(dāng)我第一次開始編程時(shí),我一直喜歡使用 Python 更高級的功能。Python 允許您在編寫代碼時(shí)獲得相當(dāng)?shù)膭?chuàng)意。例如,您可以:

這些功能玩起來很有趣,但是,正如大多數(shù)程序員會(huì)同意的那樣,在閱讀別人的作品時(shí),它們通常會(huì)使代碼更難理解。Go 迫使你堅(jiān)持基礎(chǔ)。這使得閱讀任何人的代碼并立即了解發(fā)生了什么變得非常容易。 注意:當(dāng)然,它實(shí)際上有多“容易”取決于您的用例。如果你想創(chuàng)建一個(gè)基本的 CRUD API,我仍然推薦 Django + DRF或 Rails。

作為一門語言,Go 試圖讓事情變得簡單。它沒有引入許多新概念。重點(diǎn)是創(chuàng)建一種非常快速且易于使用的簡單語言。它唯一具有創(chuàng)新性的領(lǐng)域是 goroutine 和通道。(100% 正確CSP的概念始于 1977 年,所以這項(xiàng)創(chuàng)新更多是對舊思想的一種新方法。)Goroutines 是 Go 的輕量級線程方法,通道是 goroutines 之間通信的首選方式。Goroutines 的創(chuàng)建非常便宜,并且只需要幾 KB 的額外內(nèi)存。因?yàn)?Goroutine 非常輕量,所以有可能同時(shí)運(yùn)行數(shù)百甚至數(shù)千個(gè)。您可以使用通道在 goroutine 之間進(jìn)行通信。Go 運(yùn)行時(shí)處理所有復(fù)雜性。goroutines 和基于通道的并發(fā)方法使得使用所有可用的 CPU 內(nèi)核和處理并發(fā) IO 變得非常容易——所有這些都不會(huì)使開發(fā)復(fù)雜化。與 Python/Java 相比,在 goroutine 上運(yùn)行函數(shù)需要最少的樣板代碼。您只需在函數(shù)調(diào)用前加上關(guān)鍵字“go”:

Go 的并發(fā)方法很容易使用。與 Node 相比,這是一種有趣的方法,開發(fā)人員必須密切關(guān)注異步代碼的處理方式。Go 中并發(fā)的另一個(gè)重要方面是競爭檢測器。這樣可以很容易地確定異步代碼中是否存在任何競爭條件。

我們目前用 Go 編寫的最大的微服務(wù)編譯需要 4 秒。與以編譯速度慢而聞名的 Java 和 C++ 等語言相比,Go 的快速編譯時(shí)間是一項(xiàng)重大的生產(chǎn)力勝利。我喜歡在程序編譯的時(shí)候摸魚,但在我還記得代碼應(yīng)該做什么的同時(shí)完成事情會(huì)更好。

首先,讓我們從顯而易見的開始:與 C++ 和 Java 等舊語言相比,Go 開發(fā)人員的數(shù)量并不多。根據(jù)StackOverflow的數(shù)據(jù), 38% 的開發(fā)人員知道 Java, 19.3% 的人知道 C++,只有 4.6% 的人知道 Go。GitHub 數(shù)據(jù)顯示了類似的趨勢:Go 比 Erlang、Scala 和 Elixir 等語言使用更廣泛,但不如 Java 和 C++ 流行。幸運(yùn)的是,Go 是一種非常簡單易學(xué)的語言。它提供了您需要的基本功能,僅此而已。它引入的新概念是“延遲”聲明和內(nèi)置的并發(fā)管理與“goroutines”和通道。(對于純粹主義者來說:Go 并不是第一種實(shí)現(xiàn)這些概念的語言,只是第一種使它們流行起來的語言。)任何加入團(tuán)隊(duì)的 Python、Elixir、C++、Scala 或 Java 開發(fā)人員都可以在一個(gè)月內(nèi)在 Go 上發(fā)揮作用,因?yàn)樗暮唵涡?。與許多其他語言相比,我們發(fā)現(xiàn)組建 Go 開發(fā)人員團(tuán)隊(duì)更容易。如果您在博爾德和阿姆斯特丹等競爭激烈的生態(tài)系統(tǒng)中招聘人員,這是一項(xiàng)重要的優(yōu)勢。

對于我們這樣規(guī)模的團(tuán)隊(duì)(約 20 人)來說,生態(tài)系統(tǒng)很重要。如果您必須重新發(fā)明每一個(gè)小功能,您根本無法為您的客戶創(chuàng)造價(jià)值。Go 對我們使用的工具有很好的支持。實(shí)體庫已經(jīng)可用于 Redis、RabbitMQ、PostgreSQL、模板解析、任務(wù)調(diào)度、表達(dá)式解析和 RocksDB。與 Rust 或 Elixir 等其他較新的語言相比,Go 的生態(tài)系統(tǒng)是一個(gè)重大勝利。它當(dāng)然不如 Java、Python 或 Node 之類的語言好,但它很可靠,而且對于許多基本需求,你會(huì)發(fā)現(xiàn)已經(jīng)有高質(zhì)量的包可用。

Gofmt 是一個(gè)很棒的命令行實(shí)用程序,內(nèi)置在 Go 編譯器中,用于格式化代碼。就功能而言,它與 Python 的 autopep8 非常相似。我們大多數(shù)人并不真正喜歡爭論制表符與空格。格式的一致性很重要,但實(shí)際的格式標(biāo)準(zhǔn)并不那么重要。Gofmt 通過使用一種正式的方式來格式化您的代碼來避免所有這些討論。

Go 對協(xié)議緩沖區(qū)和 gRPC 具有一流的支持。這兩個(gè)工具非常適合構(gòu)建需要通過 RPC 通信的微服務(wù)。您只需要編寫一個(gè)清單,在其中定義可以進(jìn)行的 RPC 調(diào)用以及它們采用的參數(shù)。然后從這個(gè)清單中自動(dòng)生成服務(wù)器和客戶端代碼。生成的代碼既快速又具有非常小的網(wǎng)絡(luò)占用空間并且易于使用。從同一個(gè)清單中,您甚至可以為許多不同的語言生成客戶端代碼,例如 C++、Java、Python 和 Ruby。因此,內(nèi)部流量不再有模棱兩可的 REST 端點(diǎn),您每次都必須編寫幾乎相同的客戶端和服務(wù)器代碼。.

Go 沒有像 Rails 用于 Ruby、Django 用于 Python 或 Laravel 用于 PHP 那樣的單一主導(dǎo)框架。這是 Go 社區(qū)內(nèi)激烈爭論的話題,因?yàn)樵S多人主張你不應(yīng)該一開始就使用框架。我完全同意這對于某些用例是正確的。但是,如果有人想構(gòu)建一個(gè)簡單的 CRUD API,他們將更容易使用 Django/DJRF、Rails Laravel 或Phoenix。對于 Stream 的用例,我們更喜歡不使用框架。然而,對于許多希望提供簡單 CRUD API 的新項(xiàng)目來說,缺乏主導(dǎo)框架將是一個(gè)嚴(yán)重的劣勢。

Go 通過簡單地從函數(shù)返回錯(cuò)誤并期望調(diào)用代碼來處理錯(cuò)誤(或?qū)⑵浞祷氐秸{(diào)用堆棧)來處理錯(cuò)誤。雖然這種方法有效,但很容易失去問題的范圍,以確保您可以向用戶提供有意義的錯(cuò)誤。錯(cuò)誤包通過允許您向錯(cuò)誤添加上下文和堆棧跟蹤來解決此問題。另一個(gè)問題是很容易忘記處理錯(cuò)誤。像 errcheck 和 megacheck 這樣的靜態(tài)分析工具可以方便地避免犯這些錯(cuò)誤。雖然這些變通辦法效果很好,但感覺不太對勁。您希望該語言支持正確的錯(cuò)誤處理。

Go 的包管理絕不是完美的。默認(rèn)情況下,它無法指定特定版本的依賴項(xiàng),也無法創(chuàng)建可重現(xiàn)的構(gòu)建。Python、Node 和 Ruby 都有更好的包管理系統(tǒng)。但是,使用正確的工具,Go 的包管理工作得很好。您可以使用Dep來管理您的依賴項(xiàng),以允許指定和固定版本。除此之外,我們還貢獻(xiàn)了一個(gè)名為的開源工具VirtualGo,它可以更輕松地處理用 Go 編寫的多個(gè)項(xiàng)目。

我們進(jìn)行的一個(gè)有趣的實(shí)驗(yàn)是在 Python 中使用我們的排名提要功能并在 Go 中重寫它。看看這個(gè)排名方法的例子:

Python 和 Go 代碼都需要執(zhí)行以下操作來支持這種排名方法:

開發(fā) Python 版本的排名代碼大約花了 3 天時(shí)間。這包括編寫代碼、單元測試和文檔。接下來,我們花了大約 2 周的時(shí)間優(yōu)化代碼。其中一項(xiàng)優(yōu)化是將分?jǐn)?shù)表達(dá)式 (simple_gauss(time)*popularity) 轉(zhuǎn)換為抽象語法樹. 我們還實(shí)現(xiàn)了緩存邏輯,可以在未來的特定時(shí)間預(yù)先計(jì)算分?jǐn)?shù)。相比之下,開發(fā)此代碼的 Go 版本大約需要 4 天時(shí)間。性能不需要任何進(jìn)一步的優(yōu)化。因此,雖然 Python 的最初開發(fā)速度更快,但基于 Go 的版本最終需要我們團(tuán)隊(duì)的工作量大大減少。另外一個(gè)好處是,Go 代碼的執(zhí)行速度比我們高度優(yōu)化的 Python 代碼快大約 40 倍?,F(xiàn)在,這只是我們通過切換到 Go 體驗(yàn)到的性能提升的一個(gè)示例。

與 Python 相比,我們系統(tǒng)的其他一些組件在 Go 中構(gòu)建所需的時(shí)間要多得多。作為一個(gè)總體趨勢,我們看到 開發(fā) Go 代碼需要更多的努力。但是,我們花更少的時(shí)間 優(yōu)化 代碼以提高性能。

我們評估的另一種語言是Elixir.。Elixir 建立在 Erlang 虛擬機(jī)之上。這是一種迷人的語言,我們之所以考慮它,是因?yàn)槲覀兊囊幻麍F(tuán)隊(duì)成員在 Erlang 方面擁有豐富的經(jīng)驗(yàn)。對于我們的用例,我們注意到 Go 的原始性能要好得多。Go 和 Elixir 都可以很好地服務(wù)數(shù)千個(gè)并發(fā)請求。但是,如果您查看單個(gè)請求的性能,Go 對于我們的用例來說要快得多。我們選擇 Go 而不是 Elixir 的另一個(gè)原因是生態(tài)系統(tǒng)。對于我們需要的組件,Go 有更成熟的庫,而在許多情況下,Elixir 庫還沒有準(zhǔn)備好用于生產(chǎn)環(huán)境。培訓(xùn)/尋找開發(fā)人員使用 Elixir 也更加困難。這些原因使天平向 Go 傾斜。Elixir 的 Phoenix 框架看起來很棒,絕對值得一看。

Go 是一種非常高性能的語言,對并發(fā)有很好的支持。它幾乎與 C++ 和 Java 等語言一樣快。雖然與 Python 或 Ruby 相比,使用 Go 構(gòu)建東西確實(shí)需要更多時(shí)間,但您將節(jié)省大量用于優(yōu)化代碼的時(shí)間。我們在Stream有一個(gè)小型開發(fā)團(tuán)隊(duì),為超過 5 億最終用戶提供動(dòng)力和聊天。Go 結(jié)合了 強(qiáng)大的生態(tài)系統(tǒng) 、新開發(fā)人員的 輕松入門、快速的性能 、對并發(fā)的 可靠支持和高效的編程環(huán)境 ,使其成為一個(gè)不錯(cuò)的選擇。Stream 仍然在我們的儀表板、站點(diǎn)和機(jī)器學(xué)習(xí)中利用 Python 來提供個(gè)性化的訂閱源. 我們不會(huì)很快與 Python 說再見,但今后所有性能密集型代碼都將使用 Go 編寫。我們新的聊天 API也完全用 Go 編寫。

協(xié)程與異步IO

協(xié)程,又稱微線程,纖程。英文名 Coroutine 。Python對協(xié)程的支持是通過 generator 實(shí)現(xiàn)的。在generator中,我們不但可以通過for循環(huán)來迭代,還可以不斷調(diào)用 next()函數(shù) 獲取由 yield 語句返回的下一個(gè)值。但是Python的yield不但可以返回一個(gè)值,它還可以接收調(diào)用者發(fā)出的參數(shù)。yield其實(shí)是終端當(dāng)前的函數(shù),返回給調(diào)用方。python3中使用yield來實(shí)現(xiàn)range,節(jié)省內(nèi)存,提高性能,懶加載的模式。

asyncio是Python 3.4 版本引入的 標(biāo)準(zhǔn)庫 ,直接內(nèi)置了對異步IO的支持。

從Python 3.5 開始引入了新的語法 async 和 await ,用來簡化yield的語法:

import asyncio

import threading

async def compute(x, y):

print("Compute %s + %s ..." % (x, y))

print(threading.current_thread().name)

await asyncio.sleep(x + y)

return x + y

async def print_sum(x, y):

result = await compute(x, y)

print("%s + %s = %s" % (x, y, result))

print(threading.current_thread().name)

if __name__ == "__main__":

loop = asyncio.get_event_loop()

tasks = [print_sum(1, 2), print_sum(3, 4)]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

線程是內(nèi)核進(jìn)行搶占式的調(diào)度的,這樣就確保了每個(gè)線程都有執(zhí)行的機(jī)會(huì)。而 coroutine 運(yùn)行在同一個(gè)線程中,由語言的運(yùn)行時(shí)中的 EventLoop(事件循環(huán)) 來進(jìn)行調(diào)度。和大多數(shù)語言一樣,在 Python 中,協(xié)程的調(diào)度是非搶占式的,也就是說一個(gè)協(xié)程必須主動(dòng)讓出執(zhí)行機(jī)會(huì),其他協(xié)程才有機(jī)會(huì)運(yùn)行。

讓出執(zhí)行的關(guān)鍵字就是 await。也就是說一個(gè)協(xié)程如果阻塞了,持續(xù)不讓出 CPU,那么整個(gè)線程就卡住了,沒有任何并發(fā)。

PS: 作為服務(wù)端,event loop最核心的就是IO多路復(fù)用技術(shù),所有來自客戶端的請求都由IO多路復(fù)用函數(shù)來處理;作為客戶端,event loop的核心在于利用Future對象延遲執(zhí)行,并使用send函數(shù)激發(fā)協(xié)程,掛起,等待服務(wù)端處理完成返回后再調(diào)用CallBack函數(shù)繼續(xù)下面的流程

Go語言的協(xié)程是 語言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的協(xié)程是eventloop模型),但是erlang是基于進(jìn)程的消息通信,go是基于goroutine和channel的通信。

Python和Go都引入了消息調(diào)度系統(tǒng)模型,來避免鎖的影響和進(jìn)程/線程開銷大的問題。

協(xié)程從本質(zhì)上來說是一種用戶態(tài)的線程,不需要系統(tǒng)來執(zhí)行搶占式調(diào)度,而是在語言層面實(shí)現(xiàn)線程的調(diào)度 。因?yàn)閰f(xié)程 不再使用共享內(nèi)存/數(shù)據(jù) ,而是使用 通信 來共享內(nèi)存/鎖,因?yàn)樵谝粋€(gè)超級大系統(tǒng)里具有無數(shù)的鎖,共享變量等等會(huì)使得整個(gè)系統(tǒng)變得無比的臃腫,而通過消息機(jī)制來交流,可以使得每個(gè)并發(fā)的單元都成為一個(gè)獨(dú)立的個(gè)體,擁有自己的變量,單元之間變量并不共享,對于單元的輸入輸出只有消息。開發(fā)者只需要關(guān)心在一個(gè)并發(fā)單元的輸入與輸出的影響,而不需要再考慮類似于修改共享內(nèi)存/數(shù)據(jù)對其它程序的影響。

Golang kafka簡述和操作(sarama同步異步和消費(fèi)組)

一、Kafka簡述

1. 為什么需要用到消息隊(duì)列

異步:對比以前的串行同步方式來說,可以在同一時(shí)間做更多的事情,提高效率;

解耦:在耦合太高的場景,多個(gè)任務(wù)要對同一個(gè)數(shù)據(jù)進(jìn)行操作消費(fèi)的時(shí)候,會(huì)導(dǎo)致一個(gè)任務(wù)的處理因?yàn)榱硪粋€(gè)任務(wù)對數(shù)據(jù)的操作變得及其復(fù)雜。

緩沖:當(dāng)遇到突發(fā)大流量的時(shí)候,消息隊(duì)列可以先把所有消息有序保存起來,避免直接作用于系統(tǒng)主體,系統(tǒng)主題始終以一個(gè)平穩(wěn)的速率去消費(fèi)這些消息。

2.為什么選擇kafka呢?

這沒有絕對的好壞,看個(gè)人需求來選擇,我這里就抄了一段他人總結(jié)的的優(yōu)缺點(diǎn),可見原文

kafka的優(yōu)點(diǎn):

1.支持多個(gè)生產(chǎn)者和消費(fèi)者2.支持broker的橫向拓展3.副本集機(jī)制,實(shí)現(xiàn)數(shù)據(jù)冗余,保證數(shù)據(jù)不丟失4.通過topic將數(shù)據(jù)進(jìn)行分類5.通過分批發(fā)送壓縮數(shù)據(jù)的方式,減少數(shù)據(jù)傳輸開銷,提高吞高量6.支持多種模式的消息7.基于磁盤實(shí)現(xiàn)數(shù)據(jù)的持久化8.高性能的處理信息,在大數(shù)據(jù)的情況下,可以保證亞秒級的消息延遲9.一個(gè)消費(fèi)者可以支持多種topic的消息10.對CPU和內(nèi)存的消耗比較小11.對網(wǎng)絡(luò)開銷也比較小12.支持跨數(shù)據(jù)中心的數(shù)據(jù)復(fù)制13.支持鏡像集群

kafka的缺點(diǎn):

1.由于是批量發(fā)送,所以數(shù)據(jù)達(dá)不到真正的實(shí)時(shí)2.對于mqtt協(xié)議不支持3.不支持物聯(lián)網(wǎng)傳感數(shù)據(jù)直接接入4.只能支持統(tǒng)一分區(qū)內(nèi)消息有序,無法實(shí)現(xiàn)全局消息有序5.監(jiān)控不完善,需要安裝插件6.需要配合zookeeper進(jìn)行元數(shù)據(jù)管理7.會(huì)丟失數(shù)據(jù),并且不支持事務(wù)8.可能會(huì)重復(fù)消費(fèi)數(shù)據(jù),消息會(huì)亂序,可用保證一個(gè)固定的partition內(nèi)部的消息是有序的,但是一個(gè)topic有多個(gè)partition的話,就不能保證有序了,需要zookeeper的支持,topic一般需要人工創(chuàng)建,部署和維護(hù)一般都比mq高

3. Golang 操作kafka

3.1. kafka的環(huán)境

網(wǎng)上有很多搭建kafka環(huán)境教程,這里就不再搭建,就展示一下kafka的環(huán)境,在kubernetes上進(jìn)行的搭建,有需要的私我,可以發(fā)yaml文件

3.2. 第三方庫

github.com/Shopify/sarama // kafka主要的庫*github.com/bsm/sarama-cluster // kafka消費(fèi)組

3.3. 消費(fèi)者

單個(gè)消費(fèi)者

funcconsumer(){varwg sync.WaitGroup? consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{? ? ? fmt.Println("Failed to start consumer: %s", err)return}? partitionList, err := consumer.Partitions("test0")//獲得該topic所有的分區(qū)iferr !=nil{? ? ? fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {? ? ? pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{? ? ? ? fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}? ? ? wg.Add(1)gofunc(sarama.PartitionConsumer){//為每個(gè)分區(qū)開一個(gè)go協(xié)程去取值formsg :=rangepc.Messages() {//阻塞直到有值發(fā)送過來,然后再繼續(xù)等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? ? }deferpc.AsyncClose()? ? ? ? wg.Done()? ? ? }(pc)? }? wg.Wait()}funcmain(){? consumer()}

消費(fèi)組

funcconsumerCluster(){? groupID :="group-1"config := cluster.NewConfig()? config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second? config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始從最新的offset開始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{? ? ? glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){? ? ? errors := c.Errors()? ? ? noti := c.Notifications()for{select{caseerr := -errors:? ? ? ? ? ? glog.Errorln(err)case-noti:? ? ? ? }? ? ? }? }(c)formsg :=rangec.Messages() {? ? ? fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))? ? ? c.MarkOffset(msg,"")//MarkOffset 并不是實(shí)時(shí)寫入kafka,有可能在程序crash時(shí)丟掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生產(chǎn)者

同步生產(chǎn)者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){? config := sarama.NewConfig()? config.Producer.RequiredAcks = sarama.WaitForAll//賦值為-1:這意味著producer在follower副本確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//寫到隨機(jī)分區(qū)中,默認(rèn)設(shè)置8個(gè)分區(qū)config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}? msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")? client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{? ? ? fmt.Println("producer close err, ", err)return}deferclient.Close()? pid, offset, err := client.SendMessage(msg)iferr !=nil{? ? ? fmt.Println("send message failed, ", err)return}? fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)}

異步生產(chǎn)者

funcasyncProducer(){? config := sarama.NewConfig()? config.Producer.Return.Successes =true//必須有這個(gè)選項(xiàng)config.Producer.Timeout =5* time.Second? p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//這個(gè)部分一定要寫,不然通道會(huì)被堵塞gofunc(p sarama.AsyncProducer){? ? ? errors := p.Errors()? ? ? success := p.Successes()for{select{caseerr := -errors:iferr !=nil{? ? ? ? ? ? ? glog.Errorln(err)? ? ? ? ? ? }case-success:? ? ? ? }? ? ? }? }(p)for{? ? ? v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))? ? ? fmt.Fprintln(os.Stdout, v)? ? ? msg := sarama.ProducerMessage{? ? ? ? Topic: topics,? ? ? ? Value: sarama.ByteEncoder(v),? ? ? }? ? ? p.Input() - msg? ? ? time.Sleep(time.Second *1)? }}funcmain(){goasyncProducer()select{? ? ? }}

3.5. 結(jié)果展示-

同步生產(chǎn)打?。?/p>

分區(qū)ID:0,offset:90

消費(fèi)打印:

Partition:0,Offset:90,key:,value:Hello World!

異步生產(chǎn)打?。?/p>

async:7272async:7616async:998

消費(fèi)打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998


標(biāo)題名稱:go語言異步庫,go 異步
鏈接分享:http://weahome.cn/article/hccjdo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部