本項(xiàng)目用于移動(dòng)端的數(shù)據(jù)統(tǒng)計(jì),項(xiàng)目地址: 。開源的數(shù)據(jù)統(tǒng)計(jì)countly做的很好,但是基礎(chǔ)免費(fèi)版的功能實(shí)在不夠看,因此我就決定用go語(yǔ)言來(lái)寫了這個(gè)項(xiàng)目,一來(lái)可以在實(shí)踐中學(xué)習(xí)go語(yǔ)言,二來(lái)也可以開發(fā)功能完整的開源平臺(tái)。該項(xiàng)目正在開發(fā)中,歡迎有興趣的gopher一起參與。
成都創(chuàng)新互聯(lián)公司是一家專注網(wǎng)站建設(shè)、網(wǎng)絡(luò)營(yíng)銷策劃、微信小程序開發(fā)、電子商務(wù)建設(shè)、網(wǎng)絡(luò)推廣、移動(dòng)互聯(lián)開發(fā)、研究、服務(wù)為一體的技術(shù)型公司。公司成立10年以來(lái),已經(jīng)為近千家生料攪拌車各業(yè)的企業(yè)公司提供互聯(lián)網(wǎng)服務(wù)。現(xiàn)在,服務(wù)的近千家客戶與我們一路同行,見證我們的成長(zhǎng);未來(lái),我們一起分享成功的喜悅。
數(shù)據(jù)存儲(chǔ)方面使用的是mongodb。由于數(shù)據(jù)統(tǒng)計(jì)業(yè)務(wù)幾乎不涉及到事務(wù)以及嚴(yán)格的一致性場(chǎng)景,而且mongodb的自動(dòng)分片功能可以支撐較大的數(shù)據(jù)量。使用大數(shù)據(jù)的存儲(chǔ)組件的話就太過于重了。因此選用mongodb。
業(yè)務(wù)邏輯整體基于事件的發(fā)布訂閱。當(dāng)收到客戶端請(qǐng)求, frontend 會(huì)對(duì)請(qǐng)求數(shù)據(jù)進(jìn)行處理,然后發(fā)布響應(yīng)的事件。 backend 收到事件后進(jìn)行統(tǒng)計(jì)處理。
后臺(tái)展示基于Vue-Admin-Template開發(fā),本人前端能力基本就是依葫蘆畫瓢,希望有前端大神來(lái)開發(fā)后臺(tái)頁(yè)面,項(xiàng)目地址:
目前客戶端API僅有2個(gè)。一個(gè)是上報(bào) openApp 打開APP時(shí)間,一個(gè)是上報(bào) usageTime 一次啟動(dòng)使用時(shí)長(zhǎng)事件。SDK方面也需要移動(dòng)端的大神開發(fā),感興趣的大佬可以一起開發(fā)。
下面放一點(diǎn)后臺(tái)頁(yè)面的效果圖:
GoAnalytics是基于go實(shí)現(xiàn)的一個(gè)數(shù)據(jù)統(tǒng)計(jì)平臺(tái),用于統(tǒng)計(jì)移動(dòng)端的數(shù)據(jù)指標(biāo),比如啟動(dòng)次數(shù)、用戶增長(zhǎng)、活躍用戶、留存等指標(biāo)分析。前端數(shù)據(jù)展示項(xiàng)目是 goanalytics-web 。目前正在積極開發(fā)中,歡迎提交新的需求和pull request。
Go版本需要支持module,本地開發(fā)測(cè)試
cmd/goanalytics_kafka 和 goanalytics_rmq 是分別基于 kafka 和 rocketmq 的發(fā)布訂閱功能做的數(shù)據(jù)發(fā)布
和訂閱處理,橫向擴(kuò)展能力比 local 高。另外由于 rocketmq 還沒有原生基于 go 的客戶端(原生客戶端正在開發(fā)中
2.0.0 road map ),可能會(huì)存在問題。
項(xiàng)目結(jié)構(gòu)
├── README.md
├── api
│ ├── authentication 用戶認(rèn)證、管理API
│ ├── middlewares GIN 中間件
│ └── router API route
├── cmd
│ ├── account 生成admin賬號(hào)命令
│ ├── analytic_local 不依賴消息系統(tǒng)的goanalytics
│ ├── goanalytics_kafka 基于kafak的goanalytics
│ ├── goanalytics_rmq 基于rocketmq的goanalytics
│ └── test_data 生成測(cè)試數(shù)據(jù)命令
├── common
│ └── data.go
├── conf 配置
│ └── conf.go
├── event
│ ├── codec 數(shù)據(jù)編解碼
│ └── pubsub 消息發(fā)布訂閱
├── go.mod
├── go.sum
├── metric 所有的統(tǒng)計(jì)指標(biāo)在這里實(shí)現(xiàn)
│ ├── init.go
│ └── user 用戶相關(guān)指標(biāo)的實(shí)現(xiàn)
├── schedule
│ └── schedule.go 定時(shí)任務(wù)調(diào)度
├── storage 存儲(chǔ)模塊
│ ├── counter.go 計(jì)數(shù)器接口
│ ├── data.go
│ └── mongodb 基于mongodb實(shí)現(xiàn)的存儲(chǔ)及計(jì)數(shù)器
└── utils
├── date.go
├── date_test.go
├── errors.go
└── key.go
這10款工具如下:
AKHQ
Kowl
Kafdrop
UI for Apache Kafka
Lenses
CMAK
Confluent CC
Conduktor
LogiKM
kafka-console-ui
如果上面這個(gè)地址可以打開,可以直接去看介紹,下文也不再重復(fù)說明。
關(guān)于前8款的對(duì)比,可以看下面這張圖片,圖片也是于上面,我直接copy過來(lái)了(可能有好多同學(xué)打不開上面這個(gè)鏈接,就直接看這張圖片了解了下吧)
關(guān)于這8款工具的介紹,人家說的很清晰了,這里就不再重復(fù)說明了,并且這些工具,大部分我也沒用過,也沒資格評(píng)價(jià)太多。
考慮到很多同學(xué)可能打開github太慢,我下面會(huì)把相關(guān)基本信息整理一下,供大家快速了解,方便選型。
概覽
AKHQ (previously known as KafkaHQ)
開發(fā)語(yǔ)言:后端是java為主
Kowl - A Web UI for Apache Kafka
p.s. github上完整的動(dòng)圖這里上傳失敗,就只放一個(gè)靜態(tài)的截圖了,如果可以打開github,建議打開下面的地址直接看吧。
但是這個(gè)并不是所有功能都是免費(fèi),有部分功能是商業(yè)版才有:
開發(fā)語(yǔ)言:后端是go為主
Kafdrop – Kafka Web UI
開發(fā)語(yǔ)言:后端以java為主
要求jdk11或更高版本
UI for Apache Kafka – Free Web UI for Apache Kafka
開發(fā)語(yǔ)言:后端以java為主
要求jdk13或更高版本
Lenses.io
Apache Kafka 和 Kubernetes 的實(shí)時(shí)應(yīng)用程序和數(shù)據(jù)操作 #DataOps 門戶。
CMAK (Cluster Manager for Apache Kafka, previously known as Kafka Manager)
這個(gè)想必很多同學(xué)都知道,原來(lái)的名字就是kafka manager。
開發(fā)語(yǔ)言:后端以scala為主
Confluent Inc.
Apache
Conduktor
一個(gè)商業(yè)版本的桌面客戶端
官網(wǎng)找到一個(gè)這樣的圖片,湊合看吧:
LogiKM
滴滴開源的一站式Apache Kafka集群指標(biāo)監(jiān)控與運(yùn)維管控平臺(tái)。
也是分社區(qū)版和商業(yè)版的。
這個(gè)建議直接看github說明吧,都是中文,內(nèi)容清晰,相關(guān)的資料也都有。
我也簡(jiǎn)單的了解了下,有個(gè)邏輯集群的概念,對(duì)于規(guī)模比較大的kafka集群管理還是挺好的,不過,這里比較高端的特性都是不開源的,必須商業(yè)版才能用。
開發(fā)語(yǔ)言:后端以java為主
kafka-console-ui(kafka可視化管理平臺(tái))
一款輕量級(jí)的kafka可視化管理平臺(tái),安裝配置快捷、簡(jiǎn)單易用。界面風(fēng)格有點(diǎn)類似rocketmq-console。
這款權(quán)當(dāng)是“王婆賣瓜,自賣自夸”吧,一個(gè)小工具,如果剛接觸kafka的同學(xué)或者是中小型集群,想找個(gè)簡(jiǎn)單易用的,可以考慮一下。
開發(fā)語(yǔ)言:后端以java和scala為主
參考鏈接:
本文主要研究一下golang的zap的ZapKafkaWriter
WriteSyncer內(nèi)嵌了io.Writer接口,定義了Sync方法;Sink接口內(nèi)嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter實(shí)現(xiàn)Sink接口及zapcore.WriteSyncer接口,其Write方法直接將data通過kafka發(fā)送出去。
最近寫了個(gè)kafka的接收消息的功能,需要使用回調(diào)處理收到的消息。
一個(gè)是基本的回調(diào),一個(gè)是使用接口功能實(shí)現(xiàn)回調(diào),對(duì)接口是個(gè)很好的學(xué)習(xí)。
1.正?;卣{(diào)
kafka的接收消息處。收到消息后,使用傳入的Onmessage進(jìn)行處理。
調(diào)用kafka接收消息的單元,并在調(diào)用方寫好回調(diào)
在調(diào)用方實(shí)現(xiàn)回調(diào)需要執(zhí)行的方法
感覺還是使用基本回調(diào)相對(duì)簡(jiǎn)單點(diǎn),接口就當(dāng)學(xué)習(xí)了。
另外跨包的接口的方法要大寫!定位了好久發(fā)現(xiàn)個(gè)入門的問題。
1. 介紹
最近在研究一些消息中間件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一個(gè)基于Go語(yǔ)言的分布式實(shí)時(shí)消息平臺(tái),它基于MIT開源協(xié)議發(fā)布,由bitly公司開源出來(lái)的一款簡(jiǎn)單易用的消息中間件。
官方和第三方還為NSQ開發(fā)了眾多客戶端功能庫(kù),如官方提供的基于HTTP的nsqd、Go客戶端go-nsq、Python客戶端pynsq、基于Node.js的JavaScript客戶端nsqjs、異步C客戶端libnsq、Java客戶端nsq-java以及基于各種語(yǔ)言的眾多第三方客戶端功能庫(kù)。
1.1 Features
1). Distributed
NSQ提供了分布式的,去中心化,且沒有單點(diǎn)故障的拓?fù)浣Y(jié)構(gòu),穩(wěn)定的消息傳輸發(fā)布保障,能夠具有高容錯(cuò)和HA(高可用)特性。
2). Scalable易于擴(kuò)展
NSQ支持水平擴(kuò)展,沒有中心化的brokers。內(nèi)置的發(fā)現(xiàn)服務(wù)簡(jiǎn)化了在集群中增加節(jié)點(diǎn)。同時(shí)支持pub-sub和load-balanced 的消息分發(fā)。
3). Ops Friendly
NSQ非常容易配置和部署,生來(lái)就綁定了一個(gè)管理界面。二進(jìn)制包沒有運(yùn)行時(shí)依賴。官方有Docker image。
4.Integrated高度集成
官方的 Go 和 Python庫(kù)都有提供。而且為大多數(shù)語(yǔ)言提供了庫(kù)。
1.2 組件
1.3 拓?fù)浣Y(jié)構(gòu)
NSQ推薦通過他們相應(yīng)的nsqd實(shí)例使用協(xié)同定位發(fā)布者,這意味著即使面對(duì)網(wǎng)絡(luò)分區(qū),消息也會(huì)被保存在本地,直到它們被一個(gè)消費(fèi)者讀取。更重要的是,發(fā)布者不必去發(fā)現(xiàn)其他的nsqd節(jié)點(diǎn),他們總是可以向本地實(shí)例發(fā)布消息。
NSQ
首先,一個(gè)發(fā)布者向它的本地nsqd發(fā)送消息,要做到這點(diǎn),首先要先打開一個(gè)連接,然后發(fā)送一個(gè)包含topic和消息主體的發(fā)布命令,在這種情況下,我們將消息發(fā)布到事件topic上以分散到我們不同的worker中。
事件topic會(huì)復(fù)制這些消息并且在每一個(gè)連接topic的channel上進(jìn)行排隊(duì),在我們的案例中,有三個(gè)channel,它們其中之一作為檔案channel。消費(fèi)者會(huì)獲取這些消息并且上傳到S3。
nsqd
每個(gè)channel的消息都會(huì)進(jìn)行排隊(duì),直到一個(gè)worker把他們消費(fèi),如果此隊(duì)列超出了內(nèi)存限制,消息將會(huì)被寫入到磁盤中。Nsqd節(jié)點(diǎn)首先會(huì)向nsqlookup廣播他們的位置信息,一旦它們注冊(cè)成功,worker將會(huì)從nsqlookup服務(wù)器節(jié)點(diǎn)上發(fā)現(xiàn)所有包含事件topic的nsqd節(jié)點(diǎn)。
nsqlookupd
2. Internals
2.1 消息傳遞擔(dān)保
1)客戶表示已經(jīng)準(zhǔn)備好接收消息
2)NSQ 發(fā)送一條消息,并暫時(shí)將數(shù)據(jù)存儲(chǔ)在本地(在 re-queue 或 timeout)
3)客戶端回復(fù) FIN(結(jié)束)或 REQ(重新排隊(duì))分別指示成功或失敗。如果客戶端沒有回復(fù), NSQ 會(huì)在設(shè)定的時(shí)間超時(shí),自動(dòng)重新排隊(duì)消息
這確保了消息丟失唯一可能的情況是不正常結(jié)束 nsqd 進(jìn)程。在這種情況下,這是在內(nèi)存中的任何信息(或任何緩沖未刷新到磁盤)都將丟失。
如何防止消息丟失是最重要的,即使是這個(gè)意外情況可以得到緩解。一種解決方案是構(gòu)成冗余 nsqd對(duì)(在不同的主機(jī)上)接收消息的相同部分的副本。因?yàn)槟銓?shí)現(xiàn)的消費(fèi)者是冪等的,以兩倍時(shí)間處理這些消息不會(huì)對(duì)下游造成影響,并使得系統(tǒng)能夠承受任何單一節(jié)點(diǎn)故障而不會(huì)丟失信息。
2.2 簡(jiǎn)化配置和管理
單個(gè) nsqd 實(shí)例被設(shè)計(jì)成可以同時(shí)處理多個(gè)數(shù)據(jù)流。流被稱為“話題”和話題有 1 個(gè)或多個(gè)“通道”。每個(gè)通道都接收到一個(gè)話題中所有消息的拷貝。在實(shí)踐中,一個(gè)通道映射到下行服務(wù)消費(fèi)一個(gè)話題。
在更底的層面,每個(gè) nsqd 有一個(gè)與 nsqlookupd 的長(zhǎng)期 TCP 連接,定期推動(dòng)其狀態(tài)。這個(gè)數(shù)據(jù)被 nsqlookupd 用于給消費(fèi)者通知 nsqd 地址。對(duì)于消費(fèi)者來(lái)說,一個(gè)暴露的 HTTP /lookup 接口用于輪詢。為話題引入一個(gè)新的消費(fèi)者,只需啟動(dòng)一個(gè)配置了 nsqlookup 實(shí)例地址的 NSQ 客戶端。無(wú)需為添加任何新的消費(fèi)者或生產(chǎn)者更改配置,大大降低了開銷和復(fù)雜性。
2.3 消除單點(diǎn)故障
NSQ被設(shè)計(jì)以分布的方式被使用。nsqd 客戶端(通過 TCP )連接到指定話題的所有生產(chǎn)者實(shí)例。沒有中間人,沒有消息代理,也沒有單點(diǎn)故障。
這種拓?fù)浣Y(jié)構(gòu)消除單鏈,聚合,反饋。相反,你的消費(fèi)者直接訪問所有生產(chǎn)者。從技術(shù)上講,哪個(gè)客戶端連接到哪個(gè) NSQ 不重要,只要有足夠的消費(fèi)者連接到所有生產(chǎn)者,以滿足大量的消息,保證所有東西最終將被處理。對(duì)于 nsqlookupd,高可用性是通過運(yùn)行多個(gè)實(shí)例來(lái)實(shí)現(xiàn)。他們不直接相互通信和數(shù)據(jù)被認(rèn)為是最終一致。消費(fèi)者輪詢所有的配置的 nsqlookupd 實(shí)例和合并 response。失敗的,無(wú)法訪問的,或以其他方式故障的節(jié)點(diǎn)不會(huì)讓系統(tǒng)陷于停頓。
2.4 效率
對(duì)于數(shù)據(jù)的協(xié)議,通過推送數(shù)據(jù)到客戶端最大限度地提高性能和吞吐量的,而不是等待客戶端拉數(shù)據(jù)。這個(gè)概念,稱之為 RDY 狀態(tài),基本上是客戶端流量控制的一種形式。
efficiency
2.5 心跳和超時(shí)
組合應(yīng)用級(jí)別的心跳和 RDY 狀態(tài),避免頭阻塞現(xiàn)象,也可能使心跳無(wú)用(即,如果消費(fèi)者是在后面的處理消息流的接收緩沖區(qū)中,操作系統(tǒng)將被填滿,堵心跳)為了保證進(jìn)度,所有的網(wǎng)絡(luò) IO 時(shí)間上限勢(shì)必與配置的心跳間隔相關(guān)聯(lián)。這意味著,你可以從字面上拔掉之間的網(wǎng)絡(luò)連接 nsqd 和消費(fèi)者,它會(huì)檢測(cè)并正確處理錯(cuò)誤。當(dāng)檢測(cè)到一個(gè)致命錯(cuò)誤,客戶端連接被強(qiáng)制關(guān)閉。在傳輸中的消息會(huì)超時(shí)而重新排隊(duì)等待傳遞到另一個(gè)消費(fèi)者。最后,錯(cuò)誤會(huì)被記錄并累計(jì)到各種內(nèi)部指標(biāo)。
2.6 分布式
因?yàn)镹SQ沒有在守護(hù)程序之間共享信息,所以它從一開始就是為了分布式操作而生。個(gè)別的機(jī)器可以隨便宕機(jī)隨便啟動(dòng)而不會(huì)影響到系統(tǒng)的其余部分,消息發(fā)布者可以在本地發(fā)布,即使面對(duì)網(wǎng)絡(luò)分區(qū)。
這種“分布式優(yōu)先”的設(shè)計(jì)理念意味著NSQ基本上可以永遠(yuǎn)不斷地?cái)U(kuò)展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享狀態(tài)就是保存在lookup節(jié)點(diǎn)上,甚至它們不需要全局視圖,配置某些nsqd注冊(cè)到某些lookup節(jié)點(diǎn)上這是很簡(jiǎn)單的配置,唯一關(guān)鍵的地方就是消費(fèi)者可以通過lookup節(jié)點(diǎn)獲取所有完整的節(jié)點(diǎn)集。清晰的故障事件——NSQ在組件內(nèi)建立了一套明確關(guān)于可能導(dǎo)致故障的的故障權(quán)衡機(jī)制,這對(duì)消息傳遞和恢復(fù)都有意義。雖然它們可能不像Kafka系統(tǒng)那樣提供嚴(yán)格的保證級(jí)別,但NSQ簡(jiǎn)單的操作使故障情況非常明顯。
2.7 no replication
不像其他的隊(duì)列組件,NSQ并沒有提供任何形式的復(fù)制和集群,也正是這點(diǎn)讓它能夠如此簡(jiǎn)單地運(yùn)行,但它確實(shí)對(duì)于一些高保證性高可靠性的消息發(fā)布沒有足夠的保證。我們可以通過降低文件同步的時(shí)間來(lái)部分避免,只需通過一個(gè)標(biāo)志配置,通過EBS支持我們的隊(duì)列。但是這樣仍然存在一個(gè)消息被發(fā)布后馬上死亡,丟失了有效的寫入的情況。
2.8 沒有嚴(yán)格的順序
雖然Kafka由一個(gè)有序的日志構(gòu)成,但NSQ不是。消息可以在任何時(shí)間以任何順序進(jìn)入隊(duì)列。在我們使用的案例中,這通常沒有關(guān)系,因?yàn)樗械臄?shù)據(jù)都被加上了時(shí)間戳,但它并不適合需要嚴(yán)格順序的情況。
2.9 無(wú)數(shù)據(jù)重復(fù)刪除功能
NSQ對(duì)于超時(shí)系統(tǒng),它使用了心跳檢測(cè)機(jī)制去測(cè)試消費(fèi)者是否存活還是死亡。很多原因會(huì)導(dǎo)致我們的consumer無(wú)法完成心跳檢測(cè),所以在consumer中必須有一個(gè)單獨(dú)的步驟確保冪等性。
3. 實(shí)踐安裝過程
本文將nsq集群具體的安裝過程略去,大家可以自行參考官網(wǎng),比較簡(jiǎn)單。這部分介紹下筆者實(shí)驗(yàn)的拓?fù)?,以及nsqadmin的相關(guān)信息。
3.1 拓?fù)浣Y(jié)構(gòu)
topology
實(shí)驗(yàn)采用3臺(tái)NSQD服務(wù),2臺(tái)LOOKUPD服務(wù)。
采用官方推薦的拓?fù)?,消息發(fā)布的服務(wù)和NSQD在一臺(tái)主機(jī)。一共5臺(tái)機(jī)器。
NSQ基本沒有配置文件,配置通過命令行指定參數(shù)。
主要命令如下:
LOOKUPD命令
NSQD命令
工具類,消費(fèi)后存儲(chǔ)到本地文件。
發(fā)布一條消息
3.2 nsqadmin
對(duì)Streams的詳細(xì)信息進(jìn)行查看,包括NSQD節(jié)點(diǎn),具體的channel,隊(duì)列中的消息數(shù),連接數(shù)等信息。
nsqadmin
channel
列出所有的NSQD節(jié)點(diǎn):
nodes
消息的統(tǒng)計(jì):
msgs
lookup主機(jī)的列表:
hosts
4. 總結(jié)
NSQ基本核心就是簡(jiǎn)單性,是一個(gè)簡(jiǎn)單的隊(duì)列,這意味著它很容易進(jìn)行故障推理和很容易發(fā)現(xiàn)bug。消費(fèi)者可以自行處理故障事件而不會(huì)影響系統(tǒng)剩下的其余部分。
事實(shí)上,簡(jiǎn)單性是我們決定使用NSQ的首要因素,這方便與我們的許多其他軟件一起維護(hù),通過引入隊(duì)列使我們得到了堪稱完美的表現(xiàn),通過隊(duì)列甚至讓我們?cè)黾恿藥讉€(gè)數(shù)量級(jí)的吞吐量。越來(lái)越多的consumer需要一套嚴(yán)格可靠性和順序性保障,這已經(jīng)超過了NSQ提供的簡(jiǎn)單功能。
結(jié)合我們的業(yè)務(wù)系統(tǒng)來(lái)看,對(duì)于我們所需要傳輸?shù)陌l(fā)票消息,相對(duì)比較敏感,無(wú)法容忍某個(gè)nsqd宕機(jī),或者磁盤無(wú)法使用的情況,該節(jié)點(diǎn)堆積的消息無(wú)法找回。這是我們沒有選擇該消息中間件的主要原因。簡(jiǎn)單性和可靠性似乎并不能完全滿足。相比Kafka,ops肩負(fù)起更多負(fù)責(zé)的運(yùn)營(yíng)。另一方面,它擁有一個(gè)可復(fù)制的、有序的日志可以提供給我們更好的服務(wù)。但對(duì)于其他適合NSQ的consumer,它為我們服務(wù)的相當(dāng)好,我們期待著繼續(xù)鞏固它的堅(jiān)實(shí)的基礎(chǔ)。