自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

關(guān)于FastAPI中在新線程里調(diào)用協(xié)成函數(shù)問題

開發(fā)
在異步接口里面接收批量上傳的文件夾后通過webdav3進行批量進行文件處理。其實涉及的問題就是:相對于在異步之中進行線程化的異步處理。

先前有公眾號朋友問起一個問題,大概的問題是這樣:

在異步接口里面接收批量上傳的文件夾后通過webdav3進行批量進行文件處理。其實涉及的問題就是:相對于在異步之中進行線程化的異步處理。

大致的代碼如下所示:


@uploadrp.post('/upload/rp')
async def upload_rp_file(file: List[UploadFile] = File(...)):
   ........
    upload_result = await new_upload_file(file_path=file_path, file_name=file_name, old_name=rp_dir_name)
   
    .........

其中對應(yīng)的new_upload_file也是異步函數(shù),但是里面涉及到webdav3庫的異步的調(diào)用,如下代碼所示:

async def new_upload_file(file_path, file_name, old_name):
    """ 上傳文件夾 """
.........
  upload_rp_file_result = client.upload_async('' + file_name, file_path,
                                                callback=partial(completion_callback, old_file_dir=old_name, file_name=file_name))

  .........

容納后在調(diào)用upload_async的使用,需要進行任務(wù)處理完成的回調(diào),也就是completion_callback函數(shù),它的代碼如下:

def completion_callback(old_file_dir, file_name):
    pss

它的問題就是在completion_callback還有一個異步的函數(shù)需要調(diào)用的時候就遇到問題了:

(1) 我無法在upload_async上傳文件完成后的回調(diào)函數(shù)使用async,即 async def completion_callback

(2) 我在completion_callback中獲取當前事件循環(huán),因為執(zhí)行這個回調(diào)函數(shù)的時候,控制臺打印當前沒有事件循環(huán)

3) 我在completion_callback中new一個事件循環(huán)(new_event_loop)然后用create_task或run_until_complete執(zhí)行update_rp_file_status函數(shù)的時候,都不成功:

  • 要不然就報update_rp_file_status函數(shù)跟new的事件循環(huán)不是同一個...
  • 要不就是這個update_rp_file_status壓根不執(zhí)行...
  • 要不就是還沒執(zhí)行,new出來的事件循環(huán),在update_rp_file_status還沒執(zhí)行的時候就被銷毀了,很多問題...

問題探究

其實這個問題本質(zhì)其實就是關(guān)于 client.upload_async中啟用了新的線程去異步處理了,導(dǎo)致了開啟了新的線程的問題,如下 client.upload_async的代碼:

由此可見,問題肯本原因在于是因為我們是在新線程中調(diào)用了異步函數(shù),而新線程沒有自己的事件循環(huán)導(dǎo)致的。

回到事件循環(huán)的本身上,我們知道首先需要知道一點:

異步函數(shù)需要在事件循環(huán)中運行。但是由于我們啟動服務(wù)之后,在主線程中,F(xiàn)astAPI應(yīng)用程序已經(jīng)創(chuàng)建了一個事件循環(huán),并且通過uvicorn.run()方法來運行該應(yīng)用程序,而這個的事件循環(huán)對象是當前主線程創(chuàng)建出來的, 而當我們在新線程中,也就是client.upload_async調(diào)用之后,開啟的線程中并沒有默認的事件循環(huán)可用。因此,當你嘗試在新線程中調(diào)用異步函數(shù)時,就會拋出RuntimeError: There is no current event loop in thread異常,甚至你嘗試打印獲取當前運行的事件循環(huán)都無法獲取到。如下代碼所示:

from fastapi import FastAPI
import asyncio
app = FastAPI()


from threading import Thread  # 創(chuàng)建線程的模塊

def task(name):
    print(asyncio.get_event_loop())


async def do_task():
    # 異步任務(wù)的邏輯
    await asyncio.sleep(1)
    print("異步任務(wù)完成")


@app.get("/items")
async def read_item():
    p = Thread(target=task, args=('線程1',))
    p.start()  
    return {"data": "ok"}

在上面的等待嗎中我們直接輸出當前 print(asyncio.get_event_loop())也會遇到如下類似的錯誤提示:

問題解決

解決此類的問題,開始的時候,因為具體可能沒了解帶業(yè)務(wù)細節(jié)和邏輯,所以給的思路也有點偏差,后來自己嘗試后,其實才發(fā)現(xiàn)原來只是一個比較簡單的問題,此類的問題,我們可以在新線程中創(chuàng)建一個新的事件循環(huán),并將任務(wù)添加到該事件循環(huán)中,以確保異步函數(shù)能夠正確運行。這樣,每個線程都有自己的事件循環(huán),可以獨立地運行異步任務(wù)。也就是如下的代碼:

from fastapi import FastAPI
import asyncio
app = FastAPI()


from threading import Thread  # 創(chuàng)建線程的模塊

def task(name):
    print(asyncio.get_event_loop())
    # 嘗試去執(zhí)行異步任務(wù)
    asyncio.run(do_task())


async def do_task():
    # 異步任務(wù)的邏輯
    await asyncio.sleep(1)
    print("異步任務(wù)完成")


@app.get("/items")
async def read_item():
    p = Thread(target=task, args=('線程1',))
    p.start()  
    return {"data": "ok"}

在上面的代碼中使用  asyncio.run開啟新的事件循環(huán)去處理異步函數(shù)即可。但是之前那個朋友也說它嘗試夠使用在completion_callback中new一個事件循環(huán)(new_event_loop)然后用create_task或run_until_complete執(zhí)行update_rp_file_status函數(shù)的時候,都不成功,或許可能得原因是它創(chuàng)建新的事件循環(huán)之后,沒有進行重新設(shè)置,所以會導(dǎo)致無法設(shè)置成功,也就是我們也可以改為如下代碼進行運行:

def task(name):
    loop = asyncio.new_event_loop()  # 創(chuàng)建新的事件循環(huán)
    asyncio.set_event_loop(loop)  # 設(shè)置新的事件循環(huán)為當前線程的事件循環(huán)
    loop.run_until_complete(do_task())  # 在新的事件循環(huán)中運行異步任務(wù)

關(guān)于asyncio.run 和手動的設(shè)置創(chuàng)建事件循環(huán)

asyncio.run()是從Python 3.7版本引入的一個方便的函數(shù),它用于運行異步函數(shù)。它自身會自動創(chuàng)建一個新的事件循環(huán),并將異步函數(shù)添加到該事件循環(huán)中運行。在異步函數(shù)完成后,它會關(guān)閉事件循環(huán)并返回結(jié)果。而我們的

 loop = asyncio.new_event_loop()  # 創(chuàng)建新的事件循環(huán)
    asyncio.set_event_loop(loop)  # 設(shè)置新的事件循環(huán)為當前線程的事件循環(huán)
    loop.run_until_complete(do_task())  # 在新的事件循環(huán)中運行異步

是一種手動創(chuàng)建、設(shè)置和運行事件循環(huán)的方法包括:

使用asyncio.new_event_loop()創(chuàng)建一個新的事件循環(huán)。使用asyncio.set_event_loop(loop)將新創(chuàng)建的事件循環(huán)設(shè)置為當前線程的事件循環(huán)。使用loop.run_until_complete(do_task())在新的事件循環(huán)中運行異步任務(wù),直到任務(wù)完成。

他們之間的區(qū)別在于使用asyncio.run()時,它會自動處理事件循環(huán)的創(chuàng)建、設(shè)置和關(guān)閉,非常方便。而手動創(chuàng)建、設(shè)置和運行事件循環(huán)需要更多的代碼來處理這些步驟,但也提供了更多的靈活性和控制權(quán)。

責任編輯:趙寧寧 來源: 程序員小鐘同學(xué)
相關(guān)推薦

2011-08-22 17:13:00

LuaC++函數(shù)

2012-04-16 13:47:37

JavaMatlab

2020-05-06 16:47:08

線程安全Python數(shù)據(jù)安全

2020-05-07 10:05:52

Python數(shù)據(jù)安全

2020-02-24 10:39:55

Python函數(shù)線程池

2024-12-13 09:26:35

2021-06-03 14:08:03

開發(fā)技能代碼

2021-06-04 14:28:07

協(xié)程線程Android開發(fā)

2010-03-15 17:17:29

Java線程池

2022-09-21 11:45:22

Vue組件插槽函數(shù)

2022-09-22 08:45:10

Vue組件函數(shù)

2023-12-24 12:56:36

協(xié)程

2020-11-29 17:03:08

進程線程協(xié)程

2011-03-29 10:41:51

Java線程安全

2009-08-11 14:16:00

Winform調(diào)用WEC#

2024-06-12 12:50:06

2009-07-31 14:47:22

JavaScript函C#

2021-05-21 07:59:40

應(yīng)用程序設(shè)計動態(tài)庫函數(shù)

2022-09-06 08:25:13

線程異步任務(wù)

2012-05-14 17:09:05

iOS
點贊
收藏

51CTO技術(shù)棧公眾號