本篇文章給大家分享的是有關(guān)如何使用 Apache查詢Pulsar流,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說(shuō),跟著小編一起來(lái)看看吧。
創(chuàng)新互聯(lián)公司是創(chuàng)新、創(chuàng)意、研發(fā)型一體的綜合型網(wǎng)站建設(shè)公司,自成立以來(lái)公司不斷探索創(chuàng)新,始終堅(jiān)持為客戶提供滿意周到的服務(wù),在本地打下了良好的口碑,在過(guò)去的十載時(shí)間我們累計(jì)服務(wù)了上千家以及全國(guó)政企客戶,如橡塑保溫等企業(yè)單位,完善的項(xiàng)目管理流程,嚴(yán)格把控項(xiàng)目進(jìn)度與質(zhì)量監(jiān)控加上過(guò)硬的技術(shù)實(shí)力獲得客戶的一致贊揚(yáng)。
這里將介紹 Apache Pulsar 和 Apache Flink 的集成和最新研發(fā)進(jìn)展,并詳細(xì)說(shuō)明如何利用 Pulsar 內(nèi)置 schema,使用 Apache Flink 實(shí)時(shí)查詢 Pulsar 流。
Apache Pulsar 簡(jiǎn)介
Apache Pulsar 是一個(gè)靈活的發(fā)布/訂閱消息系統(tǒng),支持持久日志存儲(chǔ)。 Pulsar 的架構(gòu)優(yōu)勢(shì)包括多租戶、統(tǒng)一消息模型、結(jié)構(gòu)化事件流、云原生架構(gòu)等,這些優(yōu)勢(shì)讓 Pulsar 能夠完美適用于多種用戶場(chǎng)景,從計(jì)費(fèi)、支付、交易服務(wù)到融合組織中不同的消息架構(gòu)。現(xiàn)有 Pulsar & Flink 集成
(Apache Flink 1.6+)在現(xiàn)有的 Pulsar 和 Flink 集成中,Pulsar 作為 Flink 應(yīng)用程序中的消息隊(duì)列來(lái)使用。Flink 開發(fā)人員可以選擇特定 Pulsar source,并連接到所需的 Puslar 集群和 topic,將 Pulsar 用作 Flink 的流 source 和流 sink:
// create and configure Pulsar consumerPulsarSourceBuilderbuilder = PulsarSourceBuilder .builder(new SimpleStringSchema()) .serviceUrl(serviceUrl) .topic(inputTopic) .subsciptionName(subscription);SourceFunction src = builder.build();// ingest DataStream with Pulsar consumerDataStream words = env.addSource(src);
然后,Pulsar 流可以連接到 Flink 的處理邏輯。
// perform computation on DataStream (here a simple WordCount)
DataStream
wc = words .flatmap((FlatMapFunction
) (word, collector) -> { collector.collect(new WordWithCount(word, 1));
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction
) (c1, c2) -> new WordWithCount(c1.word, c1.count + c2.count));
然后通過(guò) sink 將數(shù)據(jù)寫出到 Pulsar。
// emit result via Pulsar producer wc.addSink(new FlinkPulsarProducer<>( serviceUrl, outputTopic, new AuthentificationDisabled(), wordWithCount -> wordWithCount.toString().getBytes(UTF_8), wordWithCount -> wordWithCount.word));
對(duì)于集成而言,這是重要的第一步,但現(xiàn)有設(shè)計(jì)還不足以充分利用 Pulsar 的全部功能。
Pulsar 與 Flink 1.6.0 的集成中有一些不足,包括:既沒有作為持久存儲(chǔ)來(lái)使用,也沒有與 Flink 進(jìn)行 schema 集成,導(dǎo)致在為應(yīng)用程序 schema 注冊(cè)添加描述時(shí),需要手動(dòng)輸入。
Pulsar 與 Flink 1.9 的集成
將 Pulsar 用作 Flink catalog
Flink 1.9.0 與 Pulsar 的最新集成解決了前面提到的問(wèn)題。阿里巴巴 Blink 對(duì) Flink 倉(cāng)庫(kù)的貢獻(xiàn)不僅強(qiáng)化了處理架構(gòu),還增加了新功能,使得 Flink 與 Pulsar 的集成更強(qiáng)大有效。
Flink 1.9.0:
https://flink.apache.org/downloads.html#apache-flink-191
在新 connector 的實(shí)現(xiàn)中引入了 Pulsar schema 集成,增加了對(duì) Table API 的支持,同時(shí)提供了 exactly-once 語(yǔ)義的 Pulsar 讀與 at-least-once 語(yǔ)義的 Pulsar 寫。
并且,通過(guò) schema 集成,Pulsar 可以注冊(cè)為 Flink catalog,只需幾個(gè)命令就可以在 Pulsar 流上運(yùn)行 Flink 查詢。下面我們將詳細(xì)介紹新的集成,并舉例說(shuō)明如何使用 Flink SQL 查詢 Pulsar 流。
利用 Flink <> Pulsar Schema 集成
在展開集成細(xì)節(jié)與具體的使用方法之前,我們先來(lái)看一下 Pulsar schema 是怎么工作的。
Apache Pulsar 內(nèi)置對(duì) Schema 的支持,無(wú)須額外管理 schema。Pulsar 的數(shù)據(jù) schema 與每個(gè) topic 相關(guān)聯(lián),因此,producer 和 consumer 都可以使用預(yù)定義 schema 信息發(fā)送數(shù)據(jù),而 broker 可以驗(yàn)證 schema ,并在兼容性檢查中管理 schema 多版本化和 schema 演化。
下面分別是 Pulsar schema 用于 producer 和 consumer 的示例。在 producer 端,可以指定使用 schema,并且 Pulsar 無(wú)需執(zhí)行序列化/反序列化,就可以發(fā)送一個(gè) POJO 類。
類似地,在 consumer 端,也可以指定數(shù)據(jù) schema,并且在接收到數(shù)據(jù)后,Pulsar 會(huì)立即自動(dòng)驗(yàn)證 schema 信息,獲取給定版本的 schema,然后將數(shù)據(jù)反序列化到 POJO 結(jié)構(gòu)。Pulsar 在 topic 的元數(shù)據(jù)中存儲(chǔ) schema 信息。
// Create producer with Struct schema and send messagesProducerproducer = client.newProducer(Schema.AVRO(User.class)).create();producer.newMessage() .value(User.builder() .userName(“pulsar-user”) .userId(1L) .build()) .send();// Create consumer with Struct schema and receive messagesConsumer consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();consumer.receive();
假設(shè)一個(gè)應(yīng)用程序?qū)?producer 和/或 consumer 指定 schema。在接收到 schema 信息時(shí),連接到 broker 的 producer(或 consumer)傳輸此類信息,以便 broker 在返回或拒絕該 schema 前注冊(cè) schema、驗(yàn)證 schema,并檢查 schema 兼容性,如下圖所示:
Pulsar 不僅可以處理并存儲(chǔ) schema 信息,還可以在必要時(shí)處理 schema 演化(schema evolution)。Pulsar 能夠有效管理 broker 中的 schema 演化,在必要的兼容性檢查中,追蹤 schema 的所有版本。
另外,當(dāng)消息發(fā)布在 producer 端時(shí),Pulsar 會(huì)在消息的元數(shù)據(jù)中標(biāo)記 schema 版本;當(dāng) consumer 接收到消息,并完成反序列化元數(shù)據(jù)時(shí),Pulsar 將會(huì)檢查與此消息相關(guān)聯(lián)的 schema 版本,并從 broker 中獲取 schema 信息。
因此,當(dāng) Pulsar 與 Flink 應(yīng)用集成時(shí),Pulsar 使用預(yù)先存在的 schema 信息,并將帶有 schema 信息的單個(gè)消息映射到 Flink 類型系統(tǒng)的不同行中。
當(dāng) Flink 用戶不直接與 schema 交互或不使用原始 schema(primitive schema)時(shí)(例如,用 topic 來(lái)存儲(chǔ)字符串或長(zhǎng)數(shù)值),Pulsar 會(huì)轉(zhuǎn)換消息到 Flink 行,即“值”;或者在結(jié)構(gòu)化的 schema 類型(例如,JSON 和 AVRO)中,Pulsar 從 schema 信息中提取單個(gè)字段信息,并將字段映射到 Flink 的類型系統(tǒng)。
最后,所有與消息相關(guān)的元數(shù)據(jù)信息(例如,消息密鑰、topic、發(fā)布時(shí)間、事件時(shí)間等)都會(huì)轉(zhuǎn)換到 Flink 行中的元數(shù)據(jù)字段。以下是使用原始 schema 和結(jié)構(gòu)化 schema 的兩個(gè)示例,解釋了如何將數(shù)據(jù)從 Pulsar topic 轉(zhuǎn)換到 Flink 類型系統(tǒng)。
原始 schema(Primitive Schema):
root|-- value: DOUBLE|-- __key: BYTES|-- __topic: STRING|-- __messageId: BYTES|-- __publishTime: TIMESTAMP(3)|-- __eventTime: TIMESTAMP(3)
結(jié)構(gòu)化 schema(Avor Schema):
@Data@AllArgsConstructor@NoArgsConstructorpublic static class Foo { public int i; public float f; public Bar bar;}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Bar { public boolean b; public String s;}Schema s = Schema.AVRO(Foo.getClass());
root |-- i: INT |-- f: FLOAT |-- bar: ROW<`b` BOOLEAN, `s` STRING> |-- __key: BYTES |-- __topic: STRING |-- __messageId: BYTES |-- __publishTime: TIMESTAMP(3) |-- __eventTime: TIMESTAMP(3)
當(dāng)所有 schema 信息都映射到 Flink 類型系統(tǒng)時(shí),就可以在 Flink 中根據(jù)指定 schema 信息構(gòu)建 Pulsar source、sink 或 catalog,如下所示:
Flink & Pulsar: 從 Pulsar 讀取數(shù)據(jù)
1. 創(chuàng)建用于流查詢的 Pulsar source
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
val source = new FlinkPulsarSource(props)
// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryable
val dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output
// end method chaining
env.execute()
2. 將 Pusar 中的 topic 注冊(cè)為 streaming tables
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
.connect(new Pulsar().properties(props))
.inAppendMode()
.registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()
Flink & Pulsar:向 Pulsar 寫入數(shù)據(jù)
1. 創(chuàng)建用于流查詢的 Pulsar sink
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()
2. 向 Pulsar 寫入 streaming table
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
.connect(new Pulsar().properties(props))
.inAppendMode()
.registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()
在以上示例中,F(xiàn)link 開發(fā)人員都無(wú)需擔(dān)心 schema 注冊(cè)、序列化/反序列化,并將 Pulsar 集群注冊(cè)為 Flink 中的 source、sink 或 streaming table。
當(dāng)這三個(gè)要素同時(shí)存在時(shí),Pulsar 會(huì)被注冊(cè)為 Flink 中的 catalog,這可以極大簡(jiǎn)化數(shù)據(jù)處理與查詢,例如,編寫程序從 Pulsar 查詢數(shù)據(jù),使用 Table API 和 SQL 查詢 Pulsar 數(shù)據(jù)流等。
以上就是如何使用 Apache查詢Pulsar流,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過(guò)這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。