[TOC]
專注于為中小企業(yè)提供網(wǎng)站制作、成都網(wǎng)站設(shè)計服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)海林免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。
? Kafka是一個分布式消息隊列,采用scala語言開發(fā)。Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統(tǒng)可用性。
(1)點對點模式(類似接受文件,一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除)點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監(jiān)聽者也是如此。
(2)發(fā)布/訂閱模式(類似公眾號,一對多,數(shù)據(jù)生產(chǎn)后,推送給所有訂閱者)
發(fā)布訂閱模型則是一個基于推送的消息傳送模型。發(fā)布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監(jiān)聽主題時才接收消息,而持久訂閱者則監(jiān)聽主題的所有消息,即使當(dāng)前訂閱者不可用,處于離線狀態(tài)。
1)解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
3)擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負(fù)荷的請求而完全崩潰。
5)可恢復(fù)性:
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
6)順序保證:
在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。(Kafka保證一個Partition內(nèi)的消息的有序性,無法保證整體有序,觸發(fā)一個topic只有一個partition)
7)緩沖:
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
8)異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
? 消息由生產(chǎn)者發(fā)布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
? 基于推送模型(push)的消息系統(tǒng),由消息代理記錄消費者的消費狀態(tài)。消息代理在將消息推送到消費者后,標(biāo)記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發(fā)送出去后,當(dāng)消費進程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經(jīng)把這條消息標(biāo)記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發(fā)送完消息后,要設(shè)置狀態(tài)為“已發(fā)送”,只有收到消費者的確認(rèn)請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態(tài),這種做法顯然是不可取的。
? Kafka采用拉取模型,由消費者自己記錄消費狀態(tài),每個消費者互相獨立地順序讀取每個分區(qū)的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制(也就是只能取到當(dāng)前topic的最后一條消息),生產(chǎn)者最新寫入的消息如果還沒有達到備份數(shù)量(也就是要保證副本數(shù)寫入完成,從而保證消息不丟失,由此才讓該消息給消費者消費),對消費者是不可見的。這種由消費者控制偏移量的優(yōu)點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經(jīng)消費過的消息;或者直接跳到最近的位置,從當(dāng)前的時刻開始消費。
? 圖1.1 kafka消費模型
? 在一些消息系統(tǒng)中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態(tài),這種設(shè)計很大程度上限制了消息系統(tǒng)的整體吞吐量和處理延遲。Kafka的做法是生產(chǎn)者發(fā)布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設(shè)置保留時間來清理過期的數(shù)據(jù),比如,設(shè)置保留策略為兩天。那么,在消息發(fā)布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。
1)Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。
2)Consumer :消息消費者,向kafka broker取 消息的客戶端
3)Topic :可以理解為一個隊列。是消息的一個分組
4) Consumer Group (CG):kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個分區(qū)只能由同一個消費組內(nèi)的一個consumer來消費。但是不同消費者組消費同一個topic是可以的,而且互不影響。消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負(fù)載均衡讀取之前失敗的消費者讀取的分區(qū)
5)Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
6)Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。
7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka
一般來說,假設(shè) "test" 這個topic有兩個分區(qū),那么該topic的存儲目錄有兩個,命名為:test-0,test-1 ,然后對應(yīng)分區(qū)的目錄保存對應(yīng)的數(shù)據(jù)
首先kafka依賴zookeeper存儲元信息、且需要jdk來運行程序。所以需要事先部署好這兩個。請看之前的文章。
準(zhǔn)備好三臺虛擬機:
bigdata121 | bigdata122 | bigdata123 |
---|---|---|
zookeeper1 | zookeeper2 | zookeeper3 |
kafka1 | kafka | kafka3 |
軟件版本:
jdk | 1.8 |
---|---|
zookeeper | 3.4.10 |
kafka | 2.1.1 |
centos | 7.2.1511 |
bigdata121:
1、解壓:
tar zxf kafka_2.11-2.1.1.tgz -C /opt/modules/
2、創(chuàng)建日志目錄:
mkdir /opt/modules/kafka_2.11-2.1.1/logs
3、修改kafka server配置文件:
vim /opt/modules/kafka_2.11-2.1.1/config/server.properties
#### 修改一些關(guān)鍵性配置
#broker的全局唯一編號,不能重復(fù)
broker.id=0
#是否允許刪除topic,測試環(huán)境方便測試設(shè)置為true,生產(chǎn)環(huán)境建議設(shè)置為false
delete.topic.enable=true
#kafka運行日志存放的路徑
log.dirs=/opt/modules/kafka_2.11-2.1.1/logs
#配置連接Zookeeper集群地址,并且/path/to 是指定在zookeeper中存儲的根節(jié)點路徑,比如 /root
zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181/path/to
4、配置環(huán)境變量
vim /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/opt/modules/kafka_2.11-2.1.1
export PATH=$PATH:${KAFKA_HOME}/bin
5、啟用環(huán)境變量
source /etc/profile.d/kafka.sh
配置好后,將kafka的整個目錄rsync到其他兩臺主機的 /opt/modules 下,并修改
/opt/modules/kafka_2.11-2.1.1/config/server.properties 這個配置文件
broker.id=1、broker.id=2
反正就是每個broker的id必須唯一
分別在三臺機器上啟動kafka集群節(jié)點:
kafka-server-start.sh -daemon config/server.properties
-daemon 表示以后臺進程方式啟動kafka服務(wù)
config/server.properties server的配置文件路徑
停止當(dāng)前節(jié)點:
kafka-server-stop.sh
1)查看當(dāng)前服務(wù)器中的所有topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --list
2)創(chuàng)建topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --create --replication-factor 3 --partitions 1 --topic first
選項說明:
--topic 定義topic名
--replication-factor 定義副本數(shù)
--partitions 定義分區(qū)數(shù)
3)刪除topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --delete --topic first
需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除或者直接重啟。
4)發(fā)送消息
[root@bigdata11 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello world
5)消費消息
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
--from-beginning:會把first主題中以往所有的數(shù)據(jù)都讀取出來。根據(jù)業(yè)務(wù)場景選擇是否增加該配置。
6)查看某個Topic的詳情
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --describe --topic first
? producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障kafka吞吐率)
? Kafka集群有多個消息代理服務(wù)器(broker-server)組成,發(fā)布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應(yīng)用產(chǎn)生不同類型的數(shù)據(jù),可以設(shè)置不同的主題。一個主題一般會有多個消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。
? Kafka集群為每個主題維護了分布式的分區(qū)(partition)日志文件,物理意義上可以把主題(topic)看作進行了分區(qū)的日志文件(partition log)。主題的每個分區(qū)都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區(qū)中的每條消息都會按照時間順序分配到一個單調(diào)遞增的順序編號,叫做偏移量(offset),這個偏移量能夠唯一地定位當(dāng)前分區(qū)中的每一條消息。
? 消息發(fā)送時都被發(fā)送到一個topic,其本質(zhì)就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:下圖中的topic有3個分區(qū),每個分區(qū)的偏移量都從0開始,不同分區(qū)之間的偏移量都是獨立的,不會相互影響。
? 圖3.1 kafka寫入方式
? 圖3.2 kafka分區(qū)讀取
? 我們可以看到,每個Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
發(fā)布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務(wù)器端的指定分區(qū)后,都會分配到一個自增的偏移量。原始的消息內(nèi)容和分配的偏移量以及其他一些元數(shù)據(jù)信息最后都會存儲到分區(qū)日志文件中。消息的鍵也可以不用設(shè)置,這種情況下消息會均衡地分布到不同的分區(qū)。
(1)方便在集群中擴展,每個Partition可以通過調(diào)整以適應(yīng)它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
(2)可以提高并發(fā),因為可以以Partition為單位讀寫了。
傳統(tǒng)消息系統(tǒng)在服務(wù)端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務(wù)端會以消費存儲的順序依次發(fā)送給消費者。但由于消息是異步發(fā)送給消費者的,消息到達消費者的順序可能是無序的,這就意味著在并行消費時,傳統(tǒng)消息系統(tǒng)無法很好地保證消息被順序處理。雖然我們可以設(shè)置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執(zhí)行。
Kafka比傳統(tǒng)消息系統(tǒng)有更強的順序性保證,它使用主題的分區(qū)作為消息處理的并行單元。Kafka以分區(qū)作為最小的粒度,將每個分區(qū)分配給消費者組中不同的而且是唯一的消費者,并確保一個分區(qū)只屬于一個消費者,即這個消費者就是這個分區(qū)的唯一讀取線程。那么,只要分區(qū)的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區(qū),不同的消費者處理不同的分區(qū),所以Kafka不僅保證了消息的有序性,也做到了消費者的負(fù)載均衡。
(1)指定了patition,則直接使用;
(2)未指定patition但指定key,通過對key進行hash出一個patition
(3)patition和key都未指定,使用輪詢選出一個patition。
下面看看這個默認(rèn)的partition實現(xiàn)類的源碼:
DefaultPartitioner類
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//未指定key,輪詢獲取分區(qū)號
if (keyBytes == null) {
int nextValue = nextValue(topic);
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
//這里就是當(dāng)指定了key時,對key進行hash來獲取分區(qū)號
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
? 同一個partition可能會有多個replication(對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費(數(shù)據(jù)直接丟失了),同時producer也不能再將數(shù)據(jù)存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互(讀寫操作都只會和leader交互),其它replication作為follower從leader 中復(fù)制數(shù)據(jù),不會執(zhí)行其他操作。當(dāng)leader掛了時,會在follower中選出新的leader。
1) producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
比如完整的路徑如:/brokers/topics/TOPIC_NAME/partitions/NUM_OF_PARTITION/state
2)producer將消息發(fā)送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息,寫入本地log后向leader發(fā)送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK
物理上把topic分成一個或多個patition(對應(yīng) server.properties 中的num.partitions=3配置,這是默認(rèn)分區(qū)個數(shù),創(chuàng)建topic時可以手動指定分區(qū)個數(shù)),每個patition物理上對應(yīng)一個文件夾(該文件夾存儲該patition的所有消息和索引文件),如下:
分區(qū)目錄命名方式為 topicName-partiontionNum 的形式
首先,我們創(chuàng)建了first這個topic,有三個partition,0、1、2
[root@bigdata11 logs]$ ll
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 root root 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-2
[root@bigdata11 logs]$ cd first-0
[root@bigdata11 first-0]$ ll
-rw-rw-r--. 1 root root 10485760 8月 6 14:33 00000000000000000000.index 這是索引
-rw-rw-r--. 1 root root 219 8月 6 15:07 00000000000000000000.log 這是分區(qū)日志,也就是存儲消息的地方
-rw-rw-r--. 1 root root 10485756 8月 6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 root root 8 8月 6 14:37 leader-epoch-checkpoint
前面說到,無論消息是否被消費,kafka都會保留所有消息,消費者可以根據(jù)需要隨時從需要offset消費數(shù)據(jù)。有兩種策略可以刪除舊數(shù)據(jù):
1)基于時間:log.retention.hours=168,也就是默認(rèn)刪除7天前的數(shù)據(jù)
2)基于大?。簂og.retention.bytes=1073741824,超過1GB刪除
需要注意的是,因為Kafka讀取特定消息的時間復(fù)雜度為O(1)(因為是通過索引直接定位讀取,所以和大小無關(guān)),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。
3.2.3 zookeeper存儲結(jié)構(gòu)
zookeeper存儲了整個kafka集群的一些元信息,比如有哪些broker,哪些topic等。下面看看結(jié)構(gòu):
? 圖3.3 zookeeper存儲結(jié)構(gòu)
其中某些目錄的作用如下:
/brokers/topics/TOPIC_NAME/partitions/PARTITION_NUM/state:
指定topic的指定分區(qū)的元信息,里面存儲了該分區(qū)leader所在broker的id,以及所有副本存儲在哪些broker中。
/brokers/ids/xxxx:
有哪些broker,以及對應(yīng)的id
/consumer:
注冊的consumer的信息,例如消費者組id、消費的topic、消費的offset、消費者組中的哪個消費者消費哪個partition等
要注意的是,只有consumer會在zookeeper注冊,producer不會在zookeeper注冊
kafka支持高級api和低級api進行操作。
1)高級API優(yōu)點
高級API 寫起來簡單
不需要自行去管理offset,系統(tǒng)通過zookeeper自行管理。
不需要管理分區(qū),副本等情況,系統(tǒng)自動管理。
消費者斷線會自動根據(jù)上一次記錄在zookeeper中的offset去接著獲取數(shù)據(jù)(默認(rèn)設(shè)置1分鐘更新一下zookeeper中存的offset)
可以使用group來區(qū)分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)
2)高級API缺點
不能自行控制offset(對于某些特殊需求來說)
不能細(xì)化控制如分區(qū)、副本、zk等
1)低級 API 優(yōu)點
能夠讓開發(fā)者自己控制offset,想從哪里讀取就從哪里讀取。
自行控制連接分區(qū),對分區(qū)自定義進行負(fù)載均衡
對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內(nèi)存中)
2)低級API缺點
太過復(fù)雜,需要自行控制offset,連接哪個分區(qū),找到分區(qū)leader 等。
? 消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區(qū)在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。
? 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負(fù)載均衡讀取之前失敗的消費者讀取的分區(qū)。
? consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
? push(推)模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費能力以適當(dāng)?shù)乃俾氏M消息。
? 對于Kafka而言,pull模式更合適,它可簡化broker的設(shè)計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
? pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直等待數(shù)據(jù)到達。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大小)。
idea創(chuàng)建maven工程,添加kafka依賴:
org.apache.kafka
kafka_2.12
2.1.1
org.apache.kafka
kafka-streams
2.1.1
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class OldProducer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
//指定broker地址列表
properties.put("metadata.broker.list", "bigdata11:9092");
//指定producer需要broker發(fā)送ack確認(rèn)收到消息
properties.put("request.required.acks", "1");
//指定序列化類
properties.put("serializer.class", "kafka.serializer.StringEncoder");
//使用上面的配置項創(chuàng)建kafka producer
Producer producer = new Producer(new ProducerConfig(properties));
//發(fā)送消息
KeyedMessage message = new KeyedMessage("first", "hello world");
producer.send(message );
}
}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NewProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服務(wù)端的主機名和端口號
props.put("bootstrap.servers", "bigdata12:9092");
// 等待所有副本節(jié)點的應(yīng)答
props.put("acks", "all");
// 消息發(fā)送最大嘗試次數(shù)
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 請求延時
props.put("linger.ms", 1);
// 發(fā)送緩存區(qū)內(nèi)存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord("first", Integer.toString(i), "hello world-" + i));
}
producer.close();
}
}
package com.king.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CallBackProducer {
public static void main(String[] args) {
Properties props = new Properties();
// Kafka服務(wù)端的主機名和端口號
props.put("bootstrap.servers", "bigdata12:9092");
// 等待所有副本節(jié)點的應(yīng)答
props.put("acks", "all");
// 消息發(fā)送最大嘗試次數(shù)
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 增加服務(wù)端請求延時
props.put("linger.ms", 1);
// 發(fā)送緩存區(qū)內(nèi)存大小
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer kafkaProducer = new KafkaProducer<>(props);
for (int i = 0; i < 50; i++) {
kafkaProducer.send(new ProducerRecord("first", "hello" + i), new Callback() {
//重寫里面的回到方法
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
System.out.println(metadata.partition() + "---" + metadata.offset());
}
}
});
}
kafkaProducer.close();
}
}
舊api:
import java.util.Map;
import kafka.producer.Partitioner;
public class CustomPartitioner implements Partitioner {
public CustomPartitioner() {
super();
}
@Override
public int partition(Object key, int numPartitions) {
// 控制分區(qū)
return 0;
}
}
新api:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 控制分區(qū)
return 0;
}
@Override
public void close() {
}
}
實現(xiàn)好自定義的分區(qū)類之后,需要在創(chuàng)建producer的配置項添加指定自定義分區(qū)類的配置:
properties.put("partitioner.class", "自定義的分區(qū)類名,需要全類名");
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class CustomConsumer {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("zookeeper.connect", "bigdata11:2181");
properties.put("group.id", "g1");
properties.put("zookeeper.session.timeout.ms", "500");
properties.put("zookeeper.sync.time.ms", "250");
properties.put("auto.commit.interval.ms", "1000");
// 創(chuàng)建消費者連接器
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
//需要自己維護offset
HashMap topicCount = new HashMap<>();
topicCount.put("first", 1);
Map>> consumerMap = consumer.createMessageStreams(topicCount);
KafkaStream stream = consumerMap.get("first").get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
}
}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class CustomNewConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義kakfa 服務(wù)的地址,不需要將所有broker指定上
props.put("bootstrap.servers", "bigdata11:9092");
// 制定consumer group
props.put("group.id", "test");
// 是否自動確認(rèn)offset
props.put("enable.auto.commit", "true");
// 自動確認(rèn)offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 定義consumer
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 消費者訂閱的topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first", "second","third"));
while (true) {
// 讀取數(shù)據(jù),讀取超時時間為100ms
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
? Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現(xiàn)clients端的定制化控制邏輯。對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用。
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用,并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率
(4)close:
關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。
需求:
實現(xiàn)一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)。
程序:
(1)實現(xiàn)時間攔截器:
package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor {
@Override
public void configure(Map configs) {
}
@Override
public ProducerRecord onSend(ProducerRecord record) {
// 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
System.currentTimeMillis() + "," + record.value().toString());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
(2)統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在producer關(guān)閉時打印這兩個計數(shù)器
package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map configs) {
}
@Override
public ProducerRecord onSend(ProducerRecord record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 統(tǒng)計成功和失敗的次數(shù)
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存結(jié)果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
(3)producer主程序
package com.king.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
public class InterceptorProducer {
public static void main(String[] args) throws Exception {
// 1 設(shè)置配置信息
Properties props = new Properties();
props.put("bootstrap.servers", "bigdata11:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2 構(gòu)建攔截鏈
List interceptors = new ArrayList<>();
interceptors.add("com.king.kafka.interceptor.TimeInterceptor"); interceptors.add("com.king.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
String topic = "first";
Producer producer = new KafkaProducer<>(props);
// 3 發(fā)送消息
for (int i = 0; i < 10; i++) {
ProducerRecord record = new ProducerRecord<>(topic, "message" + i);
producer.send(record);
}
// 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法
producer.close();
}
}
(4)測試
(1)在kafka上啟動消費者,然后運行客戶端java程序。
[root@bigdata11 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
(2)觀察java平臺控制臺輸出數(shù)據(jù)如下:
Successful sent: 10
Failed sent: 0
? Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。有如下特點:
1)功能強大
? 高擴展性,彈性,容錯
2)輕量級
? 無需專門的集群
? 一個庫,而不是框架
3)完全集成
? 100%的Kafka 0.10.0版本兼容
? 易于集成到現(xiàn)有的應(yīng)用程序
4)實時性
? 毫秒級延遲
? 并非微批處理
? 窗口允許亂序數(shù)據(jù)
? 允許遲到數(shù)據(jù)
? 當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級別的處理能力,當(dāng)前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
? 既然Apache Spark與Apache Storm擁用如此多的優(yōu)勢,那為何還需要Kafka Stream呢?主要有如下原因。
? 第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫??蚣芤箝_發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發(fā)者調(diào)用,整個應(yīng)用的運行方式主要由開發(fā)者控制,方便使用和調(diào)試。
? 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復(fù)雜。而Kafka Stream作為類庫,可以非常方便的嵌入應(yīng)用程序中,它對應(yīng)用的打包和部署基本沒有任何要求。
? 第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時使用Kafka Stream的成本非常低。
? 第四,使用Storm或Spark Streaming時,需要為框架本身的進程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應(yīng)用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預(yù)留內(nèi)存。但是Kafka作為類庫不占用系統(tǒng)資源。
? 第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
? 第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態(tài)調(diào)整并行度。
(1)需求
實時處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”test>>>ximenqing”,最終處理成“ximenqing”
(2)代碼程序:
業(yè)務(wù)處理類:
package com.king.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
//實現(xiàn) Processor 接口,用于實現(xiàn)具體業(yè)務(wù)邏輯
public class LogProcessor implements Processor {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
//這里是具體業(yè)務(wù)邏輯
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”則只保留該標(biāo)記后面的內(nèi)容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 輸出到下一個topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
主類入口:
package com.king.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 定義輸入的topic
String from = "first";
// 定義輸出的topic
String to = "second";
// 設(shè)置參數(shù)
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata11:9092");
StreamsConfig config = new StreamsConfig(settings);
// 構(gòu)建拓?fù)? TopologyBuilder builder = new TopologyBuilder();
//創(chuàng)建一個builder,指定source ,processor ,sink。并給它們起別名。
//這里的parentName實際上是指定上一層是什么的名字
builder.addSource("SOURCE", from)
.addProcessor("PROCESS", new ProcessorSupplier() {
@Override
public Processor get() {
// 具體分析處理
return new LogProcessor();
}
}, "SOURCE")
.addSink("SINK", to, "PROCESS");
//構(gòu)建處理任務(wù),包括配置以及任務(wù)詳情
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
(3)測試
運行程序,然后在命令行下分別啟動producer和consumer,看情況:
在bigdata13上啟動生產(chǎn)者
[root@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello>>>world
>h>>>itstar
>hahaha
(6)在bigdata12上啟動消費者
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic second
world
itstar
hahaha
可以看到消費處理的數(shù)據(jù)是符合預(yù)期的。