99re热视频这里只精品,久久久天堂国产精品女人,国产av一区二区三区,久久久精品成人免费看片,99久久精品免费看国产一区二区三区

Rust 優(yōu)雅停機與清理

2023-03-22 15:16 更新
ch20-03-graceful-shutdown-and-cleanup.md
commit 322899b375d071e4d96aaf29ce25c1a4b4ec65da

示例 20-20 中的代碼如期通過使用線程池異步的響應(yīng)請求。這里有一些警告說 workersid 和 thread 字段沒有直接被使用,這提醒了我們并沒有清理所有的內(nèi)容。當使用不那么優(yōu)雅的 ctrl-c 終止主線程時,所有其他線程也會立刻停止,即便它們正處于處理請求的過程中。

現(xiàn)在我們要為 ThreadPool 實現(xiàn) Drop trait 對線程池中的每一個線程調(diào)用 join,這樣這些線程將會執(zhí)行完他們的請求。接著會為 ThreadPool 實現(xiàn)一個告訴線程他們應(yīng)該停止接收新請求并結(jié)束的方式。為了實踐這些代碼,修改 server 在優(yōu)雅停機(graceful shutdown)之前只接受兩個請求。

為 ThreadPool 實現(xiàn) Drop Trait

現(xiàn)在開始為線程池實現(xiàn) Drop。當線程池被丟棄時,應(yīng)該 join 所有線程以確保他們完成其操作。示例 20-22 展示了 Drop 實現(xiàn)的第一次嘗試;這些代碼還不能夠編譯:

文件名: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

示例 20-22: 當線程池離開作用域時 join 每個線程

這里首先遍歷線程池中的每個 workers。這里使用了 &mut 因為 self 本身是一個可變引用而且也需要能夠修改 worker。對于每一個線程,會打印出說明信息表明此特定 worker 正在關(guān)閉,接著在 worker 線程上調(diào)用 join。如果 join 調(diào)用失敗,通過 unwrap 使得 panic 并進行不優(yōu)雅的關(guān)閉。

如下是嘗試編譯代碼時得到的錯誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

這告訴我們并不能調(diào)用 join,因為只有每一個 worker 的可變借用,而 join 獲取其參數(shù)的所有權(quán)。為了解決這個問題,需要一個方法將 thread 移動出擁有其所有權(quán)的 Worker 實例以便 join 可以消費這個線程。示例 17-15 中我們曾見過這么做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上調(diào)用 take 方法將值從 Some 成員中移動出來而對 None 成員不做處理。換句話說,正在運行的 Worker 的 thread 將是 Some 成員值,而當需要清理 worker 時,將 Some 替換為 None,這樣 worker 就沒有可以運行的線程了。

為此需要更新 Worker 的定義為如下:

文件名: src/lib.rs

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

現(xiàn)在依靠編譯器來找出其他需要修改的地方。check 代碼會得到兩個錯誤:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option<JoinHandle<()>>`

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, Some(thread) }
   |                      +++++      +

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

讓我們修復(fù)第二個錯誤,它指向 Worker::new 結(jié)尾的代碼;當新建 Worker 時需要將 thread 值封裝進 Some。做出如下改變以修復(fù)問題:

文件名: src/lib.rs

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

第一個錯誤位于 Drop 實現(xiàn)中。之前提到過要調(diào)用 Option 上的 take 將 thread 移動出 worker。如下改變會修復(fù)問題:

文件名: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

如第十七章我們見過的,Option 上的 take 方法會取出 Some 而留下 None。使用 if let 解構(gòu) Some 并得到線程,接著在線程上調(diào)用 join。如果 worker 的線程已然是 None,就知道此時這個 worker 已經(jīng)清理了其線程所以無需做任何操作。

向線程發(fā)送信號使其停止接收任務(wù)

有了所有這些修改,代碼就能編譯且沒有任何警告。不過也有壞消息,這些代碼還不能以我們期望的方式運行。問題的關(guān)鍵在于 Worker 中分配的線程所運行的閉包中的邏輯:調(diào)用 join 并不會關(guān)閉線程,因為他們一直 loop 來尋找任務(wù)。如果采用這個實現(xiàn)來嘗試丟棄 ThreadPool ,則主線程會永遠阻塞在等待第一個線程結(jié)束上。

為了修復(fù)這個問題,修改線程既監(jiān)聽是否有 Job 運行也要監(jiān)聽一個應(yīng)該停止監(jiān)聽并退出無限循環(huán)的信號。所以信道將發(fā)送這個枚舉的兩個成員之一而不是 Job 實例:

文件名: src/lib.rs

enum Message {
    NewJob(Job),
    Terminate,
}

Message 枚舉要么是存放了線程需要運行的 Job 的 NewJob 成員,要么是會導(dǎo)致線程退出循環(huán)并終止的 Terminate 成員。

同時需要修改信道來使用 Message 類型值而不是 Job,如示例 20-23 所示:

文件名: src/lib.rs

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

// --snip--

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

示例 20-23: 收發(fā) Message 值并在 Worker 收到 Message::Terminate 時退出循環(huán)

為了適用 Message 枚舉需要將兩個地方的 Job 修改為 MessageThreadPool 的定義和 Worker::new 的簽名。ThreadPool 的 execute 方法需要發(fā)送封裝進 Message::NewJob 成員的任務(wù)。然后,在 Worker::new 中當從信道接收 Message 時,當獲取到 NewJob成員會處理任務(wù)而收到 Terminate 成員則會退出循環(huán)。

通過這些修改,代碼再次能夠編譯并繼續(xù)按照示例 20-20 之后相同的行為運行。不過還是會得到一個警告,因為并沒有創(chuàng)建任何 Terminate 成員的消息。如示例 20-24 所示修改 Drop 實現(xiàn)來修復(fù)此問題:

文件名: src/lib.rs

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

示例 20-24:在對每個 worker 線程調(diào)用 join 之前向 worker 發(fā)送 Message::Terminate

現(xiàn)在遍歷了 worker 兩次,一次向每個 worker 發(fā)送一個 Terminate 消息,一個調(diào)用每個 worker 線程上的 join。如果嘗試在同一循環(huán)中發(fā)送消息并立即 join 線程,則無法保證當前迭代的 worker 是從信道收到終止消息的 worker。

為了更好的理解為什么需要兩個分開的循環(huán),想象一下只有兩個 worker 的場景。如果在一個單獨的循環(huán)中遍歷每個 worker,在第一次迭代中向信道發(fā)出終止消息并對第一個 worker 線程調(diào)用 join。如果此時第一個 worker 正忙于處理請求,那么第二個 worker 會收到終止消息并停止。我們會一直等待第一個 worker 結(jié)束,不過它永遠也不會結(jié)束因為第二個線程接收了終止消息。死鎖!

為了避免此情況,首先在一個循環(huán)中向信道發(fā)出所有的 Terminate 消息,接著在另一個循環(huán)中 join 所有的線程。每個 worker 一旦收到終止消息即會停止從信道接收消息,意味著可以確保如果發(fā)送同 worker 數(shù)相同的終止消息,在 join 之前每個線程都會收到一個終止消息。

為了實踐這些代碼,如示例 20-25 所示修改 main 在優(yōu)雅停機 server 之前只接受兩個請求:

文件名: src/bin/main.rs

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

示例 20-25: 在處理兩個請求之后通過退出循環(huán)來停止 server

你不會希望真實世界的 web server 只處理兩次請求就停機了,這只是為了展示優(yōu)雅停機和清理處于正常工作狀態(tài)。

take 方法定義于 Iterator trait,這里限制循環(huán)最多頭 2 次。ThreadPool 會在 main 的結(jié)尾離開作用域,而且還會看到 drop 實現(xiàn)的運行。

使用 cargo run 啟動 server,并發(fā)起三個請求。第三個請求應(yīng)該會失敗,而終端的輸出應(yīng)該看起來像這樣:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/main`
Worker 0 got a job; executing.
Worker 3 got a job; executing.
Shutting down.
Sending terminate message to all workers.
Shutting down all workers.
Shutting down worker 0
Worker 1 was told to terminate.
Worker 2 was told to terminate.
Worker 0 was told to terminate.
Worker 3 was told to terminate.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

可能會出現(xiàn)不同順序的 worker 和信息輸出??梢詮男畔⒅锌吹椒?wù)是如何運行的: worker 0 和 worker 3 獲取了頭兩個請求,接著在第三個請求時,我們停止接收連接。當 ThreadPool 在 main 的結(jié)尾離開作用域時,其 Drop 實現(xiàn)開始工作,線程池通知所有線程終止。每個 worker 在收到終止消息時會打印出一個信息,接著線程池調(diào)用 join 來終止每一個 worker 線程。

這個特定的運行過程中一個有趣的地方在于:注意我們向信道中發(fā)出終止消息,而在任何線程收到消息之前,就嘗試 join worker 0 了。worker 0 還沒有收到終止消息,所以主線程阻塞直到 worker 0 結(jié)束。與此同時,每一個線程都收到了終止消息。一旦 worker 0 結(jié)束,主線程就等待其他 worker 結(jié)束,此時他們都已經(jīng)收到終止消息并能夠停止了。

恭喜!現(xiàn)在我們完成了這個項目,也有了一個使用線程池異步響應(yīng)請求的基礎(chǔ) web server。我們能對 server 執(zhí)行優(yōu)雅停機,它會清理線程池中的所有線程。

如下是完整的代碼參考:

文件名: src/bin/main.rs

use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

文件名: src/lib.rs

use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);

                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

這里還有很多可以做的事!如果你希望繼續(xù)增強這個項目,如下是一些點子:

  • 為 ?ThreadPool? 和其公有方法增加更多文檔
  • 為庫的功能增加測試
  • 將 ?unwrap? 調(diào)用改為更健壯的錯誤處理
  • 使用 ?ThreadPool? 進行其他不同于處理網(wǎng)絡(luò)請求的任務(wù)
  • 在 crates.io 上尋找一個線程池 crate 并使用它實現(xiàn)一個類似的 web server,將其 API 和魯棒性與我們的實現(xiàn)做對比

總結(jié)

好極了!你結(jié)束了本書的學習!由衷感謝你同我們一道加入這次 Rust 之旅?,F(xiàn)在你已經(jīng)準備好出發(fā)并實現(xiàn)自己的 Rust 項目并幫助他人了。請不要忘記我們的社區(qū),這里有其他 Rustaceans 正樂于幫助你迎接 Rust 之路上的任何挑戰(zhàn)。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號