使用Rust構(gòu)建可以并發(fā)執(zhí)行多個任務(wù)的線程池
在這篇文章中讓我們探討一下如何使用Rust構(gòu)建線程池來并發(fā)地管理多個任務(wù)。
在開始實際的編碼之前,讓我們首先了解線程池是什么以及它是如何工作的。
線程池
線程池是工作線程的集合,創(chuàng)建這些線程是為了同時執(zhí)行多個任務(wù)并等待新任務(wù)的到來。這意味著一開始創(chuàng)建了多個線程,并且所有線程都處于空閑狀態(tài)。
每當你的系統(tǒng)獲得任務(wù)時,它可以快速地將任務(wù)分配給這些線程,從而節(jié)省大量時間,而無需多次創(chuàng)建和刪除線程。
圖片
正如圖所看到的,線程池是等待從主線程接收任務(wù)以執(zhí)行的多個線程的集合。
在該圖中,主線程中總共有15個任務(wù),所有這些任務(wù)都被轉(zhuǎn)發(fā)給不同的工作線程并發(fā)執(zhí)行。了解了線程池的概念后,讓我們來理解線程池的內(nèi)部工作原理。
線程池是如何工作的?
在線程池體系結(jié)構(gòu)中,主線程只有兩個任務(wù):
1,接收所有的任務(wù)并將它們存儲在一個地方。
2,創(chuàng)建多個線程,并定期為它們分配不同的任務(wù)。
在接收任務(wù)之前創(chuàng)建線程集,并使用ID存儲在某個地方,以便我們可以通過ID識別它們。
然后每個線程都在等待接收任務(wù),如果它們得到任務(wù),就開始處理任務(wù)。完成任務(wù)后,他們再次等待下一個任務(wù)。
當該線程忙于執(zhí)行任務(wù)時,主線程將更多的任務(wù)分配給其他線程,這樣在主線程結(jié)束任務(wù)之前沒有線程空閑。在完成所有任務(wù)后,主線程終止所有線程并關(guān)閉線程池。
現(xiàn)在我們了解了線程池是如何工作的。接下來,讓我們使用Rust實現(xiàn)一個線程池。
使用Rust實現(xiàn)線程池
1. 創(chuàng)建線程
我們需要一個函數(shù)來生成一個線程并返回它的JoinHandle。
此外,我們需要知道線程的ID,如果我們搞砸了,就可以用線程ID記錄錯誤,這樣我們就可以知道哪個線程出錯了。
可以看出,如果兩個相互關(guān)聯(lián)的數(shù)據(jù)需要組合,需要一個結(jié)構(gòu)體。我們來創(chuàng)建一個:
struct Worker {
id: usize,
thread: JoinHandle<()>
}
現(xiàn)在我們實現(xiàn)一個可以返回新Worker的構(gòu)造函數(shù):
impl Worker {
fn new(id: usize) -> Self {
let thread = thread::spawn(|| {});
Self {id, thread}
}
}
現(xiàn)在,我們的函數(shù)已經(jīng)準備好創(chuàng)建線程并將它們返回給調(diào)用者。
2. 存放線程
我們需要一個結(jié)構(gòu)來保存所有線程的所有JoinHandles,我們還想控制線程池可以擁有多少線程。
這意味著,我們需要一個帶有構(gòu)造函數(shù)的結(jié)構(gòu)體,該函數(shù)指定一個數(shù)字來指示線程的數(shù)量,并且必須調(diào)用Worker來創(chuàng)建線程。
struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
fn new(size: usize) -> Self {
assert!(size > 0, "Need at least 1 worker!");
let mut workers = Vec::with_capacity(size);
for i in 0..size {
workers.push(Worker::new(i));
}
Self { workers }
}
}
我們有了創(chuàng)建線程和管理線程的函數(shù),現(xiàn)在是時候創(chuàng)建一個可以將任務(wù)分配給不同線程的函數(shù)了。
3. 給線程分配任務(wù)
我們的線程池結(jié)構(gòu)體必須有一個函數(shù),該函數(shù)可以在線程內(nèi)部分配和執(zhí)行任務(wù)。但是有一個問題,我們?nèi)绾螌⑷蝿?wù)發(fā)送給線程,以便線程能夠執(zhí)行任務(wù)?
為此,我們需要一個task類型來表示我們需要完成的任務(wù):
type task = Box<dyn FnOnce() + Send + 'static>;
在這里,意味著我們的任務(wù)必須實現(xiàn)Box<dyn>里的這些Trait:
1,實現(xiàn)FnOnce()意味著我們的任務(wù)是一個只能運行一次的函數(shù)。
2,實現(xiàn)Send,因為我們將任務(wù)從主線程發(fā)送到工作線程,所以將任務(wù)設(shè)置為Send類型,以便它可以在線程之間安全地傳輸。
3,實現(xiàn)'static,意味著我們的任務(wù)必須和程序運行的時間一樣長。
現(xiàn)在是時候?qū)⑷蝿?wù)發(fā)送給工作線程了,但要做到這一點,我們必須在主線程和所有工作線程之間建立一個通道,因此我們需要使用Arc<Mutex<()>>。
讓我們來更新這兩個構(gòu)造函數(shù):
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>
}
impl ThreadPool {
fn new(size: usize) -> Self {
assert!(size > 0, "Need at least 1 worker!");
let (sender, reciever) = mpsc::channel();
let reciever = Arc::new(Mutex::new(reciever));
let mut workers = Vec::with_capacity(size);
for i in 0..size {
workers.push(Worker::new(i, Arc::clone(&reciever)));
}
Self {
workers,
sender: Some(sender)
}
}
}
impl Worker {
fn new(id: usize, reciever: Arc<Mutex<Receiver<Task>>>) -> Self {
let thread = thread::spawn(move || {});
Self {
id,
thread
}
}
}
在ThreadPool構(gòu)造函數(shù)中,我們創(chuàng)建了一個新的通道,并在Arc<Mutex<()>>中封裝了接收器,我們把接收器發(fā)送給工作線程,以便主線程可以發(fā)送任務(wù),工作線程可以接收任務(wù)。
此外,我們必須更新ThreadPool結(jié)構(gòu)體,以包含一個發(fā)送者,它將被主線程用來向不同的線程發(fā)送任務(wù)。
現(xiàn)在,讓我們實現(xiàn)在工作線程中執(zhí)行任務(wù)的邏輯:
fn new(id: usize, reciever: Arc<Mutex<Receiver<task>>>) -> Self {
let thread = thread::spawn(move || {
loop {
let receiver = reciever.lock()
.expect("Failed to grab the lock!")
.recv();
match receiver {
Ok(task) => {
println!("Thread {} got the task& executing.", id);
task();
thread::sleep(Duration::from_millis(10));
},
Err(_) => {
println!("No got the task");
break;
}
}
}
});
Self {
id,
thread
}
}
這里,在每個循環(huán)中,我們都試圖獲得鎖并調(diào)用鎖上的recv(),以便我們可以獲得主線程發(fā)送的任務(wù)。
接下來,我們在ThreadPool中實現(xiàn)一個函數(shù),將任務(wù)發(fā)送到不同的線程。
impl ThreadPool {
fn new(size: usize) -> Self {
// snip
}
fn execute<F>(&self, job: F)
where
F: FnOnce() + Send + 'static
{
let job = Box::new(job);
self.sender.send(job)
.expect("Failed to send the job to workers!");
}
}
我們還需要創(chuàng)建一個函數(shù),在ThreadPool結(jié)束時動態(tài)終止所有線程。簡單地說,我們必須手動實現(xiàn)ThreadPool的Drop特性,在那里我們將終止所有線程。
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Thread {} is shutting down.", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join()..unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));}
}
}
}
這里我們還必須刪除發(fā)送方,因為如果我們不這樣做,那么接收方將永遠循環(huán)。如果刪除發(fā)送者,那么接收者也會自動刪除,我們就可以成功地退出這個程序。
測試
main函數(shù)代碼如下:
fn main() {
let pool = ThreadPool::new(5);
for _ in 0..10 {
pool.execute(|| println!("Doing something"));
}
}
運行結(jié)果:
Thread 0 is shutting down.
Thread 0 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 1 got the job & executing.
Thread 2 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Doing something
Thread 0 got the job & executing.
Doing something
Thread 4 got the job & executing.
Doing something
Thread 3 got the job & executing.
Doing something
Thread 2 got the job & executing.
Doing something
Thread 1 got the job & executing.
Doing something
No got the job
Thread 1 is shutting down.
No got the job
No got the job
No got the job
No got the job
Thread 2 is shutting down.
Thread 3 is shutting down.
Thread 4 is shutting down.