真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

一、kafka--基本原理、環(huán)境搭建、api使用

[TOC]

專注于為中小企業(yè)提供網(wǎng)站制作、成都網(wǎng)站設(shè)計服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)海林免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

一、概述

1.1 什么是kafka

? Kafka是一個分布式消息隊列,采用scala語言開發(fā)。Kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper集群保存一些meta信息,來保證系統(tǒng)可用性。

1.2 消息隊列的兩種工作模式

(1)點對點模式(類似接受文件,一對一,消費者主動拉取數(shù)據(jù),消息收到后消息清除)點對點模型通常是一個基于拉取或者輪詢的消息傳送模型,這種模型從隊列中請求信息,而不是將消息推送到客戶端。這個模型的特點是發(fā)送到隊列的消息被一個且只有一個接收者接收處理,即使有多個消息監(jiān)聽者也是如此。

(2)發(fā)布/訂閱模式(類似公眾號,一對多,數(shù)據(jù)生產(chǎn)后,推送給所有訂閱者)
發(fā)布訂閱模型則是一個基于推送的消息傳送模型。發(fā)布訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監(jiān)聽主題時才接收消息,而持久訂閱者則監(jiān)聽主題的所有消息,即使當(dāng)前訂閱者不可用,處于離線狀態(tài)。

1.3 為什么需要消息隊列

1)解耦:
允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2)冗余:
消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
3)擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4)靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負(fù)荷的請求而完全崩潰。
5)可恢復(fù)性:
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
6)順序保證:
在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。(Kafka保證一個Partition內(nèi)的消息的有序性,無法保證整體有序,觸發(fā)一個topic只有一個partition)
7)緩沖:
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
8)異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

1.4 消費模型

? 消息由生產(chǎn)者發(fā)布到Kafka集群后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。

1.4.1 推送模型(push)

? 基于推送模型(push)的消息系統(tǒng),由消息代理記錄消費者的消費狀態(tài)。消息代理在將消息推送到消費者后,標(biāo)記這條消息為已消費,但這種方式無法很好地保證消息被處理。比如,消息代理把消息發(fā)送出去后,當(dāng)消費進程掛掉或者由于網(wǎng)絡(luò)原因沒有收到這條消息時,就有可能造成消息丟失(因為消息代理已經(jīng)把這條消息標(biāo)記為已消費了,但實際上這條消息并沒有被實際處理)。如果要保證消息被處理,消息代理發(fā)送完消息后,要設(shè)置狀態(tài)為“已發(fā)送”,只有收到消費者的確認(rèn)請求后才更新為“已消費”,這就需要消息代理中記錄所有的消費狀態(tài),這種做法顯然是不可取的。

1.4.2 拉取模型(pull)

? Kafka采用拉取模型,由消費者自己記錄消費狀態(tài),每個消費者互相獨立地順序讀取每個分區(qū)的消息。如下圖所示,有兩個消費者(不同消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限通過最高水位(watermark)控制(也就是只能取到當(dāng)前topic的最后一條消息),生產(chǎn)者最新寫入的消息如果還沒有達到備份數(shù)量(也就是要保證副本數(shù)寫入完成,從而保證消息不丟失,由此才讓該消息給消費者消費),對消費者是不可見的。這種由消費者控制偏移量的優(yōu)點是:消費者可以按照任意的順序消費消息。比如,消費者可以重置到舊的偏移量,重新處理之前已經(jīng)消費過的消息;或者直接跳到最近的位置,從當(dāng)前的時刻開始消費。
一、kafka--基本原理、環(huán)境搭建、api使用
? 圖1.1 kafka消費模型
? 在一些消息系統(tǒng)中,消息代理會在消息被消費之后立即刪除消息。如果有不同類型的消費者訂閱同一個主題,消息代理可能需要冗余地存儲同一消息;或者等所有消費者都消費完才刪除,這就需要消息代理跟蹤每個消費者的消費狀態(tài),這種設(shè)計很大程度上限制了消息系統(tǒng)的整體吞吐量和處理延遲。Kafka的做法是生產(chǎn)者發(fā)布的所有消息會一致保存在Kafka集群中,不管消息有沒有被消費。用戶可以通過設(shè)置保留時間來清理過期的數(shù)據(jù),比如,設(shè)置保留策略為兩天。那么,在消息發(fā)布之后,它可以被不同的消費者消費,在兩天之后,過期的消息就會自動清理掉。

1.5 kafka中的關(guān)鍵概念

1)Producer :消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。

2)Consumer :消息消費者,向kafka broker取 消息的客戶端

3)Topic :可以理解為一個隊列。是消息的一個分組

4) Consumer Group (CG):kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內(nèi)必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個分區(qū)只能由同一個消費組內(nèi)的一個consumer來消費。但是不同消費者組消費同一個topic是可以的,而且互不影響。消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負(fù)載均衡讀取之前失敗的消費者讀取的分區(qū)

5)Broker :一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

6)Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體(多個partition間)的順序。

7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當(dāng)然the first offset就是00000000000.kafka
一般來說,假設(shè) "test" 這個topic有兩個分區(qū),那么該topic的存儲目錄有兩個,命名為:test-0,test-1 ,然后對應(yīng)分區(qū)的目錄保存對應(yīng)的數(shù)據(jù)

二、kafka部署

2.1 環(huán)境規(guī)劃和準(zhǔn)備

首先kafka依賴zookeeper存儲元信息、且需要jdk來運行程序。所以需要事先部署好這兩個。請看之前的文章。
準(zhǔn)備好三臺虛擬機:

bigdata121bigdata122bigdata123
zookeeper1 zookeeper2 zookeeper3
kafka1 kafka kafka3

軟件版本:

jdk1.8
zookeeper 3.4.10
kafka 2.1.1
centos 7.2.1511

2.2 部署

bigdata121:

1、解壓:
tar zxf kafka_2.11-2.1.1.tgz -C /opt/modules/

2、創(chuàng)建日志目錄:
mkdir /opt/modules/kafka_2.11-2.1.1/logs

3、修改kafka server配置文件:
vim /opt/modules/kafka_2.11-2.1.1/config/server.properties
#### 修改一些關(guān)鍵性配置
#broker的全局唯一編號,不能重復(fù)
broker.id=0
#是否允許刪除topic,測試環(huán)境方便測試設(shè)置為true,生產(chǎn)環(huán)境建議設(shè)置為false
delete.topic.enable=true
#kafka運行日志存放的路徑
log.dirs=/opt/modules/kafka_2.11-2.1.1/logs
#配置連接Zookeeper集群地址,并且/path/to 是指定在zookeeper中存儲的根節(jié)點路徑,比如 /root
zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181/path/to

4、配置環(huán)境變量
vim /etc/profile.d/kafka.sh 
#!/bin/bash
export KAFKA_HOME=/opt/modules/kafka_2.11-2.1.1
export PATH=$PATH:${KAFKA_HOME}/bin

5、啟用環(huán)境變量
source /etc/profile.d/kafka.sh

配置好后,將kafka的整個目錄rsync到其他兩臺主機的 /opt/modules 下,并修改
/opt/modules/kafka_2.11-2.1.1/config/server.properties 這個配置文件

broker.id=1、broker.id=2   
反正就是每個broker的id必須唯一

分別在三臺機器上啟動kafka集群節(jié)點:

kafka-server-start.sh -daemon config/server.properties 

-daemon  表示以后臺進程方式啟動kafka服務(wù)
config/server.properties server的配置文件路徑

停止當(dāng)前節(jié)點:

kafka-server-stop.sh

2.3 常用命令行操作

1)查看當(dāng)前服務(wù)器中的所有topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --list

2)創(chuàng)建topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata13:2181 --create --replication-factor 3 --partitions 1 --topic first
選項說明:
--topic 定義topic名
--replication-factor  定義副本數(shù)
--partitions  定義分區(qū)數(shù)

3)刪除topic
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --delete --topic first
需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除或者直接重啟。

4)發(fā)送消息
[root@bigdata11 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello world

5)消費消息
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
--from-beginning:會把first主題中以往所有的數(shù)據(jù)都讀取出來。根據(jù)業(yè)務(wù)場景選擇是否增加該配置。

6)查看某個Topic的詳情
[root@bigdata11 kafka]$ bin/kafka-topics.sh --zookeeper bigdata11:2181 --describe --topic first 

三、kafka工作原理

3.1 kafka生產(chǎn)者寫入過程分析

3.1.1 寫入方式

? producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障kafka吞吐率)

3.1.2 分區(qū)(partition)

? Kafka集群有多個消息代理服務(wù)器(broker-server)組成,發(fā)布到Kafka集群的每條消息都有一個類別,用主題(topic)來表示。通常,不同應(yīng)用產(chǎn)生不同類型的數(shù)據(jù),可以設(shè)置不同的主題。一個主題一般會有多個消息的訂閱者,當(dāng)生產(chǎn)者發(fā)布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生成者寫入的新消息。
? Kafka集群為每個主題維護了分布式的分區(qū)(partition)日志文件,物理意義上可以把主題(topic)看作進行了分區(qū)的日志文件(partition log)。主題的每個分區(qū)都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日志中。分區(qū)中的每條消息都會按照時間順序分配到一個單調(diào)遞增的順序編號,叫做偏移量(offset),這個偏移量能夠唯一地定位當(dāng)前分區(qū)中的每一條消息。
? 消息發(fā)送時都被發(fā)送到一個topic,其本質(zhì)就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:下圖中的topic有3個分區(qū),每個分區(qū)的偏移量都從0開始,不同分區(qū)之間的偏移量都是獨立的,不會相互影響。
一、kafka--基本原理、環(huán)境搭建、api使用
? 圖3.1 kafka寫入方式
一、kafka--基本原理、環(huán)境搭建、api使用
? 圖3.2 kafka分區(qū)讀取
? 我們可以看到,每個Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
發(fā)布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務(wù)器端的指定分區(qū)后,都會分配到一個自增的偏移量。原始的消息內(nèi)容和分配的偏移量以及其他一些元數(shù)據(jù)信息最后都會存儲到分區(qū)日志文件中。消息的鍵也可以不用設(shè)置,這種情況下消息會均衡地分布到不同的分區(qū)。

3.1.2.1 分區(qū)的原因

(1)方便在集群中擴展,每個Partition可以通過調(diào)整以適應(yīng)它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應(yīng)任意大小的數(shù)據(jù)了;

(2)可以提高并發(fā),因為可以以Partition為單位讀寫了。
傳統(tǒng)消息系統(tǒng)在服務(wù)端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務(wù)端會以消費存儲的順序依次發(fā)送給消費者。但由于消息是異步發(fā)送給消費者的,消息到達消費者的順序可能是無序的,這就意味著在并行消費時,傳統(tǒng)消息系統(tǒng)無法很好地保證消息被順序處理。雖然我們可以設(shè)置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執(zhí)行。
Kafka比傳統(tǒng)消息系統(tǒng)有更強的順序性保證,它使用主題的分區(qū)作為消息處理的并行單元。Kafka以分區(qū)作為最小的粒度,將每個分區(qū)分配給消費者組中不同的而且是唯一的消費者,并確保一個分區(qū)只屬于一個消費者,即這個消費者就是這個分區(qū)的唯一讀取線程。那么,只要分區(qū)的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區(qū),不同的消費者處理不同的分區(qū),所以Kafka不僅保證了消息的有序性,也做到了消費者的負(fù)載均衡。

3.1.2.2 分區(qū)的原則

(1)指定了patition,則直接使用;
(2)未指定patition但指定key,通過對key進行hash出一個patition
(3)patition和key都未指定,使用輪詢選出一個patition。
下面看看這個默認(rèn)的partition實現(xiàn)類的源碼:

DefaultPartitioner類
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //未指定key,輪詢獲取分區(qū)號
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //這里就是當(dāng)指定了key時,對key進行hash來獲取分區(qū)號
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

3.1.3 副本(replication)

? 同一個partition可能會有多個replication(對應(yīng) server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費(數(shù)據(jù)直接丟失了),同時producer也不能再將數(shù)據(jù)存于其上的patition。引入replication之后,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader交互(讀寫操作都只會和leader交互),其它replication作為follower從leader 中復(fù)制數(shù)據(jù),不會執(zhí)行其他操作。當(dāng)leader掛了時,會在follower中選出新的leader。

3.1.4 寫入流程

1) producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
比如完整的路徑如:/brokers/topics/TOPIC_NAME/partitions/NUM_OF_PARTITION/state
2)producer將消息發(fā)送給該leader
3)leader將消息寫入本地log
4)followers從leader pull消息,寫入本地log后向leader發(fā)送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer發(fā)送ACK

3.2 brokers存儲原理

3.2.1 存儲方式

物理上把topic分成一個或多個patition(對應(yīng) server.properties 中的num.partitions=3配置,這是默認(rèn)分區(qū)個數(shù),創(chuàng)建topic時可以手動指定分區(qū)個數(shù)),每個patition物理上對應(yīng)一個文件夾(該文件夾存儲該patition的所有消息和索引文件),如下:

分區(qū)目錄命名方式為 topicName-partiontionNum 的形式

首先,我們創(chuàng)建了first這個topic,有三個partition,0、1、2
[root@bigdata11 logs]$ ll
drwxrwxr-x. 2 root root  4096 8月   6 14:37 first-0
drwxrwxr-x. 2 root root  4096 8月   6 14:35 first-1
drwxrwxr-x. 2 root root  4096 8月   6 14:37 first-2
[root@bigdata11 logs]$ cd first-0
[root@bigdata11 first-0]$ ll
-rw-rw-r--. 1 root root 10485760 8月   6 14:33 00000000000000000000.index 這是索引
-rw-rw-r--. 1 root root      219 8月   6 15:07 00000000000000000000.log 這是分區(qū)日志,也就是存儲消息的地方
-rw-rw-r--. 1 root root 10485756 8月   6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 root root        8 8月   6 14:37 leader-epoch-checkpoint

3.2.2 存儲策略

前面說到,無論消息是否被消費,kafka都會保留所有消息,消費者可以根據(jù)需要隨時從需要offset消費數(shù)據(jù)。有兩種策略可以刪除舊數(shù)據(jù):
1)基于時間:log.retention.hours=168,也就是默認(rèn)刪除7天前的數(shù)據(jù)
2)基于大?。簂og.retention.bytes=1073741824,超過1GB刪除
需要注意的是,因為Kafka讀取特定消息的時間復(fù)雜度為O(1)(因為是通過索引直接定位讀取,所以和大小無關(guān)),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。

3.2.3 zookeeper存儲結(jié)構(gòu)
zookeeper存儲了整個kafka集群的一些元信息,比如有哪些broker,哪些topic等。下面看看結(jié)構(gòu):
一、kafka--基本原理、環(huán)境搭建、api使用
? 圖3.3 zookeeper存儲結(jié)構(gòu)
其中某些目錄的作用如下:

/brokers/topics/TOPIC_NAME/partitions/PARTITION_NUM/state:
指定topic的指定分區(qū)的元信息,里面存儲了該分區(qū)leader所在broker的id,以及所有副本存儲在哪些broker中。

/brokers/ids/xxxx:
有哪些broker,以及對應(yīng)的id

/consumer:
注冊的consumer的信息,例如消費者組id、消費的topic、消費的offset、消費者組中的哪個消費者消費哪個partition等

要注意的是,只有consumer會在zookeeper注冊,producer不會在zookeeper注冊

3.3 kafka消費過程分析

kafka支持高級api和低級api進行操作。

3.3.1 高級api

1)高級API優(yōu)點
高級API 寫起來簡單
不需要自行去管理offset,系統(tǒng)通過zookeeper自行管理。
不需要管理分區(qū),副本等情況,系統(tǒng)自動管理。
消費者斷線會自動根據(jù)上一次記錄在zookeeper中的offset去接著獲取數(shù)據(jù)(默認(rèn)設(shè)置1分鐘更新一下zookeeper中存的offset)
可以使用group來區(qū)分對同一個topic 的不同程序訪問分離開來(不同的group記錄不同的offset,這樣不同程序讀取同一個topic才不會因為offset互相影響)

2)高級API缺點
不能自行控制offset(對于某些特殊需求來說)
不能細(xì)化控制如分區(qū)、副本、zk等

3.3.2 低級api

1)低級 API 優(yōu)點
能夠讓開發(fā)者自己控制offset,想從哪里讀取就從哪里讀取。
自行控制連接分區(qū),對分區(qū)自定義進行負(fù)載均衡
對zookeeper的依賴性降低(如:offset不一定非要靠zk存儲,自行存儲offset即可,比如存在文件或者內(nèi)存中)

2)低級API缺點
太過復(fù)雜,需要自行控制offset,連接哪個分區(qū),找到分區(qū)leader 等。

3.3.3 消費者組的定義

? 消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分區(qū)在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。某個消費者讀取某個分區(qū),也可以叫做某個消費者是某個分區(qū)的擁有者。
? 在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那么其他的group成員會自動負(fù)載均衡讀取之前失敗的消費者讀取的分區(qū)。

3.3.4 消費方式

? consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
? push(推)模式很難適應(yīng)消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費能力以適當(dāng)?shù)乃俾氏M消息。
? 對于Kafka而言,pull模式更合適,它可簡化broker的設(shè)計,consumer可自主控制消費消息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
? pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費者可能會陷入循環(huán)中,一直等待數(shù)據(jù)到達。為了避免這種情況,我們在我們的拉請求中有參數(shù),允許消費者請求在等待數(shù)據(jù)到達的“長輪詢”中進行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大小)。

四、kafka api使用

4.1 環(huán)境準(zhǔn)備

idea創(chuàng)建maven工程,添加kafka依賴:


            org.apache.kafka
            kafka_2.12
            2.1.1
        

        
        
            org.apache.kafka
            kafka-streams
            2.1.1
        

4.2 生產(chǎn)者api(java)

4.2.1 創(chuàng)建生產(chǎn)者(舊api)

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProducer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {

        Properties properties = new Properties();
        //指定broker地址列表
        properties.put("metadata.broker.list", "bigdata11:9092");
        //指定producer需要broker發(fā)送ack確認(rèn)收到消息
        properties.put("request.required.acks", "1");
        //指定序列化類
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        //使用上面的配置項創(chuàng)建kafka producer
        Producer producer = new Producer(new ProducerConfig(properties));

        //發(fā)送消息
        KeyedMessage message = new KeyedMessage("first", "hello world");
        producer.send(message );
    }
}

4.2.2 創(chuàng)建生產(chǎn)者(新api)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // Kafka服務(wù)端的主機名和端口號
        props.put("bootstrap.servers", "bigdata12:9092");
        // 等待所有副本節(jié)點的應(yīng)答
        props.put("acks", "all");
        // 消息發(fā)送最大嘗試次數(shù)
        props.put("retries", 0);
        // 一批消息處理大小
        props.put("batch.size", 16384);
        // 請求延時
        props.put("linger.ms", 1);
        // 發(fā)送緩存區(qū)內(nèi)存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord("first", Integer.toString(i), "hello world-" + i));
        }

        producer.close();
    }
}

4.2.3 創(chuàng)建生產(chǎn)者帶回調(diào)函數(shù)

package com.king.kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

    public static void main(String[] args) {

Properties props = new Properties();
        // Kafka服務(wù)端的主機名和端口號
        props.put("bootstrap.servers", "bigdata12:9092");
        // 等待所有副本節(jié)點的應(yīng)答
        props.put("acks", "all");
        // 消息發(fā)送最大嘗試次數(shù)
        props.put("retries", 0);
        // 一批消息處理大小
        props.put("batch.size", 16384);
        // 增加服務(wù)端請求延時
        props.put("linger.ms", 1);
// 發(fā)送緩存區(qū)內(nèi)存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {

            kafkaProducer.send(new ProducerRecord("first", "hello" + i), new Callback() {
               //重寫里面的回到方法
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (metadata != null) {

                        System.out.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

4.2.4 自定義分區(qū)生產(chǎn)者

舊api:

import java.util.Map;
import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // 控制分區(qū)
        return 0;
    }
}

新api:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map configs) {

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 控制分區(qū)
        return 0;
    }

    @Override
    public void close() {

    }
}

實現(xiàn)好自定義的分區(qū)類之后,需要在創(chuàng)建producer的配置項添加指定自定義分區(qū)類的配置:

properties.put("partitioner.class", "自定義的分區(qū)類名,需要全類名");

4.3 消費者api(java)

4.3.1 創(chuàng)建消費者(舊api)

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.put("zookeeper.connect", "bigdata11:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // 創(chuàng)建消費者連接器
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        //需要自己維護offset
        HashMap topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream stream = consumerMap.get("first").get(0);

        ConsumerIterator it = stream.iterator();

        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

4.3.2 創(chuàng)建消費者(新api)

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        // 定義kakfa 服務(wù)的地址,不需要將所有broker指定上 
        props.put("bootstrap.servers", "bigdata11:9092");
        // 制定consumer group 
        props.put("group.id", "test");
        // 是否自動確認(rèn)offset 
        props.put("enable.auto.commit", "true");
        // 自動確認(rèn)offset的時間間隔 
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化類
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化類 
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 定義consumer 
        KafkaConsumer consumer = new KafkaConsumer<>(props);

        // 消費者訂閱的topic, 可同時訂閱多個 
        consumer.subscribe(Arrays.asList("first", "second","third"));

        while (true) {
            // 讀取數(shù)據(jù),讀取超時時間為100ms 
            ConsumerRecords records = consumer.poll(100);

            for (ConsumerRecord record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

五、kafka攔截器

5.1 攔截器原理

? Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實現(xiàn)clients端的定制化控制邏輯。對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調(diào)用。
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標(biāo)分區(qū)的計算
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息被應(yīng)答或消息發(fā)送失敗時調(diào)用,并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率
(4)close:
關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。

5.2 攔截器實例

需求:
實現(xiàn)一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)。

程序:
(1)實現(xiàn)時間攔截器:

package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor {

    @Override
    public void configure(Map configs) {

    }

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        // 創(chuàng)建一個新的record,把時間戳寫入消息體的最前部
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }
}

(2)統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在producer關(guān)閉時打印這兩個計數(shù)器

package com.king.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor{
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map configs) {

    }

    @Override
    public ProducerRecord onSend(ProducerRecord record) {
         return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 統(tǒng)計成功和失敗的次數(shù)
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // 保存結(jié)果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

(3)producer主程序

package com.king.kafka.interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

    public static void main(String[] args) throws Exception {
        // 1 設(shè)置配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "bigdata11:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2 構(gòu)建攔截鏈
        List interceptors = new ArrayList<>();
        interceptors.add("com.king.kafka.interceptor.TimeInterceptor");     interceptors.add("com.king.kafka.interceptor.CounterInterceptor"); 
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        String topic = "first";
        Producer producer = new KafkaProducer<>(props);

        // 3 發(fā)送消息
        for (int i = 0; i < 10; i++) {

            ProducerRecord record = new ProducerRecord<>(topic, "message" + i);
            producer.send(record);
        }

        // 4 一定要關(guān)閉producer,這樣才會調(diào)用interceptor的close方法
        producer.close();
    }
}

(4)測試

(1)在kafka上啟動消費者,然后運行客戶端java程序。
[root@bigdata11 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9
(2)觀察java平臺控制臺輸出數(shù)據(jù)如下:
Successful sent: 10
Failed sent: 0

六、kafka stream

6.1 kafka stream概念以及特點

? Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易于使用的庫。用于在Kafka上構(gòu)建高可分布式、拓展性,容錯的應(yīng)用程序。有如下特點:
1)功能強大
? 高擴展性,彈性,容錯
2)輕量級
? 無需專門的集群
? 一個庫,而不是框架
3)完全集成
? 100%的Kafka 0.10.0版本兼容
? 易于集成到現(xiàn)有的應(yīng)用程序
4)實時性
? 毫秒級延遲
? 并非微批處理
? 窗口允許亂序數(shù)據(jù)
? 允許遲到數(shù)據(jù)

6.2 為什么要有kafka stream

? 當(dāng)前已經(jīng)有非常多的流式處理系統(tǒng),最知名且應(yīng)用最多的開源流式處理系統(tǒng)有Spark Streaming和Apache Storm。Apache Storm發(fā)展多年,應(yīng)用廣泛,提供記錄級別的處理能力,當(dāng)前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應(yīng)用開發(fā)的用戶而言使用門檻低。另外,目前主流的Hadoop發(fā)行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。
? 既然Apache Spark與Apache Storm擁用如此多的優(yōu)勢,那為何還需要Kafka Stream呢?主要有如下原因。
? 第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫??蚣芤箝_發(fā)者按照特定的方式去開發(fā)邏輯部分,供框架調(diào)用。開發(fā)者很難了解框架的具體運行方式,從而使得調(diào)試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發(fā)者調(diào)用,整個應(yīng)用的運行方式主要由開發(fā)者控制,方便使用和調(diào)試。
? 第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復(fù)雜。而Kafka Stream作為類庫,可以非常方便的嵌入應(yīng)用程序中,它對應(yīng)用的打包和部署基本沒有任何要求。
? 第三,就流式處理系統(tǒng)而言,基本都支持Kafka作為數(shù)據(jù)源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統(tǒng)的標(biāo)準(zhǔn)數(shù)據(jù)源。換言之,大部分流式系統(tǒng)中都已部署了Kafka,此時使用Kafka Stream的成本非常低。
? 第四,使用Storm或Spark Streaming時,需要為框架本身的進程預(yù)留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應(yīng)用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預(yù)留內(nèi)存。但是Kafka作為類庫不占用系統(tǒng)資源。
? 第五,由于Kafka本身提供數(shù)據(jù)持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。
? 第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態(tài)調(diào)整并行度。

6.3 kafka stream數(shù)據(jù)清洗小實例

(1)需求
實時處理單詞帶有”>>>”前綴的內(nèi)容。例如輸入”test>>>ximenqing”,最終處理成“ximenqing”

(2)代碼程序:
業(yè)務(wù)處理類:

package com.king.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

//實現(xiàn) Processor 接口,用于實現(xiàn)具體業(yè)務(wù)邏輯
public class LogProcessor implements Processor {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    //這里是具體業(yè)務(wù)邏輯
    @Override
    public void process(byte[] key, byte[] value) {
        String input = new String(value);

        // 如果包含“>>>”則只保留該標(biāo)記后面的內(nèi)容
        if (input.contains(">>>")) {
            input = input.split(">>>")[1].trim();
            // 輸出到下一個topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        }else{
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

主類入口:

package com.king.kafka.stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

    public static void main(String[] args) {

        // 定義輸入的topic
        String from = "first";
        // 定義輸出的topic
        String to = "second";

        // 設(shè)置參數(shù)
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata11:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // 構(gòu)建拓?fù)?        TopologyBuilder builder = new TopologyBuilder();

        //創(chuàng)建一個builder,指定source ,processor ,sink。并給它們起別名。
        //這里的parentName實際上是指定上一層是什么的名字
        builder.addSource("SOURCE", from)
               .addProcessor("PROCESS", new ProcessorSupplier() {

                    @Override
                    public Processor get() {
                        // 具體分析處理
                        return new LogProcessor();
                    }
                }, "SOURCE")
                .addSink("SINK", to, "PROCESS");

        //構(gòu)建處理任務(wù),包括配置以及任務(wù)詳情
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

(3)測試
運行程序,然后在命令行下分別啟動producer和consumer,看情況:

在bigdata13上啟動生產(chǎn)者
[root@bigdata13 kafka]$ bin/kafka-console-producer.sh --broker-list bigdata11:9092 --topic first
>hello>>>world
>h>>>itstar
>hahaha

(6)在bigdata12上啟動消費者
[root@bigdata12 kafka]$ bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic second
world
itstar
hahaha

可以看到消費處理的數(shù)據(jù)是符合預(yù)期的。


本文題目:一、kafka--基本原理、環(huán)境搭建、api使用
轉(zhuǎn)載來于:http://weahome.cn/article/pogosp.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部