真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎么讓Kafka達到最佳吞吐量

本篇內(nèi)容介紹了“怎么讓Kafka達到最佳吞吐量”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

我們提供的服務有:成都網(wǎng)站建設、網(wǎng)站制作、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認證、化隆ssl等。為數(shù)千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術(shù)的化隆網(wǎng)站制作公司

上手使用

func main() {
  // 1. 初始化
	pusher := kq.NewPusher([]string{
		"127.0.0.1:19092",
		"127.0.0.1:19092",
		"127.0.0.1:19092",
	}, "kq")

	ticker := time.NewTicker(time.Millisecond)
	for round := 0; round < 3; round++ {
		select {
		case <-ticker.C:
			count := rand.Intn(100)
			m := message{
				Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
				Value:   fmt.Sprintf("%d,%d", round, count),
				Payload: fmt.Sprintf("%d,%d", round, count),
			}
			body, err := json.Marshal(m)
			if err != nil {
				log.Fatal(err)
			}

			fmt.Println(string(body))
      // 2. 寫入
			if err := pusher.Push(string(body)); err != nil {
				log.Fatal(err)
			}
		}
	}
}

kafka cluster 配置以及 topic 傳入,你就得到一個操作 kafkapush operator。

至于寫入消息,簡單的調(diào)用 pusher.Push(msg) 就行。是的,就這么簡單!

> 當然,目前只支持單個 msg 寫入。可能有人會疑惑,那就繼續(xù)往下看,為什么只能一條一條寫入?

初始化

一起看看 pusher 初始化哪些步驟:

NewPusher(clusterAddrs, topic, opts...)
	|- kafka.NewWriter(kfConfig)								// 與 kf 之前的連接
	|- executor = executors.NewChunkExecutor()  // 設置內(nèi)部寫入的executor為字節(jié)數(shù)定量寫入
  1. 建立與 kafka cluster 的連接。此處肯定就要傳入 kafka config;

  2. 設置內(nèi)部暫存區(qū)的寫入函數(shù)以及刷新規(guī)則。

使用 chunkExecutor 作用不言而喻:將隨機寫 -> 批量寫,減少 I/O 消耗;同時保證單次寫入不能超過默認的 1M 或者自己設定的最大寫入字節(jié)數(shù)。

其實再往 chunkExecutor 內(nèi)部看,其實每次觸發(fā)插入有兩個指標:

  • maxChunkSize:單次最大寫入字節(jié)數(shù)

  • flushInterval:刷新暫存消息插入的間隔時間

在觸發(fā)寫入,只要滿足任意一個指標都會執(zhí)行寫入。同時在 executors 都有設置插入間隔時間,以防暫存區(qū)寫入阻塞而暫存區(qū)內(nèi)消息一直不被刷新清空。

> 更多關于 executors 可以參看以下:https://zeromicro.github.io/go-zero/executors.html

生產(chǎn)者插入

根據(jù)上述初始化對 executors 介紹,插入過程中也少不了它的配合:

func (p *Pusher) Push(v string) error {
  // 1. 將 msg -> kafka 內(nèi)部的 Message
	msg := kafka.Message{
		Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
		Value: []byte(v),
	}
  
  // 使用 executor.Add() 插入內(nèi)部的 container
  // 當 executor 初始化失敗或者是內(nèi)部發(fā)生錯誤,也會將 Message 直接插入 kafka
	if p.executor != nil {
		return p.executor.Add(msg, len(v))
	} else {
		return p.produer.WriteMessages(context.Background(), msg)
	}
}

過程其實很簡單。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的邏輯其實在初始化中就聲明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
		chunk := make([]kafka.Message, len(tasks))
  	// 1
		for i := range tasks {
			chunk[i] = tasks[i].(kafka.Message)
		}
  	// 2
		if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
			logx.Error(err)
		}
	}, newOptions(opts)...)
  1. 觸發(fā)插入時,將暫存區(qū)中存儲的 []msg 依次拿出,作為最終插入消息集合;

  2. 將上一步的消息集合,作為一個批次插入 kafkatopic

這樣 pusher -> chunkExecutor -> kafka 一個鏈路就出現(xiàn)了。

“怎么讓Kafka達到最佳吞吐量”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!


名稱欄目:怎么讓Kafka達到最佳吞吐量
文章出自:http://weahome.cn/article/jcgshe.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部