真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何進(jìn)行PulsarConnector機(jī)制的剖析

本篇文章給大家分享的是有關(guān)如何進(jìn)行Pulsar Connector機(jī)制的剖析,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

創(chuàng)新互聯(lián)公司主營灌云網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,重慶APP開發(fā),灌云h5微信平臺(tái)小程序開發(fā)搭建,灌云網(wǎng)站營銷推廣歡迎灌云等地區(qū)企業(yè)咨詢

Apache Pulsar 是 Yahoo 開源的下一代分布式消息系統(tǒng),在2018年9月從 Apache  軟件基金會(huì)畢業(yè)成為頂級(jí)項(xiàng)目。Pulsar 特有的分層分片的架構(gòu),在保證大數(shù)據(jù)消息流系統(tǒng)的性能和吞吐量的同時(shí),也提供了高可用性、高可擴(kuò)展性和易維護(hù)性。

分片架構(gòu)將消息流數(shù)據(jù)的存儲(chǔ)粒度從分區(qū)拉低到了分片,以及相應(yīng)的層級(jí)化存儲(chǔ),使 Pulsar 成為 unbounded streaming data storage 的不二之選。這使得 Pulsar 可以更完美地匹配和適配 Flink 的批流一體的計(jì)算模式。

1. Pulsar 簡(jiǎn)介


1.1 特點(diǎn)
 
隨著開源后,各行業(yè)企業(yè)可以根據(jù)不同需求,為 Pulsar 賦予更豐富的功能,所以目前它也不再只是中間件的功能,而是慢慢發(fā)展成為一個(gè) Event Streaming Platform(事件流處理平臺(tái)),具有 Connect(連接)、Store(存儲(chǔ))和 Process(處理)功能。
 
■ Connect
 
在連接方面,Pulsar 具有自己?jiǎn)为?dú)的 Pub/Sub 模型,可以同時(shí)滿足 Kafka 和 RocketMQ 的應(yīng)用場(chǎng)景。同時(shí) Pulsar IO 的功能,其實(shí)就是 Connector,可以非常方便地將數(shù)據(jù)源導(dǎo)入到 Pulsar 或從 Pulsar 導(dǎo)出等。

另外,在Pulsar 2.5.0 中,我們新增了一個(gè)重要機(jī)制:Protocol handler。這個(gè)機(jī)制支持在 broker 自定義添加額外的協(xié)議支持,可以保證在不更改原數(shù)據(jù)庫的基礎(chǔ)上,也能享用 Pulsar 的一些高級(jí)功能。所以 Pulsar 也延展出比如:KoP、ActiveMQ、Rest 等。
 
■   Store
 
Pulsar 提供了可以讓用戶導(dǎo)入的途徑后就必然需要考慮在 Pulsar 上進(jìn)行存儲(chǔ)。Pulsar 采用的是分布式存儲(chǔ),最開始是在 Apache BookKeeper 上進(jìn)行。后來添加了更多的層級(jí)存儲(chǔ),通過 JCloud 和 HDFS 等多種模式進(jìn)行存儲(chǔ)的選擇。當(dāng)然,層級(jí)存儲(chǔ)也受限于存儲(chǔ)容量。
 
■ Process
 
Pulsar 提供了一個(gè)無限存儲(chǔ)的抽象,方便第三方平臺(tái)進(jìn)行更好的批流融合的計(jì)算。即 Pulsar 的數(shù)據(jù)處理能力。Pulsar 的數(shù)據(jù)處理能力實(shí)際上是按照你數(shù)據(jù)計(jì)算的難易程度、實(shí)效性等進(jìn)行了切分。
 
目前 Pulsar 包含以下幾類集成融合處理方式:
 
  • Pulsar Function:Pulsar 自帶的函數(shù)處理,通過不同系統(tǒng)端的函數(shù)編寫,即可完成計(jì)算并運(yùn)用到 Pulsar 中。

  • Pulsar-Flink connector 和 Pulsar-Spark connector:作為批流融合計(jì)算引擎,F(xiàn)link 和 Spark 都提供流計(jì)算的機(jī)制。如果你已經(jīng)在使用他們了,那恭喜你。因?yàn)?Pulsar 也全部支持這兩種計(jì)算,無需你再進(jìn)行多余的操作了。

  • Presto (Pulsar SQL):有的朋友會(huì)在應(yīng)用場(chǎng)景中更多的使用 SQL,進(jìn)行交互式查詢等。Pulsar 與 Presto 有很好的集成處理,可以用 SQL 在 Pulsar 進(jìn)行處理。

 

如何進(jìn)行Pulsar Connector機(jī)制的剖析

 
1.2 訂閱模型
 
從使用來看,Pulsar 的用法與傳統(tǒng)的消息系統(tǒng)類似,是基于發(fā)布-訂閱模型的。使用者被分為生產(chǎn)者(Producer)和消費(fèi)者(Consumer)兩個(gè)角色,對(duì)于更具體的需求,還可以以 Reader 的角色來消費(fèi)數(shù)據(jù)。用戶可以以生產(chǎn)者的身份將數(shù)據(jù)發(fā)布在特定的主題之下,也可以以消費(fèi)者的身份訂閱(Subscription)特定的主題,從而獲取數(shù)據(jù)。在這個(gè)過程中,Pulsar 實(shí)現(xiàn)了數(shù)據(jù)的持久化與數(shù)據(jù)分發(fā),Pulsar 還提供了Schema 功能,能夠?qū)?shù)據(jù)進(jìn)行驗(yàn)證。

如下圖所示,Pulsar 里面有幾種訂閱模式:

  1. 獨(dú)占訂閱(Exclusive) 

  2. 故障轉(zhuǎn)移訂閱(Failover) 

  3. 共享訂閱(Shared) 

  4. Key保序共享訂閱(Key_shared)

   

如何進(jìn)行Pulsar Connector機(jī)制的剖析

如何進(jìn)行Pulsar Connector機(jī)制的剖析

    
Pulsar 里的主題分成兩類,一類是分區(qū)主題(Partitioned Topic),一類是非分區(qū)主題(Not Partitioned Topic)。

分區(qū)主題實(shí)際上是由多個(gè)非分區(qū)主題組成的。主題和分區(qū)都是邏輯上的概念,我們可以把主題看作是一個(gè)大的無限的事件流,被分區(qū)切分成幾條小的無限事件流。

而對(duì)應(yīng)的,在物理上,Pulsar 采用分層結(jié)構(gòu)。每一條事件流存儲(chǔ)在一個(gè) Segment 中,每個(gè)Segment 包括了許多個(gè)Entry,Entry 里面存放的才是用戶發(fā)送過來的一條或多條消息實(shí)體。
 
Message 是 Entry 中存放的數(shù)據(jù),也是 Pulsar 中消費(fèi)者消費(fèi)一次獲得的數(shù)據(jù)。Message 中除了包括字節(jié)流數(shù)據(jù),還有 Key 屬性,兩種時(shí)間屬性和 MessageId 以及其他信息。MessageId 是消息的唯一標(biāo)識(shí),包括了ledger-id、entry-id、 batch-index、 partition-index 的信息,如下圖,分別記錄了消息在Pulsar 中的Segment、Entry、Message、Partition 存儲(chǔ)位置, 因此也可以據(jù)此從物理上找到Message的信息內(nèi)容。

如何進(jìn)行Pulsar Connector機(jī)制的剖析

   

2. Pulsar 架構(gòu)

   
 
一個(gè) Pulsar 集群由 Brokers 集群和 Bookies 集群組成。Brokers 之間是相互獨(dú)立的,負(fù)責(zé)向生產(chǎn)者和消費(fèi)者提供關(guān)于某個(gè)主題的服務(wù)。Bookies 之間也是相互獨(dú)立的,負(fù)責(zé)存儲(chǔ) Segment 的數(shù)據(jù),是消息持久化的地方。為了管理配置信息和代理信息,Pulsar 還借助了 Zookeeper 這個(gè)組件,Brokers 和 Bookies 都會(huì)在 zookeeper 上注冊(cè),下面從消息的具體讀寫路徑(見下圖)來介紹 Pulsar 的結(jié)構(gòu)。
 

如何進(jìn)行Pulsar Connector機(jī)制的剖析

如何進(jìn)行Pulsar Connector機(jī)制的剖析

 
在寫路徑中,生產(chǎn)者創(chuàng)建并發(fā)送一條消息到主題中,該消息可能會(huì)以某種算法(比如Round robin)被路由到一個(gè)具體的分區(qū)上,Pulsar 會(huì)選擇一個(gè)Broker 為這個(gè)分區(qū)服務(wù),該分區(qū)的消息實(shí)際會(huì)被發(fā)送到這個(gè) Broker上。當(dāng)Broker 拿到一條消息,它會(huì)以 Write Quorum (Qw)的方式將消息寫入到 Bookies 中。當(dāng)成功寫入到 Bookies 的數(shù)量達(dá)到設(shè)定時(shí),Broker 會(huì)收到完成通知,并且 Broker 也會(huì)返回通知生產(chǎn)者寫入成功。

在讀路徑中,消費(fèi)者首先要發(fā)起一次訂閱,之后才能與主題對(duì)應(yīng)的 Broker 進(jìn)行連接,Broker 從 Bookies 請(qǐng)求數(shù)據(jù)并發(fā)送給消費(fèi)者。當(dāng)數(shù)據(jù)接受成功,消費(fèi)者可以選擇向 Broker 發(fā)送確認(rèn)信息,使得 Broker 能夠更新消費(fèi)者的訪問位置信息。前面也提到,對(duì)于剛寫入的數(shù)據(jù),Pulsar 會(huì)存儲(chǔ)在緩存中,那么就可以直接從 Brokers 的緩存中讀取了,縮短了讀取路徑。
 
Pulsar 將存儲(chǔ)與服務(wù)相分離,實(shí)現(xiàn)了很好的可拓展性,在平臺(tái)層面,能夠通過調(diào)整Bookies 的數(shù)量來滿足不同的需求。在用戶層面,只需要跟 Brokers 通信,而Brokers 本身被設(shè)計(jì)成沒有狀態(tài)的,當(dāng)某個(gè) Broker 因故障無法使用時(shí),可以動(dòng)態(tài)的生成一個(gè)新的 Broker 來替換。
 

3. Pulsar Connector 內(nèi)部機(jī)制


 
首先,Pulsar Connector 在使用上是比較簡(jiǎn)單的,由一個(gè) Source 和一個(gè) Sink 組成,source 的功能就是將一個(gè)或多個(gè)主題下的消息傳入到 Flink 的Source中,Sink的功能就是從 Flink 的 Sink 中獲取數(shù)據(jù)并放入到某些主題下,在使用方式上,如下所示,與 Kafa Connector 很相似,使用時(shí)需要設(shè)置一些參數(shù)。
 

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("topic", "test-source-topic") FlinkPulsarSource source = new FlinkPulsarSource<>(              serviceUrl,              adminUrl,              new SimpleStringSchema(),              props); DataStream stream = see.addSource(source);
FlinkPulsarSink sink =      new FlinkPulsarSink(              serviceUrl,              adminUrl,              Optional.of(topic), // mandatory target topic              props,              TopicKeyExtractor.NULL, // replace this to extract key or topic for each record              Person.class); stream.addSink(sink);

 
現(xiàn)在介紹 Kulsar Connector 一些特性的實(shí)現(xiàn)機(jī)制。
 
3.1 精確一次
 
因?yàn)?Pulsar 中的 MessageId 是全局唯一且有序的,與消息在 Pulsar 中的物理存儲(chǔ)也對(duì)應(yīng),因此為了實(shí)現(xiàn) Exactly Once,Pulsar Connector 借助 Flink 的 Checkpoint 機(jī)制,將 MessageId 存儲(chǔ)到 Checkpoint。

對(duì)于連接器的 Source 任務(wù),在每次觸發(fā) Checkpoint 的時(shí)候,會(huì)將各個(gè)分區(qū)當(dāng)前處理的 MessageId 保存到狀態(tài)存儲(chǔ)里面,這樣在任務(wù)重啟的時(shí)候,每個(gè)分區(qū)都可以通過 Pulsar 提供的 Reader seek 接口找到 MessageId 對(duì)應(yīng)的消息位置,然后從這個(gè)位置之后讀取消息數(shù)據(jù)。

通過 Checkpoint 機(jī)制,還能夠向存儲(chǔ)數(shù)據(jù)的節(jié)點(diǎn)發(fā)送數(shù)據(jù)使用完畢的通知,從而能準(zhǔn)確刪除過期的數(shù)據(jù),做到存儲(chǔ)的合理利用。
 
3.2 動(dòng)態(tài)發(fā)現(xiàn)
 
考慮到Flink中的任務(wù)都是長(zhǎng)時(shí)間運(yùn)行的,在運(yùn)行任務(wù)的過程中,用戶也許會(huì)需要?jiǎng)討B(tài)的增加部分主題或者分區(qū),Pulsar Connector 提供了自動(dòng)發(fā)現(xiàn)的解決方案。

Pulsar 的策略是另外啟動(dòng)一個(gè)線程,定期的去查詢?cè)O(shè)定的主題是否改變,分區(qū)有沒有增刪,如果發(fā)生了新增分區(qū)的情況,那么就額外創(chuàng)建新的Reader 任務(wù)去完成主題下的數(shù)據(jù)的反序列化,當(dāng)然如果是刪除分區(qū),也會(huì)相應(yīng)的減少讀取任務(wù)。
 
3.3 結(jié)構(gòu)化數(shù)據(jù)
 
在讀取主題下的數(shù)據(jù)的過程中,我們可以將數(shù)據(jù)轉(zhuǎn)化成一條條結(jié)構(gòu)化的記錄來處理。Pulsar 支持 Avro schema and avro/json/protobuf Message 格式類型的數(shù)據(jù)轉(zhuǎn)化成 Flink 中的 Row格式數(shù)據(jù)。對(duì)于用戶關(guān)心的元數(shù)據(jù),Pulsar 也在 Row 中提供了對(duì)應(yīng)的元數(shù)據(jù)域。

另外,Pulsar 基于 Flink 1.9 版本進(jìn)行了新的開發(fā),支持 Table API 和 Catalog,Pulsar 做了一個(gè)簡(jiǎn)單的映射,如下圖所示,將 Pulsar 的租戶/命名空間對(duì)應(yīng)到 Catalog 的數(shù)據(jù)庫,將主題對(duì)應(yīng)為庫中的具體表。
 

如何進(jìn)行Pulsar Connector機(jī)制的剖析

 

 之前提到 Pulsar 將數(shù)據(jù)存儲(chǔ)在 Bookeeper 中,還可以導(dǎo)入到 Hdfs 或者 S3 這樣的文件系統(tǒng)中,但對(duì)于分析型應(yīng)用來說,我們往往只關(guān)心所有數(shù)據(jù)中每條數(shù)據(jù)的部分屬性,因此采用列存儲(chǔ)的方式對(duì) IO 和網(wǎng)絡(luò)都會(huì)有性能提升,Pulsar 也在嘗試在Segment 中以列的方式存儲(chǔ)。
在原來的讀路徑中,不管是 Reader 還是Comsumer,都需要通過 Brokers 來傳遞數(shù)據(jù)。如果采用新的 Bypass Broker方式,通過查詢?cè)獢?shù)據(jù),就能直接找到每條 Message 存儲(chǔ)的 Bookie 位置,這樣可以直接從 Bookie 讀取數(shù)據(jù),縮短讀取路徑,從而提升效率。
Pulsar 相對(duì) Kafka 來說,由于數(shù)據(jù)在物理上是存放在一個(gè)個(gè) Segment 中的,那么在讀取的過程中,通過提高并行化的方式,建立多線程同時(shí)讀取多個(gè) Segment,就能夠提升整個(gè)作業(yè)的完成效率,不過這也需要你的任務(wù)自身對(duì)每個(gè)Topic 分區(qū)的訪問順序沒有嚴(yán)格要求,并且對(duì)于新產(chǎn)生的數(shù)據(jù),是不保存在 Segement 的,還是需要做緩存的訪問來獲取數(shù)據(jù),因此,并行讀取將成為一個(gè)可選項(xiàng),為用戶提供更多的選擇方案。

以上就是如何進(jìn)行Pulsar Connector機(jī)制的剖析,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


當(dāng)前題目:如何進(jìn)行PulsarConnector機(jī)制的剖析
鏈接地址:http://weahome.cn/article/gojsop.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部