這篇文章的內(nèi)容主要圍繞怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理進(jìn)行講述,文章內(nèi)容清晰易懂,條理清晰,非常適合新手學(xué)習(xí),值得大家去閱讀。感興趣的朋友可以跟隨小編一起閱讀吧。希望大家通過這篇文章有所收獲!
專注于為中小企業(yè)提供網(wǎng)站建設(shè)、成都網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)啟東免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
事件驅(qū)動架構(gòu)(Event-driven Architecture,EDA)的關(guān)鍵特征是事件的核心重要性。在 EDA 中,事件 consumer 遵循基于事件編程(EBP)的編程樣式對事件到達(dá)作出反應(yīng)。與面向批處理或過程式編程不同,在 EBP 中,軟件系統(tǒng)響應(yīng)于接收一個或多個事件通知并執(zhí)行處理,且完全通過事件以異步方式與其他軟件組件通信。
盡管所有基于事件的應(yīng)用程序不盡相同,但通常都遵循下圖中的結(jié)構(gòu),事件 producer 將事件引入中間的事件處理框架,該框架負(fù)責(zé)持久化事件并將事件交付給事件 consumer。
圖 1 基于事件的架構(gòu)
中間的事件處理框架除了負(fù)責(zé)事件路由外,還托管事件處理器組件。事件處理器獲取事件,還可以轉(zhuǎn)發(fā)或發(fā)布新事件,因此在某種意義上它們既是事件 consumer 也是事件 producer。但是我們不會將這些事件處理器稱為事件 producer 或事件 consumer,因為我們希望將它們與事件處理框架之外的實體區(qū)分開來。
在 EDA 中,事件處理器通常可分為以下幾類:
簡單事件處理器:事件到達(dá)立即觸發(fā)事件處理器中的操作。一般來說,如果這些處理器是無狀態(tài)的,則僅根據(jù)當(dāng)前事件的內(nèi)容執(zhí)行所有邏輯;如果是有狀態(tài)的,則可以跨調(diào)用保留消息,以便執(zhí)行稍微復(fù)雜一點(diǎn)的邏輯。
復(fù)雜事件處理器:此類事件處理器處理一系列事件,并執(zhí)行更為復(fù)雜的模式分析,以識別有意義的模式或關(guān)系,如檢測事件相關(guān)性、因果關(guān)系或定時。典型用例一般用于電子商務(wù)、欺詐檢測、網(wǎng)絡(luò)安全、金融交易和其他需要立即響應(yīng)的環(huán)境中。
基于事件的應(yīng)用程度通常由許多按特定順序或流排列的事件處理器組成。我們將事件 producer、事件處理器、事件 consumer 的集合稱為事件處理網(wǎng)絡(luò)。事件處理網(wǎng)絡(luò)用于解決一個或多個特定的業(yè)務(wù)問題。
圖 2 事件處理網(wǎng)絡(luò)
如圖 2 所示,外部事件 producer 和事件 consumer 處于邊緣,中間是多個事件處理器。圖 2 展示了事件處理器之間的事件流,這些箭頭也稱為隱式通道,用于將事件直接從一個事件處理器推到另一個事件處理器。當(dāng)使用 Apache Pulsar 實現(xiàn)時,topic 就是這些隱式通道。
圖 2 中還展示了另一種事件處理器之間通信的方式:共享狀態(tài)管理。事件處理器一般需要保留多個事件之間的計算狀態(tài),因此事件處理架構(gòu)需要提供一種機(jī)制以持久化狀態(tài)信息,并允許事件處理器直接訪問。共享狀態(tài)提供了另一種在事件處理器之間共享信息的機(jī)制,并支持有狀態(tài)事件處理,我們將在下一部分詳述相關(guān)內(nèi)容。
多個基于事件的應(yīng)用程序可以與單個事件類型相關(guān)聯(lián)。上圖展示了基于事件的應(yīng)用程序(藍(lán)色)到另一個應(yīng)用程序的過程。在將第一個應(yīng)用程序發(fā)送到事件 consumer 2 前,同時將其輸出到事件 consumer 1 和另一個事件處理器,以進(jìn)行進(jìn)一步處理。
將基于事件的應(yīng)用程序鏈接在一起的原因有很多,如在某一場景中,需要監(jiān)視物聯(lián)網(wǎng)傳感器讀取模式或異常,同時也希望將這些事件長期存儲(存儲平臺如 HDFS 或 Amazon S3),以便用于訓(xùn)練數(shù)據(jù)模型。
一級事件處理器序列首先進(jìn)行事件的 ETL-類型處理,即將事件轉(zhuǎn)換為可消費(fèi)的格式。這些記錄將被發(fā)送到事件 consumer 1,在本例中即 HDFS。同時,我們還希望將清理后的事件轉(zhuǎn)發(fā)到實現(xiàn)異常檢測工作流的二級事件處理器序列。我們將在下一部分討論如何使用 Apache Pulsar Functions 作為框架,此框架采用簡單編程邏輯 functions 來實現(xiàn)基于事件的處理。
Apache Pulsar Functions 提供了一個易于使用的框架,開發(fā)者可以使用 Functions 創(chuàng)建或部署處理邏輯,這些處理邏輯由 Apache Pulsar 執(zhí)行。你可以用 Java 或 Python 編寫簡單或復(fù)雜的 function,并將這些 function 部署到 Pulsar 集群中,而無需運(yùn)行單獨(dú)的流處理引擎。Pulsar Functions 是輕量級計算框架,具有以下特點(diǎn):
在消息發(fā)送至指定 input topic 時執(zhí)行。
將用戶自定義的處理邏輯應(yīng)用于每條消息。
將計算結(jié)果發(fā)布到一個或多個 topic。
圖 3 Pulsar Functions 編程模型
可以使用 Java 和 Python 編寫 Pulsar Functions,編寫方式有兩種:
使用原生語言接口 ,不需要 Pulsar 特定的庫或特殊依賴。例如,要在 Java 中實現(xiàn)一個 Pulsar Function,只需要編寫一個實現(xiàn) java.util.Function 接口的類,如下所示:
import java.util.Function;public class EchoFunction implements Function{ public String apply(String input) { // Logic Here }}
使用 Pulsar Functions SDK ,利用特定的 Pulsar 庫,這些庫提供原生接口中無法提供的一系列功能,如 org.apache.pulsar.functions.api.Context 對象提供的狀態(tài)管理功能。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public interface Function {
O process(I input, Context context) throws Exception;
}
原生語言方法提供了一種清晰的、無 API 的 Pulsar Functions 編寫方法,非常適合無狀態(tài)事件處理器的開發(fā)。但是,這種方法不支持訪問先前的狀態(tài)信息。
編譯并測試 Pulsar Functions 后,需要將 Pulsar Functions 部署到 Pulsar 集群。Pulsar Functions 旨在支持多種部署場景。目前,運(yùn)行 Pulsar Functions 的方式有兩種:
本地運(yùn)行模式:在此模式下運(yùn)行時,Pulsar Function 將在執(zhí)行命令的機(jī)器上運(yùn)行(如筆記本電腦、邊緣節(jié)點(diǎn)等)。下面是一個本地運(yùn)行命令的例子:
$ bin/pulsar-admin functions localrun \ —py myfunc.py \ —className myfunc.SomeFunction —inputs input-topic-1 —outputs output-topic-1
集群模式: 在集群模式下運(yùn)行時,Pulsar Function 代碼將被上傳到 Pulsar 集群中的 broker 中,并與 broker 一起運(yùn)行,而不是在本地環(huán)境中運(yùn)行。可以使用如下所示的命令在集群模式中創(chuàng)建 function,該命令在 Pulsar broker 節(jié)點(diǎn)上執(zhí)行。
$ bin/pulsar-admin functions create \ —jar target/my-functions.jar \ —className org.example.functions.MyFunction \ —inputs input-topic-1 \ —outputs output-topic-1 \ —parallelism 4 \ —cpu 2 \ —ram 8589934592 \ —disk 10737418240
上面的命令將啟用 4 個 org.example.functions.MyFunction 實例,每個實例有 2 個 CPU 內(nèi)核、8 GB RAM、10 GB 磁盤空間。(注意,需以字節(jié)為單位設(shè)置 RAM 和磁盤,且在 Docker 環(huán)境中必須設(shè)置 CPU 和磁盤。)
還有一種方法可以在創(chuàng)建 Pulsar Function 時提供用戶配置屬性,此方法在需要復(fù)用 function 時非常有用。我們通過為 userConfig 屬性指定一個 JSON 字符串,在下面的命令中傳入一組鍵-值對。在運(yùn)行時,可以通過使用 Pulsar Functions SDK 的 Pulsar Functions Context 對象訪問傳入的值,我們將在下一部分詳述相關(guān)內(nèi)容。
$ bin/pulsar-admin functions create \ —jar target/my-functions.jar \ —className org.example.functions.MyFunction \ —inputs input-topic-1 \ —outputs output-topic-1 \ —parallelism 4 \ —cpu 2 \ —ram 8589934592 \ —disk 10737418240 \ —userConfig ‘{“key-1”: “value-1”, “key-2”, “value-2”}’
Java 和 Python SDK 中定義的 Context 對象為 function 提供了各種各樣的信息和功能,包括保留可用于提供有狀態(tài)事件處理的中間結(jié)果的能力。以下示例是 Context 對象中所包含的信息:
Pulsar Function 的名稱和 ID
每條消息的消息 ID。自動為每條 Pulsar 消息分配一個 ID。
發(fā)送消息的 topic 的名稱
與 function 相關(guān)聯(lián)的所有 input topic、output topic 的名稱
SerDe 的類名稱
與 function 相關(guān)聯(lián)的租戶和命名空間
運(yùn)行 function 的 Pulsar Functions 實例的 ID
Function 的版本
Function 使用的 logger 對象,可用于創(chuàng)建 function 日志消息
訪問通過 CLI 提供的任意用戶配置值
記錄 metric 的接口
接下來,我們將介紹一些利用了 Context 對象特性的使用模式。
運(yùn)行或更新使用 SDK 創(chuàng)建的 Pulsar Functions 時,可以使用 -userConfig flag 通過命令行傳入任意鍵/值。鍵/值必須指定為 JSON。以下示例創(chuàng)建 function 并傳入用戶鍵/值。
$ bin/pulsar-admin functions create \ —name word-filter \ —userConfig ‘{“filter”, “$.Sensors{?(@.Type==‘Temp’)]”}’ \ # Other function configs
這個特性允許我們編寫可以多次使用的通用 function,但是配置略有不同。例如,假設(shè)你想編寫一個基于 JSON 路徑表達(dá)式過濾 JSON 事件的 function。當(dāng)事件到達(dá)時,將其內(nèi)容與配置的表達(dá)式進(jìn)行比較,并過濾掉不匹配的 entry。
顯然該 function 的行為完全依賴于它所過濾的 JSON 路徑表達(dá)式。為了可以多次使用 function,我們使用 Pulsar SDK,直到部署 function 后再指定此路徑表達(dá)式。
如上例所示,要使用的 JSON 路徑過濾器的值在編譯時未知,需使用 getUserConfigValueOrDefault 方法從 Context 中獲取。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.jayway.jsonpath.JsonPath;
public JsonPathFilterFunction implements Function
{
String process(String input, Context context) throws Exception {
// Get the filter from the context
String filter = context.getUserConfigValueOrDefault(“filter”, “$”)
.toString();
Object filtered = JsonPath.read(input, filter);
Return filtered.toString();
}
}
有狀態(tài)事件處理器使用先前事件的內(nèi)存生成輸出。存儲狀態(tài)的能力是處理多個事件的關(guān)鍵構(gòu)件。在 Apache Pulsar Function 框架中,狀態(tài)信息存儲在基于 Apache BookKeeper 的專用鍵-值存儲中。Pulsar SDK 通過 Context 對象訪問狀態(tài)信息。
圖 4 Apache Pulsar 狀態(tài)管理
我們來舉例解釋一下狀態(tài) agent。假設(shè)有一個應(yīng)用程序,用于從物聯(lián)網(wǎng)傳感器獲取溫度讀取事件,我們想知道傳感器的平均溫度,則可以使用事件處理 agent 通過以下 function 持續(xù)更新溫度平均值:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public AvgTempFunction implements Function
{
Float process(Float currentTemp, Context context) {
// Increment and get counter
context.incrCounter(“num-measurements”);
Integer n = context.getCounter(“num-measurements”);
// Calculate new average based on old average and count
Float old_average = context.getState(“avg-temp”);
Float new_average = (old_average * (n-1) + currentTemp) / n;
context.putState(“avg-temp”, new_average);
return new_average;
}
}
最佳實踐 3:Void Funtions
Pulsar Functions 可以將結(jié)果發(fā)布到一個或多個 output topic,但可以不發(fā)布結(jié)果。也可以使用 function 僅生成日志,并將結(jié)果寫入外部數(shù)據(jù)庫,或僅用于監(jiān)視流中的異常。以下示例中的 function 只會將接收到的事件存入日志:
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
public LogFunction implements Function
{
Void process(String input, Context context) throws Exception {
Logger LOG = context.getLogger();
LOG.info("Received {}”, input);
return null;
}
}
在使用輸出類型為 Void 的 Java function 時,function 必須始終返回 null。在不想生成輸出事件時,輸出類型沒有 Void 的 function 可以返回 null,例如,當(dāng)你在使用過濾器,但不希望某一事件被處理時。
最佳實踐 4:處理來自多個 input topic 的事件
如圖 3 所示,Pulsar Functions 可以消費(fèi)多個 topic 中的事件,下面我們來看一下如何編寫一個這樣的 function: import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public MultiTopicFunction implements Function
{
String process(String input, Context context) throws Exception {
String sourceTopic = context.getSourceTopic();
if (sourceTopic.equals(“TopicA”) {
// parse as TopicA Object
} else if (sourceTopic.equals(“TopicB”) {
// parse as Topic B Object
} else if (sourceTopic.equals(“TopicC”) {
// parse as Topic C Object
}
….
}
}
從代碼中可以看出,我們首先要從 Context 對象獲取 input topic 的名稱,然后根據(jù) input topic 的名稱相應(yīng)地解析/處理事件。
最佳實踐 5:Metric 收集
Apache Pulsar SDK 提供了 metric 收集機(jī)制,可用于記錄所選擇的任何用戶定義的 metric。在下面的示例中,我們使用單獨(dú)的 metric 來跟蹤調(diào)用該 function 的總次數(shù),使用另一個 metric 來跟蹤使用無效輸入調(diào)用該 function 的次數(shù)。更多關(guān)于讀取和使用 metric 的說明,請參閱監(jiān)控指南。
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public MetricFunction implements Function
{
Void process(String input, Context context) throws Exception {
context.recordMetric(“invocation count”, 1);
if (input < 0) {
context.recordMetric(“Invalid data”, 1);
}
return null;
}
}
感謝你的閱讀,相信你對“怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理”這一問題有一定的了解,快去動手實踐吧,如果想了解更多相關(guān)知識點(diǎn),可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站!小編會繼續(xù)為大家?guī)砀玫奈恼拢?/p>
網(wǎng)頁標(biāo)題:怎么使用ApachePulsarFunctions進(jìn)行簡單事件處理
網(wǎng)站URL:http://weahome.cn/article/pdpggc.html