Rust Tokio取消任務(wù)的幾種模式,你知道嗎?
Rust提供了對(duì)異步編程的支持,它可以生成異步任務(wù),然后通過運(yùn)行時(shí)執(zhí)行器在操作系統(tǒng)線程之間調(diào)度執(zhí)行。
與Rust中的所有東西一樣,異步編程必須是內(nèi)存安全的,因此需要確保借用檢查器可以編譯通過。
這篇文章是關(guān)于任務(wù)取消模式的,下面我們來介紹Tokio任務(wù)的取消模式。
Select 和 Channels
所有這些模式的核心是兩個(gè)tokio特性:
- channel:用于任務(wù)間通信
- select:用于等待多個(gè)異步計(jì)算(不一定是任務(wù)!)
Tokio channel看起來有點(diǎn)復(fù)雜,但同時(shí)就程序的內(nèi)存安全和彈性而言,它很強(qiáng)大。Tokio channel創(chuàng)建了兩個(gè)不同的對(duì)象,用于任務(wù)之間的通信,不能同時(shí)使用一個(gè)通道對(duì)象來接收和發(fā)送。
Tokio提供的頻道實(shí)際上有四種:
- mpsc:多個(gè)生產(chǎn)者,單一消費(fèi)者
- oneshot:用于發(fā)送和接收單個(gè)值,發(fā)送后,通道關(guān)閉。
- broadcast:多個(gè)發(fā)送者,多個(gè)消費(fèi)者
- watch:單一生產(chǎn)者,多個(gè)消費(fèi)者
Drop JoinHandle不會(huì)取消任務(wù)
JoinHandle在刪除關(guān)聯(lián)的任務(wù)時(shí)將其分離,這意味著不再有任何任務(wù)句柄,也沒有辦法對(duì)其進(jìn)行連接。
每次在tokio中生成任務(wù)時(shí),都會(huì)返回JoinHandle??梢允褂胘oin句柄來等待任務(wù)完成,但是認(rèn)為可以使用它來簡單地通過刪除任務(wù)來強(qiáng)制終止任務(wù)是錯(cuò)誤的。這里有一個(gè)愚蠢的例子:
use tokio::time::{self, Duration};
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// do some work
tokio::time::sleep(Duration::from_secs(10)).await;
println!("Task completed");
});
// 100毫秒后取消任務(wù)
time::sleep(Duration::from_millis(100)).await;
drop(handle);
println!("Task was cancelled");
}
丟棄句柄并不會(huì)取消正在運(yùn)行的任務(wù)!
Abort任務(wù)
這是取消任務(wù)的最極端的方式,沒有清理的空間:
use tokio::time::{self, Duration};
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// do some work
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Task completed");
});
// 100毫秒后取消任務(wù)
time::sleep(Duration::from_millis(100)).await;
handle.abort();
time::sleep(Duration::from_secs(2)).await;
println!("Task was cancelled");
}
使用oneshot channel
oneshot channel允許通道上的發(fā)送單個(gè)值,可以由多個(gè)接收器偵聽。與drop模式不同,此模式允許通道執(zhí)行一些清理工作。這里有一個(gè)例子:
use tokio::sync::oneshot;
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
let task = tokio::spawn(async move {
tokio::select! {
_ = rx => {
println!("Task is cancelling...");
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task completed normally");
}
}
println!("Task is cleaning up");
});
tokio::time::sleep(Duration::from_millis(100)).await;
// 發(fā)送取消信號(hào)
let _ = tx.send(());
// 等待任務(wù)完成
let _ = task.await;
}
運(yùn)行結(jié)果如下:
Task is cancelling...
Task is cleaning up
oneshot channel的限制是你不能用它來取消多個(gè)任務(wù)。
使用broadcast channel取消多個(gè)任務(wù)
如果要取消多個(gè)任務(wù),可以使用broad channel。可以有多個(gè)生產(chǎn)者向通道發(fā)送信息,也可以有多個(gè)消費(fèi)者從通道接收信息。每個(gè)接收方都可以看到在通道上發(fā)送的每個(gè)值。
這里有一個(gè)簡單的例子,來演示如何使用它來取消多個(gè)任務(wù):
use tokio::sync::broadcast;
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(1);
let mut rx2 = tx.subscribe();
let task1 = tokio::spawn(async move {
tokio::select! {
_ = rx1.recv() => {
println!("Task 1 is cancelling...");
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task 1 completed normally");
}
}
println!("Task 1 is cleaning up");
});
let task2 = tokio::spawn(async move {
tokio::select! {
_ = rx2.recv() => {
println!("Task 2 is cancelling...");
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task 2 completed normally");
}
}
println!("Task 2 is cleaning up");
});
tokio::time::sleep(Duration::from_millis(100)).await;
// 發(fā)送取消信號(hào)
let _ = tx.send(());
// 等待任務(wù)完成
let _ = tokio::join!(task1, task2);
}
運(yùn)行結(jié)果如下:
Task 2 is cancelling...
Task 2 is cleaning up
Task 1 is cancelling...
Task 1 is cleaning up
取消的順序可能會(huì)有所不同,因?yàn)槿蝿?wù)可能會(huì)以不同的順序取消!
如果只想從單個(gè)任務(wù)向多個(gè)任務(wù)發(fā)送取消信號(hào),那么broad channel可能有點(diǎn)過度,因?yàn)樗峁┝嗽诙鄠€(gè)任務(wù)之間傳遞消息的所有機(jī)制。
如果既需要消息傳遞又需要消息取消,這很方便。但如果只需要消息取消,還有更好的方法,開銷更少:watch channel。
使用watch channel取消多個(gè)任務(wù)
watch channel是多個(gè)消費(fèi)者頻道的單一生產(chǎn)者。watch channel給了任務(wù)清理自己的機(jī)會(huì)。缺點(diǎn)是,消費(fèi)者只能看到通道上發(fā)送的最近的值——這意味著如果任務(wù)在通道上發(fā)送了一個(gè)值之后啟動(dòng),它可能會(huì)錯(cuò)過它,因此不會(huì)被取消,所以要小心這一點(diǎn)。這里有一個(gè)簡單的例子:
use tokio::sync::watch;
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = watch::channel(false);
let mut rx2 = tx.subscribe();
let task1 = tokio::spawn(async move {
loop {
tokio::select! {
_ = rx1.changed() => {
if *rx1.borrow() {
println!("Task 1 is cancelling...");
break;
}
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task 1 completed normally");
break;
}
}
}
println!("Task 1 is cleaning up");
});
let task2 = tokio::spawn(async move {
loop {
tokio::select! {
_ = rx2.changed() => {
if *rx2.borrow() {
println!("Task 2 is cancelling...");
break;
}
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task 2 completed normally");
break;
}
}
}
println!("Task 2 is cleaning up");
});
tokio::time::sleep(Duration::from_millis(100)).await;
// 發(fā)送取消信號(hào)
let _ = tx.send(true);
// 等待任務(wù)完成
let _ = tokio::join!(task1, task2);
}
取消令牌
官方的Tokio文檔中列出了一種名為CancellationToken的東西,用于優(yōu)雅關(guān)機(jī)。這在tokio crate本身中不可用,但在相關(guān)的toko_util crate中可用。
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;
#[tokio::main]
async fn main() {
// Create a CancellationToken
let token = CancellationToken::new();
let token1 = token.clone();
let token2 = token.clone();
let task1 = tokio::spawn(async move {
loop {
tokio::select! {
_ = token1.cancelled() => {
println!("Task 1 is cancelling...");
break;
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task 1 completed normally");
break;
}
}
}
println!("Task 1 is cleaning up");
});
let task2 = tokio::spawn(async move {
loop {
tokio::select! {
_ = token2.cancelled() => {
println!("Task 2 is cancelling...");
break;
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Task 2 completed normally");
break;
}
}
}
println!("Task 2 is cleaning up");
});
sleep(Duration::from_millis(100)).await;
// 發(fā)送取消信號(hào)
token.cancel();
// 等待任務(wù)完成
let _ = tokio::join!(task1, task2);
}
請(qǐng)注意我們是如何克隆令牌的,以便將其移動(dòng)到各個(gè)異步任務(wù)中。