如圖.1中,kafka 相關(guān)名詞解釋如下:
成都創(chuàng)新互聯(lián)公司主要從事成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)獻(xiàn)縣,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18980820575
1.producer: 消息生產(chǎn)者,發(fā)布消息到 kafka 集群的終端或服務(wù)。 2.broker: kafka 集群中包含的服務(wù)器。 3.topic: 每條發(fā)布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的。 4.partition: partition 是物理上的概念,每個(gè) topic 包含一個(gè)或多個(gè) partition。kafka 分配的單位是 partition。 5.consumer: 從 kafka 集群中消費(fèi)消息的終端或服務(wù)。 6.Consumer group: high-level consumer API 中,每個(gè) consumer 都屬于一個(gè) consumer group,每條消息只能被 consumer group 中的一個(gè) Consumer 消費(fèi),但可以被多個(gè) consumer group 消費(fèi)。 7.replica: partition 的副本,保障 partition 的高可用。 8.leader: replica 中的一個(gè)角色, producer 和 consumer 只跟 leader 交互。 9.follower: replica 中的一個(gè)角色,從 leader 中復(fù)制數(shù)據(jù)。 10.controller: kafka 集群中的其中一個(gè)服務(wù)器,用來進(jìn)行 leader election 以及 各種 failover。 12.zookeeper: kafka 通過 zookeeper 來存儲(chǔ)集群的 meta 信息。
kafka 在 zookeeper 中的存儲(chǔ)結(jié)構(gòu)
producer 采用 push 模式將消息發(fā)布到 broker,每條消息都被 append 到 patition 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障 kafka 吞吐率)。
producer 發(fā)送消息到 broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition。其路由機(jī)制為:
1. 指定了 patition,則直接使用; 2. 未指定 patition 但指定 key,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition 3. patition 和 key 都未指定,使用輪詢選出一個(gè) patition。
附上 Java 客戶端分區(qū)源碼,一目了然:
//創(chuàng)建消息實(shí)例 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException("Invalid timestamp " + timestamp); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; } //計(jì)算 patition,如果指定了 patition 則直接使用,否則使用 key 計(jì)算 private int partition(ProducerRecordrecord, byte[] serializedKey , byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); if (partition != null) { List partitions = cluster.partitionsForTopic(record.topic()); int lastPartition = partitions.size() - 1; if (partition < 0 || partition > lastPartition) { throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition)); } return partition; } return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } // 使用 key 選取 patition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement(); List availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { return DefaultPartitioner.toPositive(nextValue) % numPartitions; } } else { //對(duì) keyBytes 進(jìn)行 hash 選出一個(gè) patition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
producer 寫入消息序列:
流程說明:
1. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader 2. producer 將消息發(fā)送給該 leader 3. leader 將消息寫入本地 log 4. followers 從 leader pull 消息,寫入本地 log 后 leader 發(fā)送 ACK 5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK
愿意了解更多技術(shù)分享的可關(guān)注:mingli.com
朋友需要請(qǐng)加球球:二零四二八四九二三七
一般情況下存在三種情況:
1. At most once 消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸 2. At least one 消息絕不會(huì)丟,但可能會(huì)重復(fù)傳輸 3. Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次
當(dāng) producer 向 broker 發(fā)送消息時(shí),一旦這條消息被 commit,由于 replication 的存在,它就不會(huì)丟。但是如果 producer 發(fā)送數(shù)據(jù)給 broker 后,遇到網(wǎng)絡(luò)問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經(jīng) commit。雖然 Kafka 無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是 producer 可以生成一種類似于主鍵的東西,發(fā)生故障時(shí)冪等性的重試多次,這樣就做到了 Exactly once,但目前還并未實(shí)現(xiàn)。所以目前默認(rèn)情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設(shè)置 producer 異步發(fā)送實(shí)現(xiàn)At most once。