真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

怎么使用ApachePulsarFunctions進(jìn)行簡單事件處理

這篇文章的內(nèi)容主要圍繞怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理進(jìn)行講述,文章內(nèi)容清晰易懂,條理清晰,非常適合新手學(xué)習(xí),值得大家去閱讀。感興趣的朋友可以跟隨小編一起閱讀吧。希望大家通過這篇文章有所收獲!

專注于為中小企業(yè)提供網(wǎng)站建設(shè)、成都網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)啟東免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

 基于事件的編程 

事件驅(qū)動架構(gòu)(Event-driven Architecture,EDA)的關(guān)鍵特征是事件的核心重要性。在 EDA 中,事件 consumer 遵循基于事件編程(EBP)的編程樣式對事件到達(dá)作出反應(yīng)。與面向批處理或過程式編程不同,在 EBP 中,軟件系統(tǒng)響應(yīng)于接收一個或多個事件通知并執(zhí)行處理,且完全通過事件以異步方式與其他軟件組件通信。

盡管所有基于事件的應(yīng)用程序不盡相同,但通常都遵循下圖中的結(jié)構(gòu),事件 producer 將事件引入中間的事件處理框架,該框架負(fù)責(zé)持久化事件并將事件交付給事件 consumer。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 1 基于事件的架構(gòu)

中間的事件處理框架除了負(fù)責(zé)事件路由外,還托管事件處理器組件。事件處理器獲取事件,還可以轉(zhuǎn)發(fā)或發(fā)布新事件,因此在某種意義上它們既是事件 consumer 也是事件 producer。但是我們不會將這些事件處理器稱為事件 producer 或事件 consumer,因為我們希望將它們與事件處理框架之外的實體區(qū)分開來。

 事件處理器類型

在 EDA 中,事件處理器通常可分為以下幾類:

  • 簡單事件處理器:事件到達(dá)立即觸發(fā)事件處理器中的操作。一般來說,如果這些處理器是無狀態(tài)的,則僅根據(jù)當(dāng)前事件的內(nèi)容執(zhí)行所有邏輯;如果是有狀態(tài)的,則可以跨調(diào)用保留消息,以便執(zhí)行稍微復(fù)雜一點(diǎn)的邏輯。

  • 復(fù)雜事件處理器:此類事件處理器處理一系列事件,并執(zhí)行更為復(fù)雜的模式分析,以識別有意義的模式或關(guān)系,如檢測事件相關(guān)性、因果關(guān)系或定時。典型用例一般用于電子商務(wù)、欺詐檢測、網(wǎng)絡(luò)安全、金融交易和其他需要立即響應(yīng)的環(huán)境中。

 事件處理網(wǎng)絡(luò)

基于事件的應(yīng)用程度通常由許多按特定順序或流排列的事件處理器組成。我們將事件 producer、事件處理器、事件 consumer 的集合稱為事件處理網(wǎng)絡(luò)。事件處理網(wǎng)絡(luò)用于解決一個或多個特定的業(yè)務(wù)問題。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 2 事件處理網(wǎng)絡(luò)

如圖 2 所示,外部事件 producer 和事件 consumer 處于邊緣,中間是多個事件處理器。圖 2 展示了事件處理器之間的事件流,這些箭頭也稱為隱式通道,用于將事件直接從一個事件處理器推到另一個事件處理器。當(dāng)使用 Apache Pulsar 實現(xiàn)時,topic 就是這些隱式通道。

圖 2 中還展示了另一種事件處理器之間通信的方式:共享狀態(tài)管理。事件處理器一般需要保留多個事件之間的計算狀態(tài),因此事件處理架構(gòu)需要提供一種機(jī)制以持久化狀態(tài)信息,并允許事件處理器直接訪問。共享狀態(tài)提供了另一種在事件處理器之間共享信息的機(jī)制,并支持有狀態(tài)事件處理,我們將在下一部分詳述相關(guān)內(nèi)容。

多個基于事件的應(yīng)用程序可以與單個事件類型相關(guān)聯(lián)。上圖展示了基于事件的應(yīng)用程序(藍(lán)色)到另一個應(yīng)用程序的過程。在將第一個應(yīng)用程序發(fā)送到事件 consumer 2 前,同時將其輸出到事件 consumer 1 和另一個事件處理器,以進(jìn)行進(jìn)一步處理。

將基于事件的應(yīng)用程序鏈接在一起的原因有很多,如在某一場景中,需要監(jiān)視物聯(lián)網(wǎng)傳感器讀取模式或異常,同時也希望將這些事件長期存儲(存儲平臺如 HDFS 或 Amazon S3),以便用于訓(xùn)練數(shù)據(jù)模型。

一級事件處理器序列首先進(jìn)行事件的 ETL-類型處理,即將事件轉(zhuǎn)換為可消費(fèi)的格式。這些記錄將被發(fā)送到事件 consumer 1,在本例中即 HDFS。同時,我們還希望將清理后的事件轉(zhuǎn)發(fā)到實現(xiàn)異常檢測工作流的二級事件處理器序列。我們將在下一部分討論如何使用 Apache Pulsar Functions 作為框架,此框架采用簡單編程邏輯 functions 來實現(xiàn)基于事件的處理。

 使用 Apache Pulsar Functions 進(jìn)行基于事件的編程

Apache Pulsar Functions 提供了一個易于使用的框架,開發(fā)者可以使用 Functions 創(chuàng)建或部署處理邏輯,這些處理邏輯由 Apache Pulsar 執(zhí)行。你可以用 Java 或 Python 編寫簡單或復(fù)雜的 function,并將這些 function 部署到 Pulsar 集群中,而無需運(yùn)行單獨(dú)的流處理引擎。Pulsar Functions 是輕量級計算框架,具有以下特點(diǎn):

  • 在消息發(fā)送至指定 input topic 時執(zhí)行。

  • 將用戶自定義的處理邏輯應(yīng)用于每條消息。

  • 將計算結(jié)果發(fā)布到一個或多個 topic。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 3 Pulsar Functions 編程模型

可以使用 Java 和 Python 編寫 Pulsar Functions,編寫方式有兩種:

  • 使用原生語言接口 ,不需要 Pulsar 特定的庫或特殊依賴。例如,要在 Java 中實現(xiàn)一個 Pulsar Function,只需要編寫一個實現(xiàn) java.util.Function 接口的類,如下所示:

import java.util.Function;public class EchoFunction implements Function {    public String apply(String input) {        // Logic Here    }}
  • 使用 Pulsar Functions SDK ,利用特定的 Pulsar 庫,這些庫提供原生接口中無法提供的一系列功能,如 org.apache.pulsar.functions.api.Context 對象提供的狀態(tài)管理功能。

import org.apache.pulsar.functions.api.Context;    import org.apache.pulsar.functions.api.Function;
   public interface Function {        O process(I input, Context context) throws Exception;    }

原生語言方法提供了一種清晰的、無 API 的 Pulsar Functions 編寫方法,非常適合無狀態(tài)事件處理器的開發(fā)。但是,這種方法不支持訪問先前的狀態(tài)信息。

 部署 Apache Pulsar Functions



編譯并測試 Pulsar Functions 后,需要將 Pulsar Functions 部署到 Pulsar 集群。Pulsar Functions 旨在支持多種部署場景。目前,運(yùn)行 Pulsar Functions 的方式有兩種:

  • 本地運(yùn)行模式:在此模式下運(yùn)行時,Pulsar Function 將在執(zhí)行命令的機(jī)器上運(yùn)行(如筆記本電腦、邊緣節(jié)點(diǎn)等)。下面是一個本地運(yùn)行命令的例子:

$ bin/pulsar-admin functions localrun \    —py myfunc.py \    —className myfunc.SomeFunction    —inputs input-topic-1    —outputs output-topic-1
  • 集群模式: 在集群模式下運(yùn)行時,Pulsar Function 代碼將被上傳到 Pulsar 集群中的 broker 中,并與 broker 一起運(yùn)行,而不是在本地環(huán)境中運(yùn)行。可以使用如下所示的命令在集群模式中創(chuàng)建 function,該命令在 Pulsar broker 節(jié)點(diǎn)上執(zhí)行。

$ bin/pulsar-admin functions create \    —jar target/my-functions.jar \    —className org.example.functions.MyFunction \    —inputs input-topic-1 \    —outputs output-topic-1 \    —parallelism 4 \    —cpu 2 \    —ram 8589934592 \    —disk 10737418240

上面的命令將啟用 4 個 org.example.functions.MyFunction 實例,每個實例有 2 個 CPU 內(nèi)核、8 GB RAM、10 GB 磁盤空間。(注意,需以字節(jié)為單位設(shè)置 RAM 和磁盤,且在 Docker 環(huán)境中必須設(shè)置 CPU 和磁盤。)

還有一種方法可以在創(chuàng)建 Pulsar Function 時提供用戶配置屬性,此方法在需要復(fù)用 function 時非常有用。我們通過為 userConfig 屬性指定一個 JSON 字符串,在下面的命令中傳入一組鍵-值對。在運(yùn)行時,可以通過使用 Pulsar Functions SDK 的 Pulsar Functions Context 對象訪問傳入的值,我們將在下一部分詳述相關(guān)內(nèi)容。

$ bin/pulsar-admin functions create \    —jar target/my-functions.jar \    —className org.example.functions.MyFunction \    —inputs input-topic-1 \    —outputs output-topic-1 \    —parallelism 4 \    —cpu 2 \    —ram 8589934592 \    —disk 10737418240 \    —userConfig ‘{“key-1”: “value-1”, “key-2”, “value-2”}’

 使用 Apache Pulsar Functions SDK 的最佳實踐


Java 和 Python SDK 中定義的 Context 對象為 function 提供了各種各樣的信息和功能,包括保留可用于提供有狀態(tài)事件處理的中間結(jié)果的能力。以下示例是 Context 對象中所包含的信息:

  • Pulsar Function 的名稱和 ID

  • 每條消息的消息 ID。自動為每條 Pulsar 消息分配一個 ID。

  • 發(fā)送消息的 topic 的名稱

  • 與 function 相關(guān)聯(lián)的所有 input topic、output topic 的名稱

  • SerDe 的類名稱

  • 與 function 相關(guān)聯(lián)的租戶和命名空間

  • 運(yùn)行 function 的 Pulsar Functions 實例的 ID

  • Function 的版本

  • Function 使用的 logger 對象,可用于創(chuàng)建 function 日志消息

  • 訪問通過 CLI 提供的任意用戶配置值

  • 記錄 metric 的接口

接下來,我們將介紹一些利用了 Context 對象特性的使用模式。

最佳實踐 1:動態(tài)配置

運(yùn)行或更新使用 SDK 創(chuàng)建的 Pulsar Functions 時,可以使用 -userConfig flag 通過命令行傳入任意鍵/值。鍵/值必須指定為 JSON。以下示例創(chuàng)建 function 并傳入用戶鍵/值。

$ bin/pulsar-admin functions create \    —name word-filter \    —userConfig ‘{“filter”, “$.Sensors{?(@.Type==‘Temp’)]”}’ \     # Other function configs

這個特性允許我們編寫可以多次使用的通用 function,但是配置略有不同。例如,假設(shè)你想編寫一個基于 JSON 路徑表達(dá)式過濾 JSON 事件的 function。當(dāng)事件到達(dá)時,將其內(nèi)容與配置的表達(dá)式進(jìn)行比較,并過濾掉不匹配的 entry。

顯然該 function 的行為完全依賴于它所過濾的 JSON 路徑表達(dá)式。為了可以多次使用 function,我們使用 Pulsar SDK,直到部署 function 后再指定此路徑表達(dá)式。

如上例所示,要使用的 JSON 路徑過濾器的值在編譯時未知,需使用 getUserConfigValueOrDefault 方法從 Context 中獲取。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import com.jayway.jsonpath.JsonPath;
public JsonPathFilterFunction implements Function {
   String process(String input, Context context) throws Exception {        // Get the filter from the context        String filter = context.getUserConfigValueOrDefault(“filter”, “$”)                          .toString();        Object filtered = JsonPath.read(input, filter);        Return filtered.toString();    }}

最佳實踐 2:有狀態(tài)事件處理器

有狀態(tài)事件處理器使用先前事件的內(nèi)存生成輸出。存儲狀態(tài)的能力是處理多個事件的關(guān)鍵構(gòu)件。在 Apache Pulsar Function 框架中,狀態(tài)信息存儲在基于 Apache BookKeeper 的專用鍵-值存儲中。Pulsar SDK 通過 Context 對象訪問狀態(tài)信息。

怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理

圖 4 Apache Pulsar 狀態(tài)管理

我們來舉例解釋一下狀態(tài) agent。假設(shè)有一個應(yīng)用程序,用于從物聯(lián)網(wǎng)傳感器獲取溫度讀取事件,我們想知道傳感器的平均溫度,則可以使用事件處理 agent 通過以下 function 持續(xù)更新溫度平均值:

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;
public AvgTempFunction implements Function {
   Float process(Float currentTemp, Context context) {        // Increment and get counter        context.incrCounter(“num-measurements”);        Integer n = context.getCounter(“num-measurements”);        // Calculate new average based on old average and count        Float old_average = context.getState(“avg-temp”);        Float new_average = (old_average * (n-1) + currentTemp) / n;        context.putState(“avg-temp”, new_average);        return new_average;    }}

最佳實踐 3:Void Funtions


Pulsar Functions 可以將結(jié)果發(fā)布到一個或多個 output topic,但可以不發(fā)布結(jié)果。也可以使用 function 僅生成日志,并將結(jié)果寫入外部數(shù)據(jù)庫,或僅用于監(jiān)視流中的異常。以下示例中的 function 只會將接收到的事件存入日志:

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;import org.slf4j.Logger;
public LogFunction implements Function {
   Void process(String input, Context context) throws Exception {        Logger LOG = context.getLogger();        LOG.info("Received {}”, input);        return null;    }}
     

     
在使用輸出類型為 Void 的 Java function 時,function 必須始終返回 null。在不想生成輸出事件時,輸出類型沒有 Void 的 function 可以返回 null,例如,當(dāng)你在使用過濾器,但不希望某一事件被處理時。      

     
最佳實踐 4:處理來自多個 input topic 的事件      

     
如圖 3 所示,Pulsar Functions 可以消費(fèi)多個 topic 中的事件,下面我們來看一下如何編寫一個這樣的 function:      
import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;
public MultiTopicFunction implements Function {
   String process(String input, Context context) throws Exception {        String sourceTopic = context.getSourceTopic();        if (sourceTopic.equals(“TopicA”) {           // parse as TopicA Object        } else if (sourceTopic.equals(“TopicB”) {           // parse as Topic B Object        } else if (sourceTopic.equals(“TopicC”) {           // parse as Topic C Object        }        ….    }}
     

從代碼中可以看出,我們首先要從 Context 對象獲取 input topic 的名稱,然后根據(jù) input topic 的名稱相應(yīng)地解析/處理事件。


     

最佳實踐 5:Metric 收集


     

Apache Pulsar SDK 提供了 metric 收集機(jī)制,可用于記錄所選擇的任何用戶定義的 metric。在下面的示例中,我們使用單獨(dú)的 metric 來跟蹤調(diào)用該 function 的總次數(shù),使用另一個 metric 來跟蹤使用無效輸入調(diào)用該 function 的次數(shù)。更多關(guān)于讀取和使用 metric 的說明,請參閱監(jiān)控指南。

import org.apache.pulsar.functions.api.Context;import org.apache.pulsar.functions.api.Function;
public MetricFunction implements Function {
   Void process(String input, Context context) throws Exception {        context.recordMetric(“invocation count”, 1);        if (input < 0) {           context.recordMetric(“Invalid data”, 1);        }        return null;    }}
     

感謝你的閱讀,相信你對“怎么使用Apache Pulsar Functions進(jìn)行簡單事件處理”這一問題有一定的了解,快去動手實踐吧,如果想了解更多相關(guān)知識點(diǎn),可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站!小編會繼續(xù)為大家?guī)砀玫奈恼拢?/p>
網(wǎng)頁標(biāo)題:怎么使用ApachePulsarFunctions進(jìn)行簡單事件處理
網(wǎng)站URL:http://weahome.cn/article/pdpggc.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部