自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Faust - 簡潔高效的 Python 流處理庫

開發(fā) 后端
在分布式系統(tǒng)和實時數(shù)據(jù)處理中,流處理是十分重要的技術(shù)。在數(shù)據(jù)密集型應(yīng)用中,數(shù)據(jù)快速到達(dá),轉(zhuǎn)瞬即逝,需要及時進(jìn)行處理,流式處理強(qiáng)調(diào)數(shù)據(jù)和事件的處理速度,對性能和可靠性有較高的要求。

在分布式系統(tǒng)和實時數(shù)據(jù)處理中,流處理是十分重要的技術(shù)。在數(shù)據(jù)密集型應(yīng)用中,數(shù)據(jù)快速到達(dá),轉(zhuǎn)瞬即逝,需要及時進(jìn)行處理,流式處理強(qiáng)調(diào)數(shù)據(jù)和事件的處理速度,對性能和可靠性有較高的要求。

流處理框架包括:Storm,Spark Streaming 和 Flink 等,而 Kafka 也不甘示弱,推出了分布式流處理平臺 Kafka Streams。 Faust 把 Kafka Streams 帶到了 Python,并實現(xiàn)了抽象和優(yōu)化,為數(shù)據(jù)和事件的流處理提供了一個高效便利的框架。

簡介

Faust,是 robinhood 在 Github 上開源的 Python 流處理庫,目前版本為 1.10.4。

Faust 把 Kafka Streams 的概念帶到了 Python,提供了包括流處理和事件處理的模式。Faust 使用純 Python 實現(xiàn),使得開發(fā)者可以使用包括 NumPy, PyTorch, Pandas 等的庫進(jìn)行數(shù)據(jù)處理。

Faust 實現(xiàn)簡潔優(yōu)雅,使用簡單,性能優(yōu)秀,且具有高可用、分布式、靈活性高的特點。目前 Faust 已被用于構(gòu)建高性能分布式系統(tǒng)和實時數(shù)據(jù)管道中。

 

[[387836]]

Faust

 

使用

Faust 需求 Python 3.6 或以上,且需要可用的 Kafka >= 0.10 服務(wù)。使用 pip 安裝:

  1. $ pip install -U faust 

此外,一些額外的特性需要額外的依賴,如 rocksdb,可以用來作為 Faust 在生產(chǎn)環(huán)境中的存儲,以及 Redis,可以在開啟緩存時使用。

 

[[387837]]

Faust

 

安裝完成以后,就可以在項目中使用了。我們來看一個簡單的例子:

 

  1. import faust 
  2.  
  3. app = faust.App( 
  4.     'hello-world'
  5.     broker='kafka://localhost:9092'
  6.     value_serializer='raw'
  7.  
  8. greetings_topic = app.topic('greetings'
  9.  
  10. @app.agent(greetings_topic) 
  11. async def greet(greetings): 
  12.     async for greeting in greetings: 
  13.         print(greeting) 

首先,我們使用 faust.App 創(chuàng)建一個 Faust 應(yīng)用,并配置應(yīng)用的名字、Kafka broker 和序列化方式。

然后,我們創(chuàng)建一個主題,這跟 Kafka 中的主題是對應(yīng)的。

Faust 利用 Python 3.6+ 的異步語法 async,定義異步函數(shù) greet,并注冊為 Faust 應(yīng)用的一個 agent。函數(shù)接收實時的數(shù)據(jù)集合 greetings,并異步地對每項數(shù)據(jù)進(jìn)行輸出。

把上述代碼保存為 hello_world.py,并在命令行啟動工作者:

  1. $ faust -A hello_world worker -l info 

該 Faust 工作者就會從 Kafka 中實時讀取數(shù)據(jù)并處理。

我們可以發(fā)送一些數(shù)據(jù)來觀察效果:

  1. $ faust -A hello_world send @greet "Hello Faust" 

上述命令發(fā)送了一條消息,執(zhí)行后,我們就能在工作者的命令行中看到這條消息。

Faust 還充分利用了 Python 的類型提示,能夠方便地定義數(shù)據(jù)模型:

 

  1. import faust 
  2.  
  3. class Greeting(faust.Record): 
  4.     from_name: str 
  5.     to_name: str 
  6.  
  7. app = faust.App('hello-app', broker='kafka://localhost'
  8. topic = app.topic('hello-topic', value_type=Greeting) 
  9.  
  10. @app.agent(topic) 
  11. async def hello(greetings): 
  12.     async for greeting in greetings: 
  13.         print(f'Hello from {greeting.from_name} to {greeting.to_name}'
  14.  
  15. @app.timer(interval=1.0) 
  16. async def example_sender(app): 
  17.     await hello.send( 
  18.         value=Greeting(from_name='Faust', to_name='you'), 
  19.     ) 
  20.  
  21. if __name__ == '__main__'
  22.     app.main() 

 

 

Faust

 

總結(jié)

Faust 把 Kafka Streams 帶到了 Python 中,實現(xiàn)了簡潔高效的數(shù)據(jù)流處理。其使用簡單的裝飾器和基于類型提示機(jī)的據(jù)模型,就能定義實現(xiàn)數(shù)據(jù)的處理邏輯;充分利用了 Python 的 async 異步機(jī)制,和其他高性能的異步庫,實現(xiàn)了高效性能;其使用 Python 實現(xiàn),使用開發(fā)者可以無縫對接其他數(shù)據(jù)處理和大數(shù)據(jù)相關(guān)功能。

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2019-06-27 10:32:57

Java開發(fā)代碼

2024-12-19 09:05:13

Python鏈?zhǔn)秸{(diào)用

2021-01-19 13:10:29

ZshLinuxUbuntu

2018-06-06 09:10:34

編程語言Python庫

2024-10-15 10:51:47

2024-01-30 08:54:05

JavaScript技巧代碼

2024-06-03 11:36:06

Pythonf-string

2023-11-30 16:05:17

2022-07-25 11:33:48

Python大文件

2023-09-26 11:34:56

Python

2024-04-23 08:26:56

C++折疊表達(dá)式編程

2011-04-29 10:22:49

CSS高性能Web開發(fā)

2012-01-09 17:03:39

臺式機(jī)評測

2024-06-24 13:35:48

2024-04-28 18:24:05

2023-10-29 12:54:16

Doris數(shù)據(jù)倉庫

2019-07-31 10:24:16

JavaScript瀏覽器口袋妖怪

2022-09-17 18:23:46

Lodash模塊化JavaScrip

2010-03-24 15:19:35

Python庫

2023-09-06 09:40:29

點贊
收藏

51CTO技術(shù)棧公眾號