真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎樣深度剖析KafkaProducer的緩沖池機(jī)制

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)怎樣深度剖析Kafka Producer的緩沖池機(jī)制,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新互聯(lián)建站專注于大慶網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠為您提供大慶營銷型網(wǎng)站建設(shè),大慶網(wǎng)站制作、大慶網(wǎng)頁設(shè)計(jì)、大慶網(wǎng)站官網(wǎng)定制、成都微信小程序服務(wù),打造大慶網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供大慶網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

深度剖析 Kafka Producer 的緩沖池機(jī)制

在新版的 Kafka Producer 中,設(shè)計(jì)了一個(gè)消息緩沖池,在創(chuàng)建 Producer 時(shí)會(huì)默認(rèn)創(chuàng)建一個(gè)大小為 32M 的緩沖池,也可以通過 buffer.memory 參數(shù)指定緩沖池的大小,同時(shí)緩沖池被切分成多個(gè)內(nèi)存塊,內(nèi)存塊的大小就是我們創(chuàng)建 Producer 時(shí)傳的 batch.size 大小,默認(rèn)大小 16384,而每個(gè) Batch 都會(huì)包含一個(gè) batch.size 大小的內(nèi)存塊,消息就是存放在內(nèi)存塊當(dāng)中。整個(gè)緩沖池的結(jié)構(gòu)如下圖所示:

怎樣深度剖析Kafka Producer的緩沖池機(jī)制

客戶端將消息追加到對(duì)應(yīng)主題分區(qū)的某個(gè) Batch 中,如果 Batch 已經(jīng)滿了,則會(huì)新建一個(gè) Batch,同時(shí)向緩沖池(RecordAccumulator)申請(qǐng)一塊大小為 batch.size 的內(nèi)存塊用于存儲(chǔ)消息。

當(dāng) Batch 的消息被發(fā)到了 Broker 后,Kafka Producer 就會(huì)移除該 Batch,既然 Batch 持有某個(gè)內(nèi)存塊,那必然就會(huì)涉及到 GC 問題。

頻繁的申請(qǐng)內(nèi)存,用完后就丟棄,必然導(dǎo)致頻繁的 GC,造成嚴(yán)重的性能問題。那么,Kafka 是怎么做到避免頻繁 GC 的呢?

前面說過了,緩沖池在設(shè)計(jì)邏輯上面被切分成一個(gè)個(gè)大小相等的內(nèi)存塊,當(dāng)消息發(fā)送完畢,歸還給緩沖池不就可以避免被回收了嗎?

緩沖池的內(nèi)存持有類是 BufferPool,我們先來看下 BufferPool 都有哪些成員:

public class BufferPool {  // 總的內(nèi)存大小  private final long totalMemory;  // 每個(gè)內(nèi)存塊大小,即 batch.size  private final int poolableSize;  // 申請(qǐng)、歸還內(nèi)存的方法的同步鎖  private final ReentrantLock lock;  // 空閑的內(nèi)存塊  private final Deque free;  // 需要等待空閑內(nèi)存塊的事件  private final Deque waiters;  /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */  // 緩沖池還未分配的空閑內(nèi)存,新申請(qǐng)的內(nèi)存塊就是從這里獲取內(nèi)存值  private long nonPooledAvailableMemory; // ...}

從 BufferPool 的成員可看出,緩沖池實(shí)際上由一個(gè)個(gè) ByteBuffer 組成的,BufferPool 持有這些內(nèi)存塊,并保存在成員 free 中,free 的總大小由 totalMemory 作限制,而 nonPooledAvailableMemory 則表示還剩下緩沖池還剩下多少內(nèi)存還未被分配。

當(dāng) Batch 的消息發(fā)送完畢后,就會(huì)將它持有的內(nèi)存塊歸還到 free 中,以便后面的 Batch 申請(qǐng)內(nèi)存塊時(shí)不再創(chuàng)建新的 ByteBuffer,從 free 中取就可以了,從而避免了內(nèi)存塊被 JVM 回收的問題。

怎樣深度剖析Kafka Producer的緩沖池機(jī)制

接下來跟大家一起分析申請(qǐng)內(nèi)存和歸還內(nèi)存是如何實(shí)現(xiàn)的。

1、申請(qǐng)內(nèi)存

申請(qǐng)內(nèi)存的入口:

org.apache.kafka.clients.producer.internals.BufferPool#allocate

1)內(nèi)存足夠的情況

當(dāng)用戶請(qǐng)求申請(qǐng)內(nèi)存時(shí),如果發(fā)現(xiàn) free 中有空閑的內(nèi)存,則直接從中取:

if (size == poolableSize && !this.free.isEmpty()){
  return this.free.pollFirst(); 
}

這里的 size 即申請(qǐng)的內(nèi)存大小,它等于 Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

即如果你的消息大小小于 batchSize,則申請(qǐng)的內(nèi)存大小為 batchSize,那么上面的邏輯就是如果申請(qǐng)的內(nèi)存大小等于 batchSize 并且 free 不空閑,則直接從 free 中獲取。

我們不妨想一下,為什么 Kafka 一定要申請(qǐng)內(nèi)存大小等于 batchSize,才能從 free 獲取空閑的內(nèi)存塊呢?

前面也說過,緩沖池的內(nèi)存塊大小是固定的,它等于 batchSize,如果申請(qǐng)的內(nèi)存比 batchSize 還大,說明一條消息所需要存放的內(nèi)存空間比內(nèi)存塊的內(nèi)存空間還要大,因此不滿足需求,不滿組需求怎么辦呢?我們接著往下分析:

// now check if the request is immediately satisfiable with the// memory on hand or if we need to blockint freeListSize = freeSize() * this.poolableSize;if (this.nonPooledAvailableMemory + freeListSize >= size) {  // we have enough unallocated or pooled memory to immediately  // satisfy the request, but need to allocate the buffer  freeUp(size);  this.nonPooledAvailableMemory -= size;}

freeListSize:指的是 free 中已經(jīng)分配好并且已經(jīng)回收的空閑內(nèi)存塊總大?。?/p>

nonPooledAvailableMemory:緩沖池還未分配的空閑內(nèi)存,新申請(qǐng)的內(nèi)存塊就是從這里獲取內(nèi)存值;

this.nonPooledAvailableMemory + freeListSize:即緩沖池中總的空閑內(nèi)存空間。

如果緩沖池的內(nèi)存空間比申請(qǐng)內(nèi)存大小要大,則調(diào)用 freeUp(size); 方法,接著將空閑的內(nèi)存大小減去申請(qǐng)的內(nèi)存大小。

private void freeUp(int size) {
  while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

freeUp 這個(gè)方法很有趣,它的思想是這樣的:

如果未分配的內(nèi)存大小比申請(qǐng)的內(nèi)存還要小,那只能從已分配的內(nèi)存列表 free 中將內(nèi)存空間要回來,直到 nonPooledAvailableMemory 比申請(qǐng)內(nèi)存大為止。

2)內(nèi)存不足的情況

在我的「Kafka Producer 異步發(fā)送消息居然也會(huì)阻塞?」這篇文章當(dāng)中也提到了,當(dāng)緩沖池的內(nèi)存塊用完后,消息追加調(diào)用將會(huì)被阻塞,直到有空閑的內(nèi)存塊。

阻塞等待的邏輯是怎么實(shí)現(xiàn)的呢?

// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();try {
  long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);  this.waiters.addLast(moreMemory);
  // loop over and over until we have a buffer or have reserved
  // enough memory to allocate one
  while (accumulated < size) {
    long startWaitNs = time.nanoseconds();    long timeNs;    boolean waitingTimeElapsed;    try {
      waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } finally {
      long endWaitNs = time.nanoseconds();      timeNs = Math.max(0L, endWaitNs - startWaitNs);
      recordWaitTime(timeNs);    }    if (waitingTimeElapsed) {
      throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
    }    remainingTimeToBlockNs -= timeNs;    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
      // just grab a buffer from the free list
      buffer = this.free.pollFirst();
      accumulated = size;    } else {
      // we'll need to allocate memory, but we may only get      // part of what we need on this iteration      freeUp(size - accumulated);      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);      this.nonPooledAvailableMemory -= got;      accumulated += got;    }  }

以上源碼的大致邏輯:

首先創(chuàng)建一個(gè)本次等待 Condition,并且把它添加到類型為 Deque 的 waiters 中(后面在歸還內(nèi)存中會(huì)喚醒),while 循環(huán)不斷收集空閑的內(nèi)存,直到內(nèi)存比申請(qǐng)內(nèi)存大時(shí)退出,在 while 循環(huán)過程中,調(diào)用 Condition#await 方法進(jìn)行阻塞等待,歸還內(nèi)存時(shí)會(huì)被喚醒,喚醒后會(huì)判斷當(dāng)前申請(qǐng)內(nèi)存是否大于 batchSize,如果等與 batchSize 則直接將歸還的內(nèi)存返回即可,如果當(dāng)前申請(qǐng)的內(nèi)存大于 大于 batchSize,則需要調(diào)用 freeUp 方法從 free 中釋放空閑的內(nèi)存出來,然后進(jìn)行累加,直到大于申請(qǐng)的內(nèi)存為止。

2、歸還內(nèi)存

申請(qǐng)內(nèi)存的入口:

org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)

public void deallocate(ByteBuffer buffer, int size) {
  lock.lock();
  try {
    if (size == this.poolableSize && size == buffer.capacity()) {
      buffer.clear();      this.free.add(buffer);
    } else {
      this.nonPooledAvailableMemory += size;
    }    Condition moreMem = this.waiters.peekFirst();
    if (moreMem != null)
      moreMem.signal();  } finally {
    lock.unlock();
  }}

歸還內(nèi)存塊的邏輯比較簡單:

如果歸還的內(nèi)存塊大小等于 batchSize,則將其清空后添加到緩沖池的 free 中,即將其歸還給緩沖池,避免了 JVM GC 回收該內(nèi)存塊。如果不等于呢?直接將內(nèi)存大小累加到未分配并且空閑的內(nèi)存大小值中即可,內(nèi)存就無需歸還了,等待 JVM GC 回收掉,最后喚醒正在等待空閑內(nèi)存的線程。

怎樣深度剖析Kafka Producer的緩沖池機(jī)制

經(jīng)過以上的源碼分析之后,給大家指出需要注意的一個(gè)問題,這會(huì)給 Producer 端帶來嚴(yán)重的性能影響:

如果你的消息大小比 batchSize 還要大,則不會(huì)從 free 中循環(huán)獲取已分配好的內(nèi)存塊,而是重新創(chuàng)建一個(gè)新的 ByteBuffer,并且該 ByteBuffer 不會(huì)被歸還到緩沖池中(JVM GC 回收),如果此時(shí) nonPooledAvailableMemory 比消息體還要小,還會(huì)將 free 中空閑的內(nèi)存塊銷毀(JVM GC 回收),以便緩沖池中有足夠的內(nèi)存空間提供給用戶申請(qǐng),這些動(dòng)作都會(huì)導(dǎo)致頻繁 GC 的問題出現(xiàn)。

因此,需要根據(jù)業(yè)務(wù)消息的大小,適當(dāng)調(diào)整 batch.size 的大小,避免頻繁 GC。

上述就是小編為大家分享的怎樣深度剖析Kafka Producer的緩沖池機(jī)制了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


當(dāng)前名稱:怎樣深度剖析KafkaProducer的緩沖池機(jī)制
網(wǎng)頁鏈接:http://weahome.cn/article/pogcgo.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部