真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

簡潔高效的Python流處理庫Faust怎么用

簡潔高效的Python流處理庫Faust怎么用,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

成都創(chuàng)新互聯(lián)從2013年成立,先為興寧等服務(wù)建站,興寧等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為興寧企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。

在分布式系統(tǒng)和實(shí)時數(shù)據(jù)處理中,流處理是十分重要的技術(shù)。在數(shù)據(jù)密集型應(yīng)用中,數(shù)據(jù)快速到達(dá),轉(zhuǎn)瞬即逝,需要及時進(jìn)行處理,流式處理強(qiáng)調(diào)數(shù)據(jù)和事件的處理速度,對性能和可靠性有較高的要求。

流處理框架包括:Storm,Spark Streaming 和 Flink 等,而 Kafka 也不甘示弱,推出了分布式流處理平臺 Kafka  Streams。 Faust 把 Kafka Streams 帶到了 Python,并實(shí)現(xiàn)了抽象和優(yōu)化,為數(shù)據(jù)和事件的流處理提供了一個高效便利的框架。

簡介

Faust,是 robinhood 在 Github 上開源的 Python 流處理庫,目前版本為 1.10.4。

Faust 把 Kafka Streams 的概念帶到了 Python,提供了包括流處理和事件處理的模式。Faust 使用純 Python  實(shí)現(xiàn),使得開發(fā)者可以使用包括 NumPy, PyTorch, Pandas 等的庫進(jìn)行數(shù)據(jù)處理。

Faust 實(shí)現(xiàn)簡潔優(yōu)雅,使用簡單,性能優(yōu)秀,且具有高可用、分布式、靈活性高的特點(diǎn)。目前 Faust  已被用于構(gòu)建高性能分布式系統(tǒng)和實(shí)時數(shù)據(jù)管道中。

使用

Faust 需求 Python 3.6 或以上,且需要可用的 Kafka >= 0.10 服務(wù)。使用 pip 安裝:

$ pip install -U faust

此外,一些額外的特性需要額外的依賴,如 rocksdb,可以用來作為 Faust 在生產(chǎn)環(huán)境中的存儲,以及 redis,可以在開啟緩存時使用。

安裝完成以后,就可以在項(xiàng)目中使用了。我們來看一個簡單的例子:

import faust  app = faust.App(     'hello-world',     broker='kafka://localhost:9092',     value_serializer='raw', )  greetings_topic = app.topic('greetings')  @app.agent(greetings_topic) async def greet(greetings):     async for greeting in greetings:         print(greeting)

首先,我們使用 faust.App 創(chuàng)建一個 Faust 應(yīng)用,并配置應(yīng)用的名字、Kafka broker 和序列化方式。

然后,我們創(chuàng)建一個主題,這跟 Kafka 中的主題是對應(yīng)的。

Faust 利用 Python 3.6+ 的異步語法 async,定義異步函數(shù) greet,并注冊為 Faust 應(yīng)用的一個  agent。函數(shù)接收實(shí)時的數(shù)據(jù)集合 greetings,并異步地對每項(xiàng)數(shù)據(jù)進(jìn)行輸出。

把上述代碼保存為 hello_world.py,并在命令行啟動工作者:

$ faust -A hello_world worker -l info

該 Faust 工作者就會從 Kafka 中實(shí)時讀取數(shù)據(jù)并處理。

我們可以發(fā)送一些數(shù)據(jù)來觀察效果:

$ faust -A hello_world send @greet "Hello Faust"

上述命令發(fā)送了一條消息,執(zhí)行后,我們就能在工作者的命令行中看到這條消息。

Faust 還充分利用了 Python 的類型提示,能夠方便地定義數(shù)據(jù)模型:

import faust  class Greeting(faust.Record):     from_name: str     to_name: str  app = faust.App('hello-app', broker='kafka://localhost') topic = app.topic('hello-topic', value_type=Greeting)  @app.agent(topic) async def hello(greetings):     async for greeting in greetings:         print(f'Hello from {greeting.from_name} to {greeting.to_name}')  @app.timer(interval=1.0) async def example_sender(app):     await hello.send(         value=Greeting(from_name='Faust', to_name='you'),     )  if __name__ == '__main__':     app.main()

Faust 把 Kafka Streams 帶到了 Python  中,實(shí)現(xiàn)了簡潔高效的數(shù)據(jù)流處理。其使用簡單的裝飾器和基于類型提示機(jī)的據(jù)模型,就能定義實(shí)現(xiàn)數(shù)據(jù)的處理邏輯;充分利用了 Python 的 async  異步機(jī)制,和其他高性能的異步庫,實(shí)現(xiàn)了高效性能;其使用 Python 實(shí)現(xiàn),使用開發(fā)者可以無縫對接其他數(shù)據(jù)處理和大數(shù)據(jù)相關(guān)功能。

關(guān)于 簡潔高效的Python流處理庫Faust怎么用問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。


文章名稱:簡潔高效的Python流處理庫Faust怎么用
文章轉(zhuǎn)載:http://weahome.cn/article/pjgesg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部