本篇內(nèi)容介紹了“怎么讓Kafka達到最佳吞吐量”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
我們提供的服務有:成都網(wǎng)站建設、網(wǎng)站制作、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認證、化隆ssl等。為數(shù)千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術(shù)的化隆網(wǎng)站制作公司
func main() { // 1. 初始化 pusher := kq.NewPusher([]string{ "127.0.0.1:19092", "127.0.0.1:19092", "127.0.0.1:19092", }, "kq") ticker := time.NewTicker(time.Millisecond) for round := 0; round < 3; round++ { select { case <-ticker.C: count := rand.Intn(100) m := message{ Key: strconv.FormatInt(time.Now().UnixNano(), 10), Value: fmt.Sprintf("%d,%d", round, count), Payload: fmt.Sprintf("%d,%d", round, count), } body, err := json.Marshal(m) if err != nil { log.Fatal(err) } fmt.Println(string(body)) // 2. 寫入 if err := pusher.Push(string(body)); err != nil { log.Fatal(err) } } } }
將 kafka cluster
配置以及 topic
傳入,你就得到一個操作 kafka
的 push operator
。
至于寫入消息,簡單的調(diào)用 pusher.Push(msg)
就行。是的,就這么簡單!
> 當然,目前只支持單個 msg
寫入。可能有人會疑惑,那就繼續(xù)往下看,為什么只能一條一條寫入?
一起看看 pusher
初始化哪些步驟:
NewPusher(clusterAddrs, topic, opts...) |- kafka.NewWriter(kfConfig) // 與 kf 之前的連接 |- executor = executors.NewChunkExecutor() // 設置內(nèi)部寫入的executor為字節(jié)數(shù)定量寫入
建立與 kafka cluster
的連接。此處肯定就要傳入 kafka config
;
設置內(nèi)部暫存區(qū)的寫入函數(shù)以及刷新規(guī)則。
使用 chunkExecutor
作用不言而喻:將隨機寫 -> 批量寫,減少 I/O 消耗;同時保證單次寫入不能超過默認的 1M
或者自己設定的最大寫入字節(jié)數(shù)。
其實再往 chunkExecutor
內(nèi)部看,其實每次觸發(fā)插入有兩個指標:
maxChunkSize
:單次最大寫入字節(jié)數(shù)
flushInterval
:刷新暫存消息插入的間隔時間
在觸發(fā)寫入,只要滿足任意一個指標都會執(zhí)行寫入。同時在 executors
都有設置插入間隔時間,以防暫存區(qū)寫入阻塞而暫存區(qū)內(nèi)消息一直不被刷新清空。
> 更多關于 executors
可以參看以下:https://zeromicro.github.io/go-zero/executors.html
根據(jù)上述初始化對 executors
介紹,插入過程中也少不了它的配合:
func (p *Pusher) Push(v string) error { // 1. 將 msg -> kafka 內(nèi)部的 Message msg := kafka.Message{ Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), Value: []byte(v), } // 使用 executor.Add() 插入內(nèi)部的 container // 當 executor 初始化失敗或者是內(nèi)部發(fā)生錯誤,也會將 Message 直接插入 kafka if p.executor != nil { return p.executor.Add(msg, len(v)) } else { return p.produer.WriteMessages(context.Background(), msg) } }
過程其實很簡單。那 executors.Add(msg, len(msg))
是怎么把 msg
插入到 kafka
呢?
插入的邏輯其實在初始化中就聲明了:
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) { chunk := make([]kafka.Message, len(tasks)) // 1 for i := range tasks { chunk[i] = tasks[i].(kafka.Message) } // 2 if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil { logx.Error(err) } }, newOptions(opts)...)
觸發(fā)插入時,將暫存區(qū)中存儲的 []msg
依次拿出,作為最終插入消息集合;
將上一步的消息集合,作為一個批次插入 kafka
的 topic
中
這樣 pusher -> chunkExecutor -> kafka
一個鏈路就出現(xiàn)了。
“怎么讓Kafka達到最佳吞吐量”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!