真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

本篇內(nèi)容主要講解“ Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“ Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么”吧!

公司主營業(yè)務(wù):成都網(wǎng)站建設(shè)、成都做網(wǎng)站、移動(dòng)網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)公司是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)公司推出海曙免費(fèi)做網(wǎng)站回饋大家。

 Kafka 順序消費(fèi)線程模型的實(shí)踐與優(yōu)化

各類消息中間件對(duì)順序消息實(shí)現(xiàn)的做法是將具有順序性的一類消息發(fā)往相同的主題分區(qū)中,只需要將這類消息設(shè)置相同的 Key 即可,而 Kafka 會(huì)在任意時(shí)刻保證一個(gè)消費(fèi)組同時(shí)只能有一個(gè)消費(fèi)者監(jiān)聽消費(fèi),因此可在消費(fèi)時(shí)按分區(qū)進(jìn)行順序消費(fèi),保證每個(gè)分區(qū)的消息具備局部順序性。由于需要確保分區(qū)消息的順序性,并不能并發(fā)地消費(fèi)消費(fèi),對(duì)消費(fèi)的吞吐量會(huì)造成一定的影響。那么,如何在保證消息順序性的前提下,最大限度的提高消費(fèi)者的消費(fèi)能力?

本文將會(huì)對(duì) Kafka 消費(fèi)者拉取消息流程進(jìn)行深度分析之后,對(duì) Kafka 消費(fèi)者順序消費(fèi)線程模型進(jìn)行一次實(shí)踐與優(yōu)化。

Kafka 消費(fèi)者拉取消息流程分析

在講實(shí)現(xiàn) Kafka 順序消費(fèi)線程模型之前,我們需要先深入分析 Kafka 消費(fèi)者的消息拉取機(jī)制,只有當(dāng)你對(duì) Kafka 消費(fèi)者拉取消息的整個(gè)流程有深入的了解之后,你才能夠很好地理解本次線程模型改造的方案。

我先給大家模擬一下消息拉取的實(shí)際現(xiàn)象,這里 max.poll.records = 500。

1、消息沒有堆積時(shí):

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

可以發(fā)現(xiàn),在消息沒有堆積時(shí),消費(fèi)者拉取時(shí),如果某個(gè)分區(qū)沒有的消息不足 500 條,會(huì)從其他分區(qū)湊夠 500 條后再返回。

2、多個(gè)分區(qū)都有堆積時(shí):

在消息有堆積時(shí),可以發(fā)現(xiàn)每次返回的都是同一個(gè)分區(qū)的消息,但經(jīng)過不斷 debug,消費(fèi)者在拉取過程中并不是等某個(gè)分區(qū)消費(fèi)完沒有堆積了,再拉取下一個(gè)分區(qū)的消息,而是不斷循環(huán)的拉取各個(gè)分區(qū)的消息,但是這個(gè)循環(huán)并不是說分區(qū) p0 拉取完 500 條,后面一定會(huì)拉取分區(qū) p1 的消息,很有可能后面還會(huì)拉取 p0 分區(qū)的消息,為了弄明白這種現(xiàn)象,我仔細(xì)閱讀了相關(guān)源碼。

org.apache.kafka.clients.consumer.KafkaConsumer#poll

private ConsumerRecords poll(final Timer timer, final boolean includeMetadataInTimeout) {  try {// poll for new data until the timeout expiresdo {      // 客戶端拉取消息核心邏輯  final Map>> records = pollForFetches(timer);      if (!records.isEmpty()) {//  在返回?cái)?shù)據(jù)之前, 發(fā)送下次的 fetch 請(qǐng)求, 避免用戶在下次獲取數(shù)據(jù)時(shí)線程阻塞if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {          // 調(diào)用 ConsumerNetworkClient#poll 方法將 FetchRequest 發(fā)送出去。  client.pollNoWakeup();
        }return this.interceptors.onConsume(new ConsumerRecords<>(records));
      }
    } while (timer.notExpired());return ConsumerRecords.empty();
  } finally {
    release();
  }
}

我們使用 Kafka consumer 進(jìn)行消費(fèi)的時(shí)候通常會(huì)給一個(gè)時(shí)間,比如:

consumer.poll(Duration.ofMillis(3000));

從以上代碼邏輯可以看出來,用戶給定的這個(gè)時(shí)間,目的是為了等待消息湊夠 max.poll.records 條消息后再返回,即使消息條數(shù)不夠 max.poll.records 消息,時(shí)間到了用戶給定的等待時(shí)間后,也會(huì)返回。

pollForFetches 方法是客戶端拉取消息核心邏輯,但并不是真正去 broker 中拉取,而是從緩存中去獲取消息。在 pollForFetches 拉取消息后,如果消息不為零,還會(huì)調(diào)用 fetcher.sendFetches() 與 client.pollNoWakeup(),調(diào)用這兩個(gè)方法究竟有什么用呢?

fetcher.sendFetches() 經(jīng)過源碼閱讀后,得知該方法目的是為了構(gòu)建拉取請(qǐng)求 FetchRequest 并進(jìn)行發(fā)送,但是這里的發(fā)送并不是真正的發(fā)送,而是將 FetchRequest 請(qǐng)求對(duì)象存放在 unsend 緩存當(dāng)中,然后會(huì)在 ConsumerNetworkClient#poll 方法調(diào)用時(shí)才會(huì)被真正地執(zhí)行發(fā)送。

fetcher.sendFetches() 在構(gòu)建 FetchRequest 前,會(huì)對(duì)當(dāng)前可拉取分區(qū)進(jìn)行篩選,而這個(gè)也是決定多分區(qū)拉取消息規(guī)律的核心,后面我會(huì)講到。

從 KafkaConsumer#poll 方法源碼可以看出來,其實(shí) Kafka 消費(fèi)者在拉取消息過程中,有兩條線程在工作,其中用戶主線程調(diào)用 pollForFetches 方法從緩存中獲取消息消費(fèi),在獲取消息后,會(huì)再調(diào)用 ConsumerNetworkClient#poll 方法從 Broker 發(fā)送拉取請(qǐng)求,然后將拉取到的消息緩存到本地,這里為什么在拉取完消息后,會(huì)主動(dòng)調(diào)用 ConsumerNetworkClient#poll 方法呢?我想這里的目的是為了下次 poll 的時(shí)候可以立即從緩存中拉取消息。

pollForFetches 方法會(huì)調(diào)用 Fetcher#fetchedRecords 方法從緩存中獲取并解析消息:

public Map>> fetchedRecords() {
  Map>> fetched = new HashMap<>();
  int recordsRemaining = maxPollRecords;  try {while (recordsRemaining > 0) {      // 如果當(dāng)前獲取消息的 PartitionRecords 為空,或者已經(jīng)拉取完畢  // 則需要從 completedFetches 重新獲取 completedFetch 并解析成 PartitionRecords  if (nextInLineRecords == null || nextInLineRecords.isFetched) {// 如果上一個(gè)分區(qū)緩存中的數(shù)據(jù)已經(jīng)拉取完了,直接中斷本次循環(huán)拉取,并返回空的消息列表// 直至有緩存數(shù)據(jù)為止CompletedFetch completedFetch = completedFetches.peek();if (completedFetch == null) break;try {          // CompletedFetch 即拉取消息的本地緩存數(shù)據(jù)  // 緩存數(shù)據(jù)中 CompletedFetch 解析成 PartitionRecords  nextInLineRecords = parseCompletedFetch(completedFetch);
        } catch (Exception e) {          // ...}
        completedFetches.poll();
      } else {// 從分區(qū)緩存中獲取指定條數(shù)的消息List> records = fetchRecords(nextInLineRecords, recordsRemaining);// ...fetched.put(partition, records);
        recordsRemaining -= records.size();
      }
    }
  }
} catch (KafkaException e) {  // ...}return fetched;
}

completedFetches 是拉取到的消息緩存,以上代碼邏輯就是圍繞著如何從 completedFetches 緩存中獲取消息的,從以上代碼邏輯可以看出:

maxPollRecords 為本次拉取的最大消息數(shù)量,該值可通過 max.poll.records 參數(shù)配置,默認(rèn)為 500 條,該方法每次從 completedFetches 中取出一個(gè) CompletedFetch 并解析成可以拉取的 PartitionRecords 對(duì)象,即方法中的 nextInLineRecords,請(qǐng)注意,PartitionRecords 中的消息數(shù)量可能大與 500 條,因此可能本次可能一次性從 PartitionRecords 獲取 500 條消息后即返回,如果 PartitionRecords 中消息數(shù)量不足 500 條,會(huì)從 completedFetches 緩存中取出下一個(gè)要拉取的分區(qū)消息,recordsRemaining 會(huì)記錄本次剩余還有多少消息沒拉取,通過循環(huán)不斷地從 completedFetches 緩存中取消息,直至 recordsRemaining 為 0。

以上代碼即可解釋為什么消息有堆積的情況下,每次拉取的消息很大概率是同一個(gè)分區(qū)的消息,因?yàn)榫彺?CompletedFetch 緩存中的消息很大概率會(huì)多余每次拉取消息數(shù)量,Kafka 客戶端每次從 Broker 拉取的消息數(shù)據(jù)并不是通過 max.poll.records 決定的,該參數(shù)僅決定用戶每次從本地緩存中獲取多少條數(shù)據(jù),真正決定從 Broker 拉取的消息數(shù)據(jù)量是通過 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等參數(shù)決定的。

我們?cè)傧胍幌?,假設(shè)某個(gè)分區(qū)的消息一直都處于堆積狀態(tài),Kafka 會(huì)每次都拉取這個(gè)分區(qū)直至將該分區(qū)消費(fèi)完畢嗎?(根據(jù)假設(shè),Kafka 消費(fèi)者每次都會(huì)從這個(gè)分區(qū)拉取消息,并將消息存到分區(qū)關(guān)聯(lián)的 CompletedFetch 緩存中,根據(jù)以上代碼邏輯,nextInLineRecords 一直處于還沒拉取完的狀態(tài),導(dǎo)致每次拉取都會(huì)從該分區(qū)中拉取消息。)

答案顯然不會(huì),不信你打開 Kafka-manager 觀察每個(gè)分區(qū)的消費(fèi)進(jìn)度情況,每個(gè)分區(qū)都會(huì)有消費(fèi)者在消費(fèi)中。

那 Kafka 消費(fèi)者是如何循環(huán)地拉取它監(jiān)聽的分區(qū)呢?我們接著往下分析。

發(fā)送拉取請(qǐng)求邏輯:

org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches

public synchronized int sendFetches() {  // 解析本次可拉取的分區(qū)
  Map fetchRequestMap = prepareFetchRequests();  for (Map.Entry entry : fetchRequestMap.entrySet()) {final Node fetchTarget = entry.getKey();final FetchSessionHandler.FetchRequestData data = entry.getValue();// 構(gòu)建請(qǐng)求對(duì)象final FetchRequest.Builder request = FetchRequest.Builder
      .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
      .isolationLevel(isolationLevel)
      .setMaxBytes(this.maxBytes)
      .metadata(data.metadata())
      .toForget(data.toForget());// 發(fā)送請(qǐng)求,但不是真的發(fā)送,而是將請(qǐng)求保存在 unsent 中client.send(fetchTarget, request)
      .addListener(new RequestFutureListener() {@Overridepublic void onSuccess(ClientResponse resp) {
          synchronized (Fetcher.this) {// ... ...// 創(chuàng)建 CompletedFetch, 并緩存到 completedFetches 隊(duì)列中completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                                    resp.requestHeader().apiVersion()));
          }

        }
      }                   // ... ...   });
  }  return fetchRequestMap.size();
}

以上代碼邏輯很好理解,在發(fā)送拉取請(qǐng)求前,先檢查哪些分區(qū)可拉取,接著為每個(gè)分區(qū)構(gòu)建一個(gè) FetchRequest 對(duì)象,F(xiàn)etchRequest 中的 minBytes 和 maxBytes,分別可通過 fetch.min.bytes 和 fetch.max.bytes 參數(shù)設(shè)置。這也是每次從 Broker 中拉取的消息不一定等于 max.poll.records 的原因。

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

prepareFetchRequests 方法會(huì)調(diào)用 Fetcher#fetchablePartitions 篩選可拉取的分區(qū),我們來看下 Kafka 消費(fèi)者是如何進(jìn)行篩選的:

org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions

private List fetchablePartitions() {
  Set exclude = new HashSet<>();
  List fetchable = subscriptions.fetchablePartitions();  if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
    exclude.add(nextInLineRecords.partition);
  }  for (CompletedFetch completedFetch : completedFetches) {
    exclude.add(completedFetch.partition);
  }
  fetchable.removeAll(exclude);  return fetchable;
}

nextInLineRecords 即我們上面提到的根據(jù)某個(gè)分區(qū)緩存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的緩存還沒拉取完,則不從 broker 中拉取消息了,以及如果此時(shí) completedFetches 緩存中存在該分區(qū)的緩存,也不進(jìn)行拉取消息。

我們可以很清楚的得出結(jié)論:

當(dāng)緩存中還存在中還存在某個(gè)分區(qū)的消息數(shù)據(jù)時(shí),消費(fèi)者不會(huì)繼續(xù)對(duì)該分區(qū)進(jìn)行拉取請(qǐng)求,直到該分區(qū)的本地緩存被消費(fèi)完,才會(huì)繼續(xù)發(fā)送拉取請(qǐng)求。

為了更加清晰的表達(dá)這段邏輯,我舉個(gè)例子并將整個(gè)流程用圖表達(dá)出來:

假設(shè)某消費(fèi)者監(jiān)聽三個(gè)分區(qū),每個(gè)分區(qū)每次從 Broker 中拉取 4 條消息,用戶每次從本地緩存中獲取 2 條消息:

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

這種消費(fèi)模型創(chuàng)建多個(gè) KafkaConsumer 對(duì)象,每個(gè)線程維護(hù)一個(gè) KafkaConsumer,從而實(shí)現(xiàn)線程隔離消費(fèi),由于每個(gè)分區(qū)同一時(shí)刻只能有一個(gè)消費(fèi)者消費(fèi),所以這種消費(fèi)模型天然支持順序消費(fèi)。

但是缺點(diǎn)是無法提升單個(gè)分區(qū)的消費(fèi)能力,如果一個(gè)主題分區(qū)數(shù)量很多,只能通過增加 KafkaConsumer 實(shí)例提高消費(fèi)能力,這樣一來線程數(shù)量過多,導(dǎo)致項(xiàng)目 Socket 連接開銷巨大,項(xiàng)目中一般不用該線程模型去消費(fèi)。

2、單 KafkaConsumer 實(shí)例 + 多 worker 線程

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

首先在初始化的時(shí)候,會(huì)對(duì)消費(fèi)線程池進(jìn)行初始化,具體是根據(jù) threadsNumMax 的數(shù)量創(chuàng)建若干個(gè)單個(gè)線程的線程池,單個(gè)線程的線程池就是為了保證每個(gè)分區(qū)取模后拿到線程池是串行消費(fèi)的,但這里創(chuàng)建 threadsNumMax 個(gè)線程池是不合理的,后面我會(huì)說到。

com.zto.consumer.KafkaConsumerProxy#submitRecords

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

以上就是目前 ZMS 順序消費(fèi)的線程模型,用圖表示以上代碼邏輯:

?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么

在消息流量大的時(shí)候,順序消息消費(fèi)時(shí)卻退化成單線程消費(fèi)了。

如何提高 Kafka 順序消費(fèi)的并發(fā)度?

經(jīng)過對(duì) ZMS 的消費(fèi)線程模型以及對(duì) Kafka 消費(fèi)者拉取消息流程的深入了解之后,我想到了如下幾個(gè)方面對(duì) ZMS 的消費(fèi)線程模型進(jìn)行優(yōu)化:

1、細(xì)化消息順序粒度

之前的做法是將每個(gè)分區(qū)單獨(dú)一條線程消費(fèi),無法再繼續(xù)在分區(qū)之上增加消費(fèi)能力,我們知道業(yè)務(wù)方發(fā)送順序消息時(shí),會(huì)將同一類型具有順序性的消息給一個(gè)相同的 Key,以保證這類消息發(fā)送到同一個(gè)分區(qū)進(jìn)行消費(fèi),從而達(dá)到消息順序消費(fèi)的目的,而同一個(gè)分區(qū)會(huì)接收多種類型(即不同 Key)的消息,每次拉取的消息具有很大可能是不同類型的,那么我們就可以將同一個(gè)分區(qū)的消息,分配一個(gè)獨(dú)立的線程池,再利用消息 Key 進(jìn)行取模放入對(duì)應(yīng)的線程中消費(fèi),達(dá)到并發(fā)消費(fèi)的目的,且不打亂消息的順序性。

2、細(xì)化位移提交粒度

由于 ZMS 目前是手動(dòng)提交位移,目前每次拉取消息必須先消費(fèi)完才能進(jìn)行位移提交,既然已經(jīng)對(duì)分區(qū)消息進(jìn)行指定的線程池消費(fèi)了,由于分區(qū)之間的位移先后提交不影響,那么我們可以將位移提交交給每個(gè)分區(qū)進(jìn)行管理,這樣拉取主線程不必等到是否消費(fèi)完才進(jìn)行下一輪的消息拉取。

3、異步拉取與限流

異步拉取有個(gè)問題,就是如果節(jié)點(diǎn)消費(fèi)跟不上,而拉取消息過多地保存在本地,很可能會(huì)造成內(nèi)存溢出,因此我們需要對(duì)消息拉取進(jìn)行限流,當(dāng)本地消息緩存量達(dá)到一定量時(shí),阻止消息拉取。

上面在分析 Kafka 消費(fèi)者拉取消息流程時(shí),我們知道消費(fèi)者在發(fā)送拉取請(qǐng)求時(shí),首先會(huì)判斷本地緩存中是否存在該分區(qū)的緩存,如果存在,則不發(fā)送拉取請(qǐng)求,但由于 ZMS 需要改造成異步拉取的形式,由于 Comsumer#poll 不再等待消息消費(fèi)完再進(jìn)行下一輪拉取,因此 Kafka 的本地緩存中幾乎不會(huì)存在數(shù)據(jù)了,導(dǎo)致 Kafka 每次都會(huì)發(fā)送拉取請(qǐng)求,相當(dāng)于將 Kafka 的本地緩存放到 ZMS 中,因此我們需要 ZMS 層面上對(duì)消息拉取進(jìn)行限流,Kafka 消費(fèi)者有兩個(gè)方法可以設(shè)置訂閱的分區(qū)是否可以發(fā)送拉取請(qǐng)求:

// 暫停分區(qū)消費(fèi)(即暫停該分區(qū)發(fā)送拉取消息請(qǐng)求)org.apache.kafka.clients.consumer.KafkaConsumer#pause// 恢復(fù)分區(qū)消費(fèi)(即恢復(fù)該分區(qū)發(fā)送拉取消息請(qǐng)求)org.apache.kafka.clients.consumer.KafkaConsumer#resume

以上兩個(gè)方法,其實(shí)就是改變了消費(fèi)者的訂閱分區(qū)的狀態(tài)值 paused,當(dāng) paused = true 時(shí),暫停分區(qū)消費(fèi),當(dāng) paused = false 時(shí),恢復(fù)分區(qū)消費(fèi),這個(gè)參數(shù)是在哪里使用到呢?上面在分析 Kafka 消費(fèi)者拉取消息流程時(shí)我們有提到發(fā)送拉取請(qǐng)求之前,會(huì)對(duì)可拉取的分區(qū)進(jìn)行篩選,其中一個(gè)條件即分區(qū) paused = false:

org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable

private boolean isFetchable() {  return !paused && hasValidPosition();
}

由于 KafkaConsumer 是非線程安全的,如果我們?cè)诋惒骄€程 KafkaConsumer 相關(guān)的類,會(huì)報(bào)如下錯(cuò)誤:

KafkaConsumer is not safe for multi-threaded access

只需要確保 KafkaConsumer 相關(guān)方法在 KafkaConsumer#poll 方法線程中調(diào)用即可,具體做法可以設(shè)置一個(gè)線程安全上下文容器,異步線程操作 KafkaConsumer 相關(guān)方法是,只需要將具體的分區(qū)放到上下文容器即可,后續(xù)統(tǒng)一由 poll 線程執(zhí)行。

因此我們只需要利用好這個(gè)特性,就可以實(shí)現(xiàn)拉取限流,消費(fèi)者主線程的 Comsumer#poll 方法依然是異步不斷地從緩存中獲取消息,同時(shí)不會(huì)造成兩次 poll 之間的時(shí)間過大導(dǎo)致消費(fèi)者被踢出消費(fèi)組。

以上優(yōu)化改造的核心是在不打亂消息順序的前提下利用消息 Key 盡可能地并發(fā)消費(fèi),但如果遇到分區(qū)中的消息都是相同 Key,并且在有一定的積壓下每次拉取都是同一個(gè)分區(qū)的消息時(shí),以上模型可能沒有理想情況下的那么好。這時(shí)是否可以將 fetch.max.bytes 與 max.partition.fetch.bytes 參數(shù)設(shè)置小一點(diǎn),讓每個(gè)分區(qū)的本地緩存都不足 500 條,這樣每次 poll 的消息列表都可以包含多個(gè)分區(qū)的消息了,但這樣又會(huì)導(dǎo)致 RPC 請(qǐng)求增多,這就需要針對(duì)業(yè)務(wù)消息大小,對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)。

以上線程模型,需要增加一個(gè)參數(shù) orderlyConsumePartitionParallelism,用于設(shè)置分區(qū)消費(fèi)并行度,假設(shè)某個(gè)消費(fèi)組被分配 5 個(gè)分區(qū)進(jìn)行消費(fèi),則每個(gè)分區(qū)默認(rèn)啟動(dòng)一條線程消費(fèi),一共 5 * 1 = 5 條消費(fèi)線程,當(dāng) orderlyConsumePartitionParallelism = 3,則每個(gè)分區(qū)啟動(dòng) 3 條線程消費(fèi),一共 5 * 3 = 15 條消費(fèi)線程。orderlyConsumePartitionParallelism = 1 時(shí),則說明該分區(qū)所有消息都處在順序(串行)消費(fèi);當(dāng) orderlyConsumePartitionParallelism > 1 時(shí),則根據(jù)分區(qū)消息的 Key 進(jìn)行取模分配線程消費(fèi),保證不了整個(gè)分區(qū)順序消費(fèi),但保證相同 Key 的消息順序消費(fèi)。

注意,當(dāng) orderlyConsumePartitionParallelism > 1 時(shí),分區(qū)消費(fèi)線程的有效使用率取決于該分區(qū)消息的 Key:

1、如果該分區(qū)所有消息的 Key 都相同,則消費(fèi)的 Key 取模都分配都同一條線程當(dāng)中,并行度退化成 orderlyConsumePartitionParallelism = 1;

2、如果該分區(qū)相同 Key 的消息過于集中,會(huì)導(dǎo)致每次拉取都是相同 key 的一批消息,同樣并行度退化成 orderlyConsumePartitionParallelism = 1。

綜合對(duì)比:

優(yōu)化前,ZMS 可保證整個(gè)分區(qū)消息的順序性,優(yōu)化后可根據(jù)消息 Key 在分區(qū)的基礎(chǔ)上不打亂相同 Key 消息的順序性前提下進(jìn)行并發(fā)消費(fèi),有效地提升了單分區(qū)的消費(fèi)吞吐量;優(yōu)化前,有很大的概率會(huì)退化成同一時(shí)刻單線程消費(fèi),優(yōu)化后盡可能至少保證每個(gè)分區(qū)一條線程消費(fèi),情況好的時(shí)候每個(gè)分區(qū)可多條線程消費(fèi)。

通過以上場景分析,該優(yōu)化方案不是提高順序消費(fèi)吞吐量的銀彈,它有很大的局限性,用戶在業(yè)務(wù)的實(shí)現(xiàn)上不能重度依賴順序消費(fèi)去實(shí)現(xiàn),以免影響業(yè)務(wù)性能上的需求。

到此,相信大家對(duì)“ Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


本文題目:?Kafka順序消費(fèi)線程模型的優(yōu)化方法是什么
網(wǎng)站地址:http://weahome.cn/article/goeshd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部