這篇文章將為大家詳細講解有關Pulsar IO,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
在濱海新區(qū)等地區(qū),都構建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務理念,為客戶提供成都網(wǎng)站制作、成都網(wǎng)站建設 網(wǎng)站設計制作按需制作網(wǎng)站,公司網(wǎng)站建設,企業(yè)網(wǎng)站建設,成都品牌網(wǎng)站建設,營銷型網(wǎng)站,成都外貿(mào)網(wǎng)站建設公司,濱海新區(qū)網(wǎng)站建設費用合理。
Apache Pulsar 是業(yè)界領先的消息系統(tǒng)。使用消息系統(tǒng)時,一個較為常見的問題就是:將數(shù)據(jù)移入或移出消息平臺的最佳方法是什么?
當然,用戶可以使用 Pulsar 的 consumer 和 producer API 編寫自定義代碼,來傳輸數(shù)據(jù)。但除此之外,是否還有其他方法呢?
以下為用戶提出的一些相關問題:
1. 要將數(shù)據(jù)發(fā)布到 Pulsar 或使用 Pulsar 中的數(shù)據(jù),我應該在哪里運行相應程序?
2. 要將數(shù)據(jù)發(fā)布到 Pulsar 或使用 Pulsar 中的數(shù)據(jù),我應該怎樣運行相應程序?
用戶之所以會提出這些問題,是因為其他消息/發(fā)布-訂閱系統(tǒng)沒有提供有組織且容錯的方式來幫助用戶從外部系統(tǒng)輸入數(shù)據(jù)或?qū)?shù)據(jù)輸出到外部系統(tǒng),因而用戶需要尋求自定義解決方案并手動運行。
為了解決上述問題并簡化這一過程,我們推出了 Pulsar IO。
Pulsar IO 通過利用現(xiàn)有的 Pulsar Functions 框架來輸入/輸出數(shù)據(jù)。而 Pulsar Functions 框架的所有優(yōu)勢(如:容錯性、并行性、彈性、負載平衡、按需更新等)都可以直接被 Pulsar 輸入/輸出數(shù)據(jù)的應用程序所利用。
而且,我們發(fā)現(xiàn)經(jīng)常會出現(xiàn)這樣的情況,用戶花很大功夫(因為他們不是消息系統(tǒng)方面的專家,可能也不想成為這一領域的專家)去編寫自定義程序,用于從消息傳遞系統(tǒng)訪問數(shù)據(jù)。
自定義編寫這些應用程序不僅會很困難,而且我們發(fā)現(xiàn),許多用戶在嘗試實現(xiàn)執(zhí)行相同功能的應用程序時,做了相同的工作。歸根結(jié)底,消息系統(tǒng)只是用于移動數(shù)據(jù)的工具,因此,在設計 Pulsar IO 框架時,我們的主要目標之一就是易用性。
我們希望用戶能夠在不編寫任何代碼,也不用同時成為 Pulsar 和外部系統(tǒng)專家的情況下,可以從外部系統(tǒng)輸入數(shù)據(jù)或?qū)?shù)據(jù)輸出到外部系統(tǒng)。
首先,我們定義兩個應用程序,一個作為 source 將數(shù)據(jù)輸入到 Pulsar ,另一個作為 sink 從 Pulsar 接收數(shù)據(jù)。
Source 將數(shù)據(jù)從外部系統(tǒng)導入 Pulsar,而 sink 將數(shù)據(jù)從 Pulsar 導出到外部系統(tǒng)。具體來看,source 從外部系統(tǒng)讀取數(shù)據(jù),并將數(shù)據(jù)寫入 Pulsar topic,而 sink 從一個或多個 Pulsar topic 讀取數(shù)據(jù),并將數(shù)據(jù)寫入外部系統(tǒng)。
Pulsar IO 框架在現(xiàn)有的 Pulsar functions 框架上運行。單個 source 和 sink 可以像 function 一樣與 Pulsar broker 一起運行,如下圖所示。
因此,Pulsar Functions 框架的所有優(yōu)勢都適用于 Pulsar IO 框架,即 sink 和 source 應用程序。
正如前面提到的,我們的設計目標包括用戶無需編寫任何自定義應用程序,也無需編寫任何代碼就可以將數(shù)據(jù)移入或移出 Pulsar。
因此,Pulsar IO 框架中有多種內(nèi)置 source 和 sink(Kafka、Twitter Firehose、Cassandra、Aerospike 等,還會支持更多),用戶只需使用一個命令便可運行。用戶因此可以關注于業(yè)務邏輯,而無需擔心實現(xiàn)細節(jié)。
使用 Pulsar IO 框架很容易。用戶可以在命令行界面使用一行簡單的命令啟動內(nèi)置 source 或 sink。例如,用戶可以用下面的命令來提交 source 到已有的 Pulsar 集群,命令格式如下:
$ ./bin/pulsar-admin source create \ --tenant\ --namespace \ --name \ --destinationTopicName \ --source-type
以下示例為運行 twitter firehose source 的命令,用于將 Twitter 中的數(shù)據(jù)導入 Pulsar:
$ ./bin/pulsar-admin source create \--tenant test \ --namespace ns1 \ --name twitter-source \ --destinationTopicName twitter_data \ --sourceConfigFile examples/twitter.yml \ --source-type twitter
經(jīng)過以上步驟,用戶即可向 Pulsar 輸入數(shù)據(jù),而無需編寫或編譯任何代碼。唯一可能需要的是一個配置文件,用于為該 source 或 sink 指定某些配置。用戶可以通過以下格式的命令向現(xiàn)有的 Pulsar 集群中提交待運行的內(nèi)置 sink:
$ ./bin/pulsar-admin sink create \ --tenant\ --namespace \ --name \ --inputs \ --sink-type
以下為運行 Cassandra sink 的示例命令,用于將數(shù)據(jù)從 Pulsar 導出到 Cassandra:
$ ./bin/pulsar-admin sink create \ --tenant public \ --namespace default \ --name cassandra-test-sink \ --sink-type cassandra \ --sinkConfigFile examples/cassandra-sink.yml \ --inputs test_cassandra
更多關于如何運行 Cassandra source 的信息,參閱快速入門指南:
https://pulsar.apache.org/docs/en/2.1.1-incubating/io-quickstart/
以上命令顯示了如何在“集群”模式下(即作為現(xiàn)有 Pulsar 集群的一部分)運行 source 和 sink。除此之外,還可以在“本地運行”模式下將 source 和 sink 作為獨立進程運行,這一模式會在機器上生成本地進程并且運行 source 或者 sink 的邏輯。
本地運行模式有助于測試和調(diào)試,但是,需要用戶自行監(jiān)控和監(jiān)督。以下為在本地運行模式下運行 source 的命令示例:
$ ./bin/pulsar-admin sink localrun \ --tenant public \ --namespace default \ --name cassandra-test-sink \ --sink-type cassandra \ --sinkConfigFile examples/cassandra-sink.yml \ --inputs test_cassandra
由于 Pulsar IO 框架在 Pulsar Functions 上運行,因此可以通過更新參數(shù)和配置來動態(tài)更新 source 或 sink。例如,當希望利用前面提到的 Twitter firehose source 將數(shù)據(jù)輸入到另一個 Pulsar topic 時,可以執(zhí)行以下命令:
$ ./bin/pulsar-admin source update \--tenant test \ --namespace ns1 \ --name twitter-source \ --destinationTopicName twitter_data_2 \ --sourceConfigFile examples/twitter.yml \ --source-type twitter
?
也可以使用同樣格式的命令更新 sink。大多數(shù) source 和 sink 的更新都可以在運行時進行配置,從而簡化修改、測試、部署等流程。
如果要自定義實現(xiàn)一個小眾的用例,則可以通過實現(xiàn)一個簡單的界面來創(chuàng)建 source 或 sink。但是,Pulsar IO 的目的是幫助用戶直接使用現(xiàn)有的內(nèi)置 source 或 sink,而不必自己手動實現(xiàn) source 或 sink。
???? 實現(xiàn)自定義 source
要創(chuàng)建自定義 source,用戶需要編寫一個實現(xiàn) source 接口的 Java 類:
public interface Sourceextends AutoCloseable {/** * Open source with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ void open(final Map config) throws Exception; /** * Reads the next message from source. * If source does not have any new messages, this call should block. * @return next message from source. The return result should never be null * @throws Exception */ Record read() throws Exception;}
這是一個 source 實現(xiàn)的簡單示例:
public class TestSource implements Source{ private int i = 0; @Override public void open(Map config) throws Exception { } @Override public Record read() throws Exception { return () -> i++; } @Override public void close() throws Exception { }}
在上面的 source 示例中,單調(diào)遞增的整數(shù)被傳入到 Pulsar。
實現(xiàn) “Record” 接口的對象需要通過 “read” 方法返回,因為 “Record” 接口包含可用于實現(xiàn)不同消息傳遞語義或保證的字段,例如 exactly-once/effectively-once。在后續(xù)文章中,我將詳細討論如何執(zhí)行此操作。
???? 實現(xiàn)自定義 sink
要創(chuàng)建自定義 sink,用戶需要編寫一個實現(xiàn) sink 接口的 Java 類:
public interface Sinkextends AutoCloseable{ /** * Open Sink with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ void open(final Map config) throws Exception; /** * Write a message to Sink * @param inputRecordContext Context of value * @param value value to write to sink * @throws Exception */ void write(RecordContext inputRecordContext, T value) throws Exception;}
例如,一個簡單的 sink 實現(xiàn):
public class TestSink implements Sink{private static final String FILENAME = "/tmp/test-out";private BufferedWriter bw = null;private FileWriter fw = null;@Overridepublic void open(Map config) throws Exception { File file = new File(FILENAME);// if file doesnt exists, then create itif (!file.exists()) { file.createNewFile(); } fw = new FileWriter(file.getAbsoluteFile(), true); bw = new BufferedWriter(fw); }@Overridepublic void write(RecordContext inputRecordContext, String value) throws Exception {try { bw.write(value); bw.flush(); } catch (IOException e) {throw new RuntimeException(e); } }@Overridepublic void close() throws Exception {try {if (bw != null) bw.close();if (fw != null) fw.close(); } catch (IOException ex) { ex.printStackTrace(); } }}
以上示例說明 sink 如何從 Pulsar 讀取數(shù)據(jù)并寫入文件。與 source 接口類似,sink 接口中的 “write” 方法有一個 RecordContext 參數(shù)。此參數(shù)為 sink 提供需要寫入外部系統(tǒng)的值的 context。
RecordContext 參數(shù)可用于實現(xiàn)能夠提供不同級別的消息傳遞語義或保證(如:Exactly-once/Effective-once)的 sink。在后續(xù)文章中,我們將對此進行更深入的討論。
用戶可以通過類似于運行內(nèi)置 source 和 sink 的方式來提交自定義 source 和 sink:
$ ./bin/pulsar-admin source create \ --className\ --jar \ --tenant \ --namespace \ --name \ --destinationTopicName
命令示例如下:
$ ./bin/pulsar-admin source create \ --className org.apache.pulsar.io.twitter.TwitterFireHose \ --jar \~/application.jar \ --tenant test \ --namespace ns1 \ --name twitter-source \ --destinationTopicName twitter_data
在現(xiàn)有 Pulsar 集群中提交待運行的自定義 sink 的命令格式如下:
$ ./bin/pulsar-admin sink create \--className\--jar \--tenant test \--namespace \--name \--inputs
命令示例:
$ ./bin/pulsar-admin sink create \--className org.apache.pulsar.io.cassandra \--jar \~/application.jar \--tenant test \--namespace ns1 \--name cassandra-sink \--inputs test_topic```
如上所述,Pulsar IO 框架在現(xiàn)有的 Pulsar Functions 框架上運行。Pulsar IO 充分利用了現(xiàn)有的 Pulsar Functions 框架。作為 Pulsar IO 的組成部分,source 和 sink 擁有 Pulsar Functions 的所有優(yōu)勢:
關于Pulsar IO就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。