自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

使用Python和Asyncio編寫在線多人游戲(二)

開發(fā) 前端
你在 Python 中用過異步編程嗎?本文中我會(huì)告訴你怎樣做,而且用一個(gè)能工作的例子來展示它:這是一個(gè)流行的貪吃蛇游戲,而且是為多人游戲而設(shè)計(jì)的。

[[171738]]

在 Python 中用過異步編程嗎?本文中我會(huì)告訴你怎樣做,而且用一個(gè)能工作的例子來展示它:這是一個(gè)流行的貪吃蛇游戲,而且是為多人游戲而設(shè)計(jì)的。

介紹和理論部分參見“第一部分 異步化”。

 

 

3、編寫游戲循環(huán)主體

游戲循環(huán)是每一個(gè)游戲的核心。它持續(xù)地運(yùn)行以讀取玩家的輸入、更新游戲的狀態(tài),并且在屏幕上渲染游戲結(jié)果。在在線游戲中,游戲循環(huán)分為客戶端和服務(wù)端兩部分,所以一般有兩個(gè)循環(huán)通過網(wǎng)絡(luò)通信。通??蛻舳说慕巧谦@取玩家輸入,比如按鍵或者鼠標(biāo)移動(dòng),將數(shù)據(jù)傳輸給服務(wù)端,然后接收需要渲染的數(shù)據(jù)。服務(wù)端處理來自玩家的所有數(shù)據(jù),更新游戲的狀態(tài),執(zhí)行渲染下一幀的必要計(jì)算,然后將結(jié)果傳回客戶端,例如游戲中對(duì)象的新位置。如果沒有可靠的理由,不混淆客戶端和服務(wù)端的角色是一件很重要的事。如果你在客戶端執(zhí)行游戲邏輯的計(jì)算,很容易就會(huì)和其它客戶端失去同步,其實(shí)你的游戲也可以通過簡(jiǎn)單地傳遞客戶端的數(shù)據(jù)來創(chuàng)建。

游戲循環(huán)的一次迭代稱為一個(gè)嘀嗒(tick)。嘀嗒是一個(gè)事件,表示當(dāng)前游戲循環(huán)的迭代已經(jīng)結(jié)束,下一幀(或者多幀)的數(shù)據(jù)已經(jīng)就緒。

在后面的例子中,我們使用相同的客戶端,它使用 WebSocket 從一個(gè)網(wǎng)頁(yè)上連接到服務(wù)端。它執(zhí)行一個(gè)簡(jiǎn)單的循環(huán),將按鍵碼發(fā)送給服務(wù)端,并顯示來自服務(wù)端的所有信息。客戶端代碼戳這里。

例子 3.1:基本游戲循環(huán)

我們使用 aiohttp 庫(kù)來創(chuàng)建游戲服務(wù)器。它可以通過 asyncio 創(chuàng)建網(wǎng)頁(yè)服務(wù)器和客戶端。這個(gè)庫(kù)的一個(gè)優(yōu)勢(shì)是它同時(shí)支持普通 http 請(qǐng)求和 websocket。所以我們不用其他網(wǎng)頁(yè)服務(wù)器來渲染游戲的 html 頁(yè)面。

下面是啟動(dòng)服務(wù)器的方法:

  1. app = web.Application() 
  2. app["sockets"] = [] 
  3. asyncio.ensure_future(game_loop(app)) 
  4. app.router.add_route('GET''/connect', wshandler) 
  5. app.router.add_route('GET''/', handle) 
  6. web.run_app(app) 

web.run_app 是創(chuàng)建服務(wù)主任務(wù)的快捷方法,通過它的 run_forever() 方法來執(zhí)行 asyncio 事件循環(huán)。建議你查看這個(gè)方法的源碼,弄清楚服務(wù)器到底是如何創(chuàng)建和結(jié)束的。

app 變量就是一個(gè)類似于字典的對(duì)象,它用于在所連接的客戶端之間共享數(shù)據(jù)。我們使用它來存儲(chǔ)連接的套接字的列表。隨后會(huì)用這個(gè)列表來給所有連接的客戶端發(fā)送消息。asyncio.ensure_future() 調(diào)用會(huì)啟動(dòng)主游戲循環(huán)的任務(wù),每隔2 秒向客戶端發(fā)送嘀嗒消息。這個(gè)任務(wù)會(huì)在同樣的 asyncio 事件循環(huán)中和網(wǎng)頁(yè)服務(wù)器并行執(zhí)行。

有兩個(gè)網(wǎng)頁(yè)請(qǐng)求處理器:handle 是提供 html 頁(yè)面的處理器;wshandler 是主要的 websocket 服務(wù)器任務(wù),處理和客戶端之間的交互。在事件循環(huán)中,每一個(gè)連接的客戶端都會(huì)創(chuàng)建一個(gè)新的 wshandler 任務(wù)。這個(gè)任務(wù)會(huì)添加客戶端的套接字到列表中,以便 game_loop 任務(wù)可以給所有的客戶端發(fā)送消息。然后它將隨同消息回顯客戶端的每個(gè)擊鍵。

在啟動(dòng)的任務(wù)中,我們?cè)?asyncio 的主事件循環(huán)中啟動(dòng) worker 循環(huán)。任務(wù)之間的切換發(fā)生在它們之間任何一個(gè)使用 await語(yǔ)句來等待某個(gè)協(xié)程結(jié)束時(shí)。例如 asyncio.sleep 僅僅是將程序執(zhí)行權(quán)交給調(diào)度器一段指定的時(shí)間;ws.receive 等待 websocket 的消息,此時(shí)調(diào)度器可能切換到其它任務(wù)。

在瀏覽器中打開主頁(yè),連接上服務(wù)器后,試試隨便按下鍵。它們的鍵值會(huì)從服務(wù)端返回,每隔 2 秒這個(gè)數(shù)字會(huì)被游戲循環(huán)中發(fā)給所有客戶端的嘀嗒消息所覆蓋。

我們剛剛創(chuàng)建了一個(gè)處理客戶端按鍵的服務(wù)器,主游戲循環(huán)在后臺(tái)做一些處理,周期性地同時(shí)更新所有的客戶端。

例子 3.2: 根據(jù)請(qǐng)求啟動(dòng)游戲

在前一個(gè)例子中,在服務(wù)器的生命周期內(nèi),游戲循環(huán)一直運(yùn)行著。但是現(xiàn)實(shí)中,如果沒有一個(gè)人連接服務(wù)器,空運(yùn)行游戲循環(huán)通常是不合理的。而且,同一個(gè)服務(wù)器上可能有不同的“游戲房間”。在這種假設(shè)下,每一個(gè)玩家“創(chuàng)建”一個(gè)游戲會(huì)話(比如說,多人游戲中的一個(gè)比賽或者大型多人游戲中的副本),這樣其他用戶可以加入其中。當(dāng)游戲會(huì)話開始時(shí),游戲循環(huán)才開始執(zhí)行。

在這個(gè)例子中,我們使用一個(gè)全局標(biāo)記來檢測(cè)游戲循環(huán)是否在執(zhí)行。當(dāng)?shù)谝粋€(gè)用戶發(fā)起連接時(shí),啟動(dòng)它。最開始,游戲循環(huán)沒有執(zhí)行,標(biāo)記設(shè)置為 False。游戲循環(huán)是通過客戶端的處理方法啟動(dòng)的。

  1. if app["game_is_running"] == False
  2.       asyncio.ensure_future(game_loop(app)) 

當(dāng) game_loop() 運(yùn)行時(shí),這個(gè)標(biāo)記設(shè)置為 True;當(dāng)所有客戶端都斷開連接時(shí),其又被設(shè)置為 False。

例子 3.3:管理任務(wù)

這個(gè)例子用來解釋如何和任務(wù)對(duì)象協(xié)同工作。我們把游戲循環(huán)的任務(wù)直接存儲(chǔ)在游戲循環(huán)的全局字典中,代替標(biāo)記的使用。在像這樣的一個(gè)簡(jiǎn)單例子中并不一定是最優(yōu)的,但是有時(shí)候你可能需要控制所有已經(jīng)啟動(dòng)的任務(wù)。

  1. if app["game_loop"is None or \ 
  2.    app["game_loop"].cancelled(): 
  3.     app["game_loop"] = asyncio.ensure_future(game_loop(app)) 

這里 ensure_future() 返回我們存放在全局字典中的任務(wù)對(duì)象,當(dāng)所有用戶都斷開連接時(shí),我們使用下面方式取消任務(wù):

  1. app["game_loop"].cancel() 

這個(gè) cancel() 調(diào)用將通知調(diào)度器不要向這個(gè)協(xié)程傳遞執(zhí)行權(quán),而且將它的狀態(tài)設(shè)置為已取消:cancelled,之后可以通過 cancelled() 方法來檢查是否已取消。這里有一個(gè)值得一提的小注意點(diǎn):當(dāng)你持有一個(gè)任務(wù)對(duì)象的外部引用時(shí),而這個(gè)任務(wù)執(zhí)行中發(fā)生了異常,這個(gè)異常不會(huì)拋出。取而代之的是為這個(gè)任務(wù)設(shè)置一個(gè)異常狀態(tài),可以通過 exception() 方法來檢查是否出現(xiàn)了異常。這種悄無聲息地失敗在調(diào)試時(shí)不是很有用。所以,你可能想用拋出所有異常來取代這種做法。你可以對(duì)所有未完成的任務(wù)顯式地調(diào)用 result() 來實(shí)現(xiàn)??梢酝ㄟ^如下的回調(diào)來實(shí)現(xiàn):

  1. app["game_loop"].add_done_callback(lambda t: t.result()) 

如果我們打算在我們代碼中取消這個(gè)任務(wù),但是又不想產(chǎn)生 CancelError 異常,有一個(gè)檢查 cancelled 狀態(tài)的點(diǎn):

  1. app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None) 

注意僅當(dāng)你持有任務(wù)對(duì)象的引用時(shí)才需要這么做。在前一個(gè)例子,所有的異常都是沒有額外的回調(diào),直接拋出所有異常。

例子 3.4:等待多個(gè)事件

在許多場(chǎng)景下,在客戶端的處理方法中你需要等待多個(gè)事件的發(fā)生。除了來自客戶端的消息,你可能需要等待不同類型事件的發(fā)生。比如,如果你的游戲時(shí)間有限制,那么你可能需要等一個(gè)來自定時(shí)器的信號(hào)?;蛘吣阈枰褂霉艿纴淼却齺碜云渌M(jìn)程的消息。亦或者是使用分布式消息系統(tǒng)的網(wǎng)絡(luò)中其它服務(wù)器的信息。

為了簡(jiǎn)單起見,這個(gè)例子是基于例子 3.1。但是這個(gè)例子中我們使用 Condition 對(duì)象來與已連接的客戶端保持游戲循環(huán)的同步。我們不保存套接字的全局列表,因?yàn)橹辉谠撎幚矸椒ㄖ惺褂锰捉幼?。?dāng)游戲循環(huán)停止迭代時(shí),我們使用 Condition.notify_all() 方法來通知所有的客戶端。這個(gè)方法允許在 asyncio 的事件循環(huán)中使用發(fā)布/訂閱的模式。

為了等待這兩個(gè)事件,首先我們使用 ensure_future() 來封裝任務(wù)中這個(gè)可等待對(duì)象。

  1. if not recv_task: 
  2.     recv_task = asyncio.ensure_future(ws.receive()) 
  3. if not tick_task: 
  4.     await tick.acquire() 
  5.     tick_task = asyncio.ensure_future(tick.wait()) 

在我們調(diào)用 Condition.wait() 之前,我們需要在它后面獲取一把鎖。這就是我們?yōu)槭裁聪日{(diào)用 tick.acquire() 的原因。在調(diào)用 tick.wait() 之后,鎖會(huì)被釋放,這樣其他的協(xié)程也可以使用它。但是當(dāng)我們收到通知時(shí),會(huì)重新獲取鎖,所以在收到通知后需要調(diào)用 tick.release() 來釋放它。

我們使用 asyncio.wait() 協(xié)程來等待兩個(gè)任務(wù)。

  1. done, pending = await asyncio.wait( 
  2.         [recv_task, 
  3.          tick_task], 
  4.         return_when=asyncio.FIRST_COMPLETED) 

程序會(huì)阻塞,直到列表中的任意一個(gè)任務(wù)完成。然后它返回兩個(gè)列表:執(zhí)行完成的任務(wù)列表和仍然在執(zhí)行的任務(wù)列表。如果任務(wù)執(zhí)行完成了,其對(duì)應(yīng)變量賦值為 None,所以在下一個(gè)迭代時(shí),它可能會(huì)被再次創(chuàng)建。

例子 3.5: 結(jié)合多個(gè)線程

在這個(gè)例子中,我們結(jié)合 asyncio 循環(huán)和線程,在一個(gè)單獨(dú)的線程中執(zhí)行主游戲循環(huán)。我之前提到過,由于 GIL 的存在,Python 代碼的真正并行執(zhí)行是不可能的。所以使用其它線程來執(zhí)行復(fù)雜計(jì)算并不是一個(gè)好主意。然而,在使用 asyncio 時(shí)結(jié)合線程有原因的:當(dāng)我們使用的其它庫(kù)不支持 asyncio 時(shí)就需要。在主線程中調(diào)用這些庫(kù)會(huì)阻塞循環(huán)的執(zhí)行,所以異步使用他們的唯一方法是在不同的線程中使用他們。

我們使用 asyncio 循環(huán)的run_in_executor() 方法和 ThreadPoolExecutor 來執(zhí)行游戲循環(huán)。注意 game_loop() 已經(jīng)不再是一個(gè)協(xié)程了。它是一個(gè)由其它線程執(zhí)行的函數(shù)。然而我們需要和主線程交互,在游戲事件到來時(shí)通知客戶端。asyncio 本身不是線程安全的,它提供了可以在其它線程中執(zhí)行你的代碼的方法。普通函數(shù)有 call_soon_threadsafe(),協(xié)程有 run_coroutine_threadsafe()。我們?cè)?notify() 協(xié)程中增加了通知客戶端游戲的嘀嗒的代碼,然后通過另外一個(gè)線程執(zhí)行主事件循環(huán)。

  1. def game_loop(asyncio_loop): 
  2.     print("Game loop thread id {}".format(threading.get_ident())) 
  3.     async def notify(): 
  4.         print("Notify thread id {}".format(threading.get_ident())) 
  5.         await tick.acquire() 
  6.         tick.notify_all() 
  7.         tick.release() 
  8.     while 1: 
  9.         task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop) 
  10.         # blocking the thread 
  11.         sleep(1) 
  12.         # make sure the task has finished 
  13.         task.result() 

當(dāng)你執(zhí)行這個(gè)例子時(shí),你會(huì)看到 “Notify thread id” 和 “Main thread id” 相等,因?yàn)?notify() 協(xié)程在主線程中執(zhí)行。與此同時(shí) sleep(1) 在另外一個(gè)線程中執(zhí)行,因此它不會(huì)阻塞主事件循環(huán)。

例子 3.6:多進(jìn)程和擴(kuò)展

單線程的服務(wù)器可能運(yùn)行得很好,但是它只能使用一個(gè) CPU 核。為了將服務(wù)擴(kuò)展到多核,我們需要執(zhí)行多個(gè)進(jìn)程,每個(gè)進(jìn)程執(zhí)行各自的事件循環(huán)。這樣我們需要在進(jìn)程間交互信息或者共享游戲的數(shù)據(jù)。而且在一個(gè)游戲中經(jīng)常需要進(jìn)行復(fù)雜的計(jì)算,例如路徑查找之類。這些任務(wù)有時(shí)候在一個(gè)游戲嘀嗒中沒法快速完成。在協(xié)程中不推薦進(jìn)行費(fèi)時(shí)的計(jì)算,因?yàn)樗鼤?huì)阻塞事件的處理。在這種情況下,將這個(gè)復(fù)雜任務(wù)交給其它并行執(zhí)行的進(jìn)程可能更合理。

最簡(jiǎn)單的使用多個(gè)核的方法是啟動(dòng)多個(gè)使用單核的服務(wù)器,就像之前的例子中一樣,每個(gè)服務(wù)器占用不同的端口。你可以使用 supervisord 或者其它進(jìn)程控制的系統(tǒng)。這個(gè)時(shí)候你需要一個(gè)像 HAProxy 這樣的負(fù)載均衡器,使得連接的客戶端分布在多個(gè)進(jìn)程間。已經(jīng)有一些可以連接 asyncio 和一些流行的消息及存儲(chǔ)系統(tǒng)的適配系統(tǒng)。例如:

  • aiomcache 用于 memcached 客戶端
  • aiozmq 用于 zeroMQ
  • aioredis 用于 Redis 存儲(chǔ),支持發(fā)布/訂閱

你可以在 github 或者 pypi 上找到其它的軟件包,大部分以 aio 開頭。

使用網(wǎng)絡(luò)服務(wù)在存儲(chǔ)持久狀態(tài)和交換某些信息時(shí)可能比較有效。但是如果你需要進(jìn)行進(jìn)程間通信的實(shí)時(shí)處理,它的性能可能不足。此時(shí),使用標(biāo)準(zhǔn)的 unix 管道可能更合適。asyncio 支持管道,在aiohttp倉(cāng)庫(kù)有個(gè) 使用管道的服務(wù)器的非常底層的例子。

在當(dāng)前的例子中,我們使用 Python 的高層類庫(kù) multiprocessing 來在不同的核上啟動(dòng)復(fù)雜的計(jì)算,使用 multiprocessing.Queue 來進(jìn)行進(jìn)程間的消息交互。不幸的是,當(dāng)前的 multiprocessing 實(shí)現(xiàn)與 asyncio 不兼容。所以每一個(gè)阻塞方法的調(diào)用都會(huì)阻塞事件循環(huán)。但是此時(shí)線程正好可以起到幫助作用,因?yàn)槿绻诓煌€程里面執(zhí)行 multiprocessing 的代碼,它就不會(huì)阻塞主線程。所有我們需要做的就是把所有進(jìn)程間的通信放到另外一個(gè)線程中去。這個(gè)例子會(huì)解釋如何使用這個(gè)方法。和上面的多線程例子非常類似,但是我們從線程中創(chuàng)建的是一個(gè)新的進(jìn)程。

  1. def game_loop(asyncio_loop): 
  2.     # coroutine to run in main thread 
  3.     async def notify(): 
  4.         await tick.acquire() 
  5.         tick.notify_all() 
  6.         tick.release() 
  7.     queue = Queue() 
  8.     # function to run in a different process 
  9.     def worker(): 
  10.         while 1: 
  11.             print("doing heavy calculation in process {}".format(os.getpid())) 
  12.             sleep(1) 
  13.             queue.put("calculation result"
  14.     Process(target=worker).start() 
  15.     while 1: 
  16.         # blocks this thread but not main thread with event loop 
  17.         result = queue.get() 
  18.         print("getting {} in process {}".format(result, os.getpid())) 
  19.         task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop) 
  20.         task.result() 

這里我們?cè)诹硗庖粋€(gè)進(jìn)程中運(yùn)行 worker() 函數(shù)。它包括一個(gè)執(zhí)行復(fù)雜計(jì)算并把計(jì)算結(jié)果放到 queue 中的循環(huán),這個(gè) queue 是 multiprocessing.Queue 的實(shí)例。然后我們就可以在另外一個(gè)線程的主事件循環(huán)中獲取結(jié)果并通知客戶端,就和例子 3.5 一樣。這個(gè)例子已經(jīng)非常簡(jiǎn)化了,它沒有合理的結(jié)束進(jìn)程。而且在真實(shí)的游戲中,我們可能需要另外一個(gè)隊(duì)列來將數(shù)據(jù)傳遞給 worker。

有一個(gè)項(xiàng)目叫 aioprocessing,它封裝了 multiprocessing,使得它可以和 asyncio 兼容。但是實(shí)際上它只是和上面例子使用了完全一樣的方法:從線程中創(chuàng)建進(jìn)程。它并沒有給你帶來任何方便,除了它使用了簡(jiǎn)單的接口隱藏了后面的這些技巧。希望在 Python 的下一個(gè)版本中,我們能有一個(gè)基于協(xié)程且支持 asyncio 的 multiprocessing 庫(kù)。

注意!如果你從主線程或者主進(jìn)程中創(chuàng)建了一個(gè)不同的線程或者子進(jìn)程來運(yùn)行另外一個(gè) asyncio 事件循環(huán),你需要顯式地使用 asyncio.new_event_loop() 來創(chuàng)建循環(huán),不然的話可能程序不會(huì)正常工作。

責(zé)任編輯:龐桂玉 來源: Linux中國(guó)
相關(guān)推薦

2016-09-14 21:17:47

PythonAsyncio游戲

2016-09-22 21:12:14

2010-03-05 18:42:31

杜比語(yǔ)音聊天

2020-02-21 08:00:00

Pythonasyncio編程語(yǔ)言

2018-06-27 14:50:06

Cloud StudiSpring Boot應(yīng)用

2021-09-15 14:53:35

在線文檔多人協(xié)作

2011-12-16 10:08:36

Node.js

2015-07-31 10:10:12

javaweb在線聊天

2014-11-20 13:56:08

2024-01-18 08:37:33

socketasyncio線程

2021-04-13 06:35:13

Elixir語(yǔ)言編程語(yǔ)言軟件開發(fā)

2018-10-08 15:35:56

Python異步IO

2020-01-16 11:42:45

PyramidCornicePython Web

2014-03-31 10:51:40

pythonasyncio

2017-08-02 15:00:12

PythonAsyncio異步編程

2020-09-21 08:58:57

PythonOpenCV乒乓球

2014-10-30 10:28:55

Node.js

2013-04-10 10:58:19

LambdaC#

2017-09-05 08:08:37

asyncio程序多線程

2017-05-05 08:44:24

PythonAsyncio異步編程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)