Tắt Máy Nhẹ Nhàng và Dọn Dẹp

Đoạn mã trong Listing 21-20 đang phản hồi các yêu cầu bất đồng bộ thông qua việc sử dụng thread pool, như chúng ta dự định. Chúng ta nhận được một số cảnh báo về các trường workers, id, và thread mà chúng ta không sử dụng trực tiếp, điều này nhắc nhở chúng ta rằng chúng ta không dọn dẹp bất cứ thứ gì. Khi chúng ta sử dụng phương pháp kém thanh lịch ctrl-c để dừng luồng chính, tất cả các luồng khác cũng bị dừng ngay lập tức, ngay cả khi chúng đang trong quá trình phục vụ một yêu cầu.

Tiếp theo, chúng ta sẽ thực hiện đặc tính Drop để gọi join trên mỗi luồng trong pool để chúng có thể hoàn thành các yêu cầu mà chúng đang xử lý trước khi đóng. Sau đó, chúng ta sẽ thực hiện một cách để cho các luồng biết rằng chúng nên dừng chấp nhận các yêu cầu mới và tắt. Để xem mã này hoạt động, chúng ta sẽ sửa đổi máy chủ của mình để chỉ chấp nhận hai yêu cầu trước khi tắt thread pool một cách nhẹ nhàng.

Một điều cần lưu ý khi chúng ta tiến hành: không có gì trong phần này ảnh hưởng đến các phần của mã xử lý việc thực thi các closure, vì vậy mọi thứ ở đây sẽ giống hệt nhau nếu chúng ta đang sử dụng thread pool cho một môi trường thực thi bất đồng bộ.

Thực Hiện Đặc Tính Drop trên ThreadPool

Hãy bắt đầu với việc thực hiện Drop trên thread pool của chúng ta. Khi pool bị huỷ, các luồng của chúng ta đều nên tham gia (join) để đảm bảo chúng hoàn thành công việc của mình. Listing 21-22 hiển thị nỗ lực đầu tiên để thực hiện Drop; mã này chưa hoạt động hoàn toàn.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Đầu tiên, chúng ta lặp qua từng worker trong thread pool. Chúng ta sử dụng &mut cho việc này vì self là một tham chiếu có thể thay đổi, và chúng ta cũng cần có khả năng thay đổi worker. Đối với mỗi worker, chúng ta in một thông báo cho biết thể hiện Worker cụ thể này đang tắt, sau đó chúng ta gọi join trên luồng của thể hiện Worker đó. Nếu lệnh gọi join thất bại, chúng ta sử dụng unwrap để làm cho Rust hoảng loạn (panic) và chuyển sang tắt không nhẹ nhàng.

Đây là lỗi chúng ta nhận được khi biên dịch mã này:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/4eb161250e340c8f48f66e2b929ef4a5bed7c181/library/std/src/thread/mod.rs:1876:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

Lỗi cho chúng ta biết rằng chúng ta không thể gọi join vì chúng ta chỉ có một tham chiếu có thể thay đổi của mỗi workerjoin lấy quyền sở hữu của đối số của nó. Để giải quyết vấn đề này, chúng ta cần di chuyển luồng ra khỏi thể hiện Worker sở hữu thread để join có thể tiêu thụ luồng. Một cách để làm điều này là bằng cách sử dụng cùng một phương pháp mà chúng ta đã làm trong Listing 18-15. Nếu Worker giữ một Option<thread::JoinHandle<()>>, chúng ta có thể gọi phương thức take trên Option để di chuyển giá trị ra khỏi biến thể Some và để lại một biến thể None ở vị trí của nó. Nói cách khác, một Worker đang chạy sẽ có một biến thể Some trong thread, và khi chúng ta muốn dọn dẹp một Worker, chúng ta sẽ thay thế Some bằng None để Worker không có luồng để chạy.

Tuy nhiên, chỉ thời điểm này sẽ xuất hiện là khi huỷ Worker. Để đổi lại, chúng ta sẽ phải xử lý một Option<thread::JoinHandle<()>> bất cứ nơi nào chúng ta truy cập worker.thread. Rust thành ngữ sử dụng Option khá nhiều, nhưng khi bạn thấy mình bọc thứ gì đó mà bạn biết sẽ luôn có mặt trong Option như một giải pháp tạm thời như thế này, đó là một ý tưởng tốt để tìm kiếm các phương pháp thay thế. Chúng có thể làm cho mã của bạn sạch hơn và ít dễ xảy ra lỗi hơn.

Trong trường hợp này, một giải pháp thay thế tốt hơn tồn tại: phương thức Vec::drain. Nó chấp nhận một tham số phạm vi để chỉ định các mục cần xóa khỏi Vec, và trả về một iterator của các mục đó. Truyền cú pháp phạm vi .. sẽ xóa mọi giá trị khỏi Vec.

Vì vậy, chúng ta cần cập nhật việc thực hiện drop của ThreadPool như thế này:

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

Điều này giải quyết lỗi biên dịch và không yêu cầu bất kỳ thay đổi nào khác cho mã của chúng ta.

Báo Hiệu cho Các Luồng Dừng Lắng Nghe Công Việc

Với tất cả các thay đổi chúng ta đã thực hiện, mã của chúng ta biên dịch mà không có bất kỳ cảnh báo nào. Tuy nhiên, tin xấu là mã này chưa hoạt động theo cách chúng ta muốn. Chìa khóa là logic trong các closure được chạy bởi các luồng của các thể hiện Worker: tại thời điểm này, chúng ta gọi join, nhưng điều đó sẽ không tắt các luồng vì chúng loop mãi mãi tìm kiếm công việc. Nếu chúng ta thử huỷ ThreadPool của mình với cách thực hiện drop hiện tại, luồng chính sẽ bị chặn mãi mãi, chờ đợi luồng đầu tiên hoàn thành.

Để khắc phục vấn đề này, chúng ta sẽ cần thay đổi trong cách thực hiện drop của ThreadPool và sau đó thay đổi trong vòng lặp Worker.

Đầu tiên, chúng ta sẽ thay đổi cách thực hiện drop của ThreadPool để rõ ràng huỷ sender trước khi đợi các luồng hoàn thành. Listing 21-23 hiển thị các thay đổi đối với ThreadPool để rõ ràng huỷ sender. Không giống như với luồng, ở đây chúng ta thực sự cần sử dụng Option để có thể di chuyển sender ra khỏi ThreadPool với Option::take.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Việc huỷ sender đóng kênh, điều này chỉ ra rằng không có thêm thông báo nào sẽ được gửi. Khi điều đó xảy ra, tất cả các lệnh gọi recv mà các thể hiện Worker thực hiện trong vòng lặp vô hạn sẽ trả về một lỗi. Trong Listing 21-24, chúng ta thay đổi vòng lặp Worker để nhẹ nhàng thoát khỏi vòng lặp trong trường hợp đó, điều đó có nghĩa là các luồng sẽ kết thúc khi thực hiện drop của ThreadPool gọi join trên chúng.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}

Để xem mã này hoạt động, hãy sửa đổi main để chỉ chấp nhận hai yêu cầu trước khi tắt máy chủ một cách nhẹ nhàng, như được hiển thị trong Listing 21-25.

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Bạn sẽ không muốn một máy chủ web trong thế giới thực tắt sau khi phục vụ chỉ hai yêu cầu. Mã này chỉ chứng minh rằng việc tắt nhẹ nhàng và dọn dẹp đang hoạt động tốt.

Phương thức take được định nghĩa trong đặc tính Iterator và giới hạn vòng lặp tối đa là hai mục đầu tiên. ThreadPool sẽ ra khỏi phạm vi khi kết thúc main, và việc thực hiện drop sẽ chạy.

Khởi động máy chủ với cargo run, và thực hiện ba yêu cầu. Yêu cầu thứ ba sẽ bị lỗi, và trong terminal của bạn, bạn sẽ thấy đầu ra tương tự như thế này:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

Bạn có thể thấy thứ tự ID Worker và thông báo được in khác nhau. Chúng ta có thể thấy cách mã này hoạt động từ các thông báo: các thể hiện Worker 0 và 3 nhận hai yêu cầu đầu tiên. Máy chủ dừng chấp nhận kết nối sau kết nối thứ hai, và việc thực hiện Drop trên ThreadPool bắt đầu thực thi trước khi Worker 3 thậm chí bắt đầu công việc của nó. Việc huỷ sender ngắt kết nối tất cả các thể hiện Worker và cho chúng biết phải tắt. Các thể hiện Worker mỗi thể hiện in một thông báo khi chúng ngắt kết nối, và sau đó thread pool gọi join để đợi mỗi luồng Worker hoàn thành.

Hãy chú ý một khía cạnh thú vị của việc thực thi cụ thể này: ThreadPool đã huỷ sender, và trước khi bất kỳ Worker nào nhận được lỗi, chúng ta đã cố gắng tham gia Worker 0. Worker 0 chưa nhận được lỗi từ recv, vì vậy luồng chính bị chặn chờ đợi Worker 0 hoàn thành. Trong khi đó, Worker 3 nhận được một công việc và sau đó tất cả các luồng nhận được lỗi. Khi Worker 0 hoàn thành, luồng chính đợi phần còn lại của các thể hiện Worker hoàn thành. Tại thời điểm đó, tất cả chúng đều đã thoát khỏi vòng lặp của chúng và dừng lại.

Xin chúc mừng! Bây giờ chúng ta đã hoàn thành dự án của mình; chúng ta có một máy chủ web cơ bản sử dụng thread pool để phản hồi bất đồng bộ. Chúng ta có thể thực hiện việc tắt máy chủ một cách nhẹ nhàng, điều này dọn dẹp tất cả các luồng trong pool.

Đây là toàn bộ mã để tham khảo:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} got a job; executing.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} disconnected; shutting down.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

Chúng ta có thể làm nhiều hơn nữa ở đây! Nếu bạn muốn tiếp tục nâng cao dự án này, đây là một số ý tưởng:

  • Thêm nhiều tài liệu vào ThreadPool và các phương thức công khai của nó.
  • Thêm các bài kiểm tra chức năng của thư viện.
  • Thay đổi các lệnh gọi unwrap thành xử lý lỗi mạnh mẽ hơn.
  • Sử dụng ThreadPool để thực hiện một số tác vụ khác ngoài việc phục vụ các yêu cầu web.
  • Tìm một crate thread pool trên crates.io và thực hiện một máy chủ web tương tự bằng cách sử dụng crate thay thế. Sau đó so sánh API và độ mạnh mẽ của nó với thread pool mà chúng ta đã thực hiện.

Tổng Kết

Làm tốt lắm! Bạn đã đến được cuối sách! Chúng tôi muốn cảm ơn bạn đã tham gia cùng chúng tôi trong chuyến tham quan Rust này. Bây giờ bạn đã sẵn sàng để thực hiện các dự án Rust của riêng mình và giúp đỡ với các dự án của người khác. Hãy nhớ rằng có một cộng đồng chào đón của các Rustacean khác sẵn sàng giúp đỡ bạn với bất kỳ thách thức nào bạn gặp phải trên hành trình Rust của mình.