我們一起聊聊如何編寫(xiě)異步運(yùn)行時(shí)通用庫(kù)?
如果你正在用Rust編寫(xiě)異步應(yīng)用程序,在某些情況下,你可能希望將代碼分成幾個(gè)子crate。這樣做的好處是:
- 更好的封裝,在子系統(tǒng)之間有一個(gè)crate邊界,可以產(chǎn)生更清晰的代碼和定義更良好的API。不再需要這樣寫(xiě):pub(crate)。
- 更快的編譯,通過(guò)將一個(gè)大crate分解成幾個(gè)獨(dú)立的小crate,它們可以并發(fā)地編譯。
使用一個(gè)異步運(yùn)行時(shí),編寫(xiě)異步運(yùn)行時(shí)通用庫(kù)的好處是什么?
- 可移植性,你可以很容易地切換到不同的異步運(yùn)行時(shí)或wasm。
- 保證正確性,針對(duì)tokio和async-std,測(cè)試一個(gè)庫(kù)就可以發(fā)現(xiàn)更多的bug,包括并發(fā)bug(由于任務(wù)執(zhí)行順序模糊)和“未定義行為”(由于誤解異步運(yùn)行時(shí)實(shí)現(xiàn)細(xì)節(jié))
下面使用三種方法來(lái)實(shí)現(xiàn)異步運(yùn)行時(shí)通用庫(kù)。
方法1,定義自己的異步運(yùn)行時(shí)Trait
使用futures crate,可以編寫(xiě)非常通用的庫(kù)代碼,但是time,sleep或timeout等操作必須依賴于異步運(yùn)行時(shí)。這時(shí),你可以定義自己的AsyncRuntime trait,并要求下游實(shí)現(xiàn)它。
use std::{future::Future, time::Duration};
pub trait AsyncRuntime: Send + Sync + 'static {
type Delay: Future<Output = ()> + Send;
// 返回值必須是一個(gè)Future
fn sleep(duration: Duration) -> Self::Delay;
}
可以像這樣使用上面的庫(kù)代碼:
async fn operation<R: AsyncRuntime>() {
R::sleep(Duration::from_millis(1)).await;
}
下面是它如何實(shí)現(xiàn)的:
pub struct TokioRuntime;
impl AsyncRuntime for TokioRuntime {
type Delay = tokio::time::Sleep;
fn sleep(duration: Duration) -> Self::Delay {
tokio::time::sleep(duration)
}
}
#[tokio::main]
async fn main() {
operation::<TokioRuntime>().await;
println!("Hello, world!");
}
方法2,在內(nèi)部抽象異步運(yùn)行時(shí)并公開(kāi)特性標(biāo)志
為了處理網(wǎng)絡(luò)連接或文件句柄,我們可以使用AsyncRead / AsyncWrite trait:
#[async_trait]
pub(crate) trait AsyncRuntime: Send + Sync + 'static {
type Connection: AsyncRead + AsyncWrite + Send + Sync + 'static;
async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection>;
}
可以像這樣使用上面的庫(kù)代碼:
async fn operation<R: AsyncRuntime>(conn: &mut R::Connection)
where
R::Connection: Unpin,
{
conn.write(b"some bytes").await;
}
然后為每個(gè)異步運(yùn)行時(shí)定義一個(gè)模塊:
#[cfg(feature = "runtime-async-std")]
mod async_std_impl;
#[cfg(feature = "runtime-async-std")]
use async_std_impl::*;
#[cfg(feature = "runtime-tokio")]
mod tokio_impl;
#[cfg(feature = "runtime-tokio")]
use tokio_impl::*;
tokio_impl模塊:
mod tokio_impl {
use std::net::SocketAddr;
use async_trait::async_trait;
use crate::AsyncRuntime;
pub struct TokioRuntime;
#[async_trait]
impl AsyncRuntime for TokioRuntime {
type Connection = tokio::net::TcpStream;
async fn connect(addr: SocketAddr) -> std::io::Result<Self::Connection> {
tokio::net::TcpStream::connect(addr).await
}
}
}
main函數(shù)代碼:
#[tokio::main]
async fn main() {
let mut conn =
TokioRuntime::connect(SocketAddr::new(IpAddr::from_str("0.0.0.0").unwrap(), 8080))
.await
.unwrap();
operation::<TokioRuntime>(&mut conn).await;
println!("Hello, world!");
}
方法3,維護(hù)一個(gè)異步運(yùn)行時(shí)抽象庫(kù)
基本上,將使用的所有異步運(yùn)行時(shí)api寫(xiě)成一個(gè)包裝器庫(kù)。這樣做可能很繁瑣,但也有一個(gè)好處,即可以在一個(gè)地方為項(xiàng)目指定與異步運(yùn)行時(shí)的所有交互,這對(duì)于調(diào)試或跟蹤非常方便。
例如,我們定義異步運(yùn)行時(shí)抽象庫(kù)的名字為:common-async-runtime,它的異步任務(wù)處理代碼如下:
// common-async-runtime/tokio_task.rs
pub use tokio::task::{JoinHandle as TaskHandle};
pub fn spawn_task<F, T>(future: F) -> TaskHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
tokio::task::spawn(future)
}
async-std的任務(wù)API與Tokio略有不同,這需要一些樣板文件:
// common-async-runtime/async_std_task.rs
pub struct TaskHandle<T>(async_std::task::JoinHandle<T>);
pub fn spawn_task<F, T>(future: F) -> TaskHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
TaskHandle(async_std::task::spawn(future))
}
#[derive(Debug)]
pub struct JoinError;
impl std::error::Error for JoinError {}
impl<T> Future for TaskHandle<T> {
type Output = Result<T, JoinError>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.0.poll_unpin(cx) {
std::task::Poll::Ready(res) => std::task::Poll::Ready(Ok(res)),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
在Cargo.toml中,你可以簡(jiǎn)單地將common-async-runtime作為依賴項(xiàng)包含進(jìn)來(lái)。這使得你的庫(kù)代碼很“純粹”,因?yàn)楝F(xiàn)在選擇異步運(yùn)行時(shí)是由下游控制的。與方法1類似,這個(gè)crate可以在沒(méi)有任何異步運(yùn)行時(shí)的情況下編譯,這很簡(jiǎn)潔!