Chuyển Đổi Máy Chủ Đơn Luồng Thành Máy Chủ Đa Luồng

Hiện tại, máy chủ sẽ xử lý từng yêu cầu lần lượt, nghĩa là nó sẽ không xử lý kết nối thứ hai cho đến khi kết nối đầu tiên hoàn tất. Nếu máy chủ nhận được ngày càng nhiều yêu cầu, việc thực thi tuần tự này sẽ ngày càng kém hiệu quả. Nếu máy chủ nhận được một yêu cầu mất nhiều thời gian để xử lý, các yêu cầu tiếp theo sẽ phải đợi cho đến khi yêu cầu dài hoàn thành, ngay cả khi các yêu cầu mới có thể được xử lý nhanh chóng. Chúng ta cần khắc phục điều này, nhưng trước tiên chúng ta sẽ xem xét vấn đề trong thực tế.

Mô Phỏng Yêu Cầu Chậm Trong Cài Đặt Máy Chủ Hiện Tại

Chúng ta sẽ xem xét cách một yêu cầu xử lý chậm có thể ảnh hưởng đến các yêu cầu khác được gửi đến cài đặt máy chủ hiện tại của chúng ta. Listing 21-10 triển khai việc xử lý yêu cầu đến /sleep với một phản hồi chậm mô phỏng sẽ khiến máy chủ ngủ trong năm giây trước khi phản hồi.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Chúng ta đã chuyển từ if sang match khi đã có ba trường hợp. Chúng ta cần phải so khớp rõ ràng trên một lát cắt của request_line để khớp mẫu với các giá trị chuỗi; match không tự động tham chiếu và hủy tham chiếu, như phương thức so sánh bằng.

Cánh tay đầu tiên giống như khối if từ Listing 21-9. Cánh tay thứ hai khớp với một yêu cầu đến /sleep. Khi yêu cầu đó được nhận, máy chủ sẽ ngủ trong năm giây trước khi hiển thị trang HTML thành công. Cánh tay thứ ba giống như khối else từ Listing 21-9.

Bạn có thể thấy máy chủ của chúng ta nguyên thủy như thế nào: các thư viện thực tế sẽ xử lý việc nhận dạng nhiều yêu cầu theo cách ít dài dòng hơn nhiều!

Khởi động máy chủ bằng cách sử dụng cargo run. Sau đó mở hai cửa sổ trình duyệt: một cho http://127.0.0.1:7878/ và cái kia cho http://127.0.0.1:7878/sleep. Nếu bạn nhập URI / vài lần, như trước đây, bạn sẽ thấy nó phản hồi nhanh chóng. Nhưng nếu bạn nhập /sleep và sau đó tải /, bạn sẽ thấy / đợi cho đến khi sleep đã ngủ đủ năm giây trước khi tải.

Có nhiều kỹ thuật chúng ta có thể sử dụng để tránh các yêu cầu ùn tắc phía sau một yêu cầu chậm, bao gồm việc sử dụng async như chúng ta đã làm trong Chương 17; kỹ thuật chúng ta sẽ triển khai là một thread pool.

Cải Thiện Thông Lượng với Thread Pool

Một thread pool là một nhóm các luồng được tạo ra đang chờ đợi và sẵn sàng xử lý một tác vụ. Khi chương trình nhận được một tác vụ mới, nó gán một trong các luồng trong pool cho tác vụ đó, và luồng đó sẽ xử lý tác vụ. Các luồng còn lại trong pool sẵn sàng xử lý bất kỳ tác vụ nào khác đến trong khi luồng đầu tiên đang xử lý. Khi luồng đầu tiên hoàn thành việc xử lý tác vụ của nó, nó được trả lại pool của các luồng rảnh, sẵn sàng xử lý một tác vụ mới. Thread pool cho phép bạn xử lý các kết nối đồng thời, tăng thông lượng của máy chủ của bạn.

Chúng ta sẽ giới hạn số lượng luồng trong pool ở một số nhỏ để bảo vệ chúng ta khỏi các cuộc tấn công DoS; nếu chúng ta để chương trình của mình tạo một luồng mới cho mỗi yêu cầu khi nó đến, ai đó gửi 10 triệu yêu cầu đến máy chủ của chúng ta có thể gây rối bằng cách sử dụng hết tài nguyên của máy chủ và làm cho việc xử lý yêu cầu dừng lại.

Thay vì tạo ra không giới hạn luồng, chúng ta sẽ có một số lượng cố định luồng đang chờ trong pool. Các yêu cầu đến được gửi đến pool để xử lý. Pool sẽ duy trì một hàng đợi các yêu cầu đến. Mỗi luồng trong pool sẽ lấy một yêu cầu từ hàng đợi này, xử lý yêu cầu, và sau đó hỏi hàng đợi về yêu cầu khác. Với thiết kế này, chúng ta có thể xử lý đồng thời tối đa N yêu cầu, trong đó N là số lượng luồng. Nếu mỗi luồng đang đáp ứng một yêu cầu chạy lâu, các yêu cầu tiếp theo vẫn có thể ùn tắc trong hàng đợi, nhưng chúng ta đã tăng số lượng yêu cầu chạy lâu mà chúng ta có thể xử lý trước khi đạt đến điểm đó.

Kỹ thuật này chỉ là một trong nhiều cách để cải thiện thông lượng của máy chủ web. Các tùy chọn khác mà bạn có thể khám phá là mô hình fork/join, mô hình I/O không đồng bộ đơn luồng, và mô hình I/O không đồng bộ đa luồng. Nếu bạn quan tâm đến chủ đề này, bạn có thể đọc thêm về các giải pháp khác và thử triển khai chúng; với một ngôn ngữ cấp thấp như Rust, tất cả các tùy chọn này đều có thể.

Trước khi bắt đầu triển khai thread pool, hãy nói về việc sử dụng pool sẽ trông như thế nào. Khi bạn đang cố gắng thiết kế mã, việc viết giao diện khách hàng trước có thể giúp hướng dẫn thiết kế của bạn. Viết API của mã sao cho nó được cấu trúc theo cách bạn muốn gọi nó; sau đó triển khai chức năng trong cấu trúc đó thay vì triển khai chức năng và sau đó thiết kế API công khai.

Tương tự như cách chúng ta đã sử dụng phát triển dựa trên kiểm thử trong dự án trong Chương 12, chúng ta sẽ sử dụng phát triển dựa trên trình biên dịch ở đây. Chúng ta sẽ viết mã gọi các hàm mà chúng ta muốn, và sau đó chúng ta sẽ xem xét các lỗi từ trình biên dịch để xác định những gì chúng ta nên thay đổi tiếp theo để làm cho mã hoạt động. Tuy nhiên, trước khi làm điều đó, chúng ta sẽ khám phá kỹ thuật mà chúng ta sẽ không sử dụng làm điểm khởi đầu.

Tạo một Luồng cho Mỗi Yêu Cầu

Đầu tiên, hãy khám phá cách mã của chúng ta có thể trông như thế nào nếu nó thực sự tạo ra một luồng mới cho mỗi kết nối. Như đã đề cập trước đó, đây không phải là kế hoạch cuối cùng của chúng ta do các vấn đề với việc có thể tạo ra một số lượng không giới hạn các luồng, nhưng nó là một điểm khởi đầu để có được một máy chủ đa luồng hoạt động trước. Sau đó, chúng ta sẽ thêm thread pool như một cải tiến, và việc so sánh hai giải pháp sẽ dễ dàng hơn. Listing 21-11 hiển thị các thay đổi cần thực hiện đối với main để tạo một luồng mới để xử lý mỗi luồng trong vòng lặp for.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Như bạn đã học trong Chương 16, thread::spawn sẽ tạo một luồng mới và sau đó chạy mã trong closure trong luồng mới. Nếu bạn chạy mã này và tải /sleep trong trình duyệt của bạn, sau đó / trong hai tab trình duyệt khác, bạn sẽ thấy rằng các yêu cầu đến / không phải đợi /sleep hoàn thành. Tuy nhiên, như chúng ta đã đề cập, điều này cuối cùng sẽ làm quá tải hệ thống vì bạn sẽ tạo ra các luồng mới mà không có bất kỳ giới hạn nào.

Bạn cũng có thể nhớ lại từ Chương 17 rằng đây chính xác là loại tình huống mà async và await thực sự tỏa sáng! Hãy nhớ điều đó khi chúng ta xây dựng thread pool và nghĩ về việc mọi thứ sẽ trông khác hoặc giống nhau như thế nào với async.

Tạo một Số Lượng Giới Hạn Các Luồng

Chúng ta muốn thread pool của mình hoạt động theo cách tương tự, quen thuộc để việc chuyển đổi từ luồng sang thread pool không yêu cầu thay đổi lớn đối với mã sử dụng API của chúng ta. Listing 21-12 hiển thị giao diện giả thuyết cho một cấu trúc ThreadPool mà chúng ta muốn sử dụng thay vì thread::spawn.

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Chúng ta sử dụng ThreadPool::new để tạo một thread pool mới với một số lượng luồng có thể cấu hình, trong trường hợp này là bốn. Sau đó, trong vòng lặp for, pool.execute có một giao diện tương tự như thread::spawn ở chỗ nó nhận một closure mà pool nên chạy cho mỗi luồng. Chúng ta cần triển khai pool.execute để nó lấy closure và đưa nó cho một luồng trong pool để chạy. Mã này chưa biên dịch, nhưng chúng ta sẽ thử để trình biên dịch có thể hướng dẫn chúng ta cách khắc phục nó.

Xây Dựng ThreadPool Sử Dụng Phát Triển Dựa Trên Trình Biên Dịch

Thực hiện các thay đổi trong Listing 21-12 cho src/main.rs, và sau đó hãy sử dụng các lỗi biên dịch từ cargo check để hướng dẫn phát triển của chúng ta. Đây là lỗi đầu tiên chúng ta nhận được:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Tuyệt! Lỗi này cho chúng ta biết rằng chúng ta cần một loại hoặc module ThreadPool, vì vậy chúng ta sẽ xây dựng nó ngay bây giờ. Việc triển khai ThreadPool của chúng ta sẽ độc lập với loại công việc mà máy chủ web của chúng ta đang làm. Vì vậy, hãy chuyển crate hello từ một crate nhị phân thành một crate thư viện để giữ việc triển khai ThreadPool của chúng ta. Sau khi chúng ta chuyển sang crate thư viện, chúng ta cũng có thể sử dụng thư viện thread pool riêng biệt cho bất kỳ công việc nào chúng ta muốn làm bằng cách sử dụng thread pool, không chỉ để phục vụ các yêu cầu web.

Tạo một file src/lib.rs chứa những điều sau, đây là định nghĩa đơn giản nhất của một cấu trúc ThreadPool mà chúng ta có thể có bây giờ:

pub struct ThreadPool;

Sau đó chỉnh sửa file main.rs để đưa ThreadPool vào phạm vi từ crate thư viện bằng cách thêm mã sau vào đầu của src/main.rs:

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Mã này vẫn chưa hoạt động, nhưng hãy kiểm tra lại để có được lỗi tiếp theo mà chúng ta cần giải quyết:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Lỗi này chỉ ra rằng tiếp theo chúng ta cần tạo một hàm liên kết có tên new cho ThreadPool. Chúng ta cũng biết rằng new cần có một tham số có thể chấp nhận 4 làm đối số và nên trả về một thể hiện ThreadPool. Hãy triển khai hàm new đơn giản nhất có những đặc điểm đó:

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

Chúng ta đã chọn usize làm loại của tham số size vì chúng ta biết rằng một số âm của luồng không có ý nghĩa. Chúng ta cũng biết rằng chúng ta sẽ sử dụng 4 này làm số lượng phần tử trong một bộ sưu tập các luồng, đó là mục đích của loại usize, như đã thảo luận trong "Các Loại Số Nguyên" trong Chương 3.

Hãy kiểm tra mã một lần nữa:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Bây giờ lỗi xảy ra vì chúng ta không có phương thức execute trên ThreadPool. Nhớ lại từ "Tạo một Số Lượng Giới Hạn Các Luồng" rằng chúng ta đã quyết định thread pool của mình nên có một giao diện tương tự như thread::spawn. Ngoài ra, chúng ta sẽ triển khai hàm execute để nó lấy closure mà nó được cung cấp và giao nó cho một luồng rảnh trong pool để chạy.

Chúng ta sẽ định nghĩa phương thức execute trên ThreadPool để nhận một closure làm tham số. Nhớ lại từ "Di Chuyển Các Giá Trị Được Bắt Giữ Ra Khỏi Closure và Các Đặc Điểm Fn" trong Chương 13 rằng chúng ta có thể lấy closures làm tham số với ba đặc điểm khác nhau: Fn, FnMut, và FnOnce. Chúng ta cần quyết định loại closure nào để sử dụng ở đây. Chúng ta biết rằng cuối cùng chúng ta sẽ làm điều gì đó tương tự như việc triển khai thread::spawn của thư viện chuẩn, vì vậy chúng ta có thể xem xét những ràng buộc nào mà chữ ký của thread::spawn có trên tham số của nó. Tài liệu hiển thị cho chúng ta những điều sau:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Tham số loại F là loại chúng ta quan tâm ở đây; tham số loại T liên quan đến giá trị trả về, và chúng ta không quan tâm đến điều đó. Chúng ta có thể thấy rằng spawn sử dụng FnOnce làm ràng buộc đặc điểm trên F. Đây có lẽ là điều chúng ta cũng muốn, vì cuối cùng chúng ta sẽ truyền đối số mà chúng ta nhận được trong execute cho spawn. Chúng ta có thể tin tưởng hơn rằng FnOnce là đặc điểm mà chúng ta muốn sử dụng vì luồng để chạy một yêu cầu sẽ chỉ thực thi closure của yêu cầu đó một lần, điều này phù hợp với Once trong FnOnce.

Tham số loại F cũng có ràng buộc đặc điểm Send và ràng buộc vòng đời 'static, điều này hữu ích trong tình huống của chúng ta: chúng ta cần Send để chuyển closure từ một luồng sang luồng khác và 'static vì chúng ta không biết luồng sẽ mất bao lâu để thực thi. Hãy tạo một phương thức execute trên ThreadPool sẽ lấy một tham số chung kiểu F với những ràng buộc này:

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Chúng ta vẫn sử dụng () sau FnOnceFnOnce này đại diện cho một closure không nhận tham số nào và trả về kiểu đơn vị (). Giống như các định nghĩa hàm, kiểu trả về có thể được bỏ qua khỏi chữ ký, nhưng ngay cả khi chúng ta không có tham số nào, chúng ta vẫn cần dấu ngoặc đơn.

Một lần nữa, đây là cách triển khai đơn giản nhất của phương thức execute: nó không làm gì cả, nhưng chúng ta chỉ đang cố gắng làm cho mã của mình biên dịch. Hãy kiểm tra lại:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

Nó biên dịch! Nhưng lưu ý rằng nếu bạn thử cargo run và thực hiện một yêu cầu trong trình duyệt, bạn sẽ thấy các lỗi trong trình duyệt mà chúng ta đã thấy ở đầu chương. Thư viện của chúng ta không thực sự gọi closure được truyền vào execute nữa!

Lưu ý: Một câu nói mà bạn có thể nghe về các ngôn ngữ với trình biên dịch nghiêm ngặt, như Haskell và Rust, là "nếu mã biên dịch, nó hoạt động." Nhưng câu nói này không phải là sự thật phổ quát. Dự án của chúng ta biên dịch, nhưng nó hoàn toàn không làm gì cả! Nếu chúng ta đang xây dựng một dự án thực tế, hoàn chỉnh, đây sẽ là thời điểm tốt để bắt đầu viết các bài kiểm tra đơn vị để kiểm tra rằng mã biên dịch có hành vi mà chúng ta muốn.

Hãy xem xét: điều gì sẽ khác ở đây nếu chúng ta sẽ thực thi một future thay vì một closure?

Xác Thực Số Lượng Luồng trong new

Chúng ta không làm gì với các tham số cho newexecute. Hãy triển khai các thân của các hàm này với hành vi mà chúng ta muốn. Để bắt đầu, hãy nghĩ về new. Trước đó, chúng ta đã chọn một loại không dấu cho tham số size vì một pool với số lượng luồng âm không có ý nghĩa. Tuy nhiên, một pool với số luồng là không cũng không có ý nghĩa, nhưng không là một usize hoàn toàn hợp lệ. Chúng ta sẽ thêm mã để kiểm tra rằng size lớn hơn không trước khi chúng ta trả về một thể hiện ThreadPool và có chương trình hoảng sợ nếu nó nhận được một số không bằng cách sử dụng macro assert!, như được hiển thị trong Listing 21-13.

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Chúng ta cũng đã thêm một số tài liệu cho ThreadPool của chúng ta với các chú thích tài liệu. Lưu ý rằng chúng ta đã tuân theo các thực hành tài liệu tốt bằng cách thêm một phần nêu ra các tình huống mà hàm của chúng ta có thể hoảng sợ, như đã thảo luận trong Chương 14. Hãy thử chạy cargo doc --open và nhấp vào cấu trúc ThreadPool để xem tài liệu được tạo ra cho new trông như thế nào!

Thay vì thêm macro assert! như chúng ta đã làm ở đây, chúng ta có thể thay đổi new thành build và trả về một Result như chúng ta đã làm với Config::build trong dự án I/O trong Listing 12-9. Nhưng chúng ta đã quyết định trong trường hợp này rằng việc cố gắng tạo một thread pool mà không có bất kỳ luồng nào nên là một lỗi không thể khôi phục. Nếu bạn cảm thấy tham vọng, hãy thử viết một hàm có tên build với chữ ký sau để so sánh với hàm new:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

Tạo Không Gian Để Lưu Trữ Các Luồng

Bây giờ chúng ta có một cách để biết chúng ta có một số lượng luồng hợp lệ để lưu trữ trong pool, chúng ta có thể tạo các luồng đó và lưu trữ chúng trong cấu trúc ThreadPool trước khi trả về cấu trúc. Nhưng làm thế nào để chúng ta "lưu trữ" một luồng? Hãy xem lại chữ ký của thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Hàm spawn trả về một JoinHandle<T>, trong đó T là loại mà closure trả về. Hãy thử sử dụng JoinHandle và xem điều gì xảy ra. Trong trường hợp của chúng ta, các closure mà chúng ta đang truyền cho thread pool sẽ xử lý kết nối và không trả về bất cứ điều gì, vì vậy T sẽ là kiểu đơn vị ().

Mã trong Listing 21-14 sẽ biên dịch nhưng chưa tạo bất kỳ luồng nào. Chúng ta đã thay đổi định nghĩa của ThreadPool để chứa một vector của các thể hiện thread::JoinHandle<()>, khởi tạo vector với dung lượng là size, thiết lập một vòng lặp for sẽ chạy một số mã để tạo các luồng, và trả về một thể hiện ThreadPool chứa chúng.

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Chúng ta đã đưa std::thread vào phạm vi trong crate thư viện vì chúng ta đang sử dụng thread::JoinHandle làm kiểu của các mục trong vector trong ThreadPool.

Khi một kích thước hợp lệ được nhận, ThreadPool của chúng ta tạo một vector mới có thể chứa size mục. Hàm with_capacity thực hiện cùng một nhiệm vụ như Vec::new nhưng với một sự khác biệt quan trọng: nó cấp phát trước không gian trong vector. Vì chúng ta biết chúng ta cần lưu trữ size phần tử trong vector, việc thực hiện cấp phát này trước là hiệu quả hơn một chút so với việc sử dụng Vec::new, việc này tự điều chỉnh kích thước khi các phần tử được chèn vào.

Khi bạn chạy cargo check một lần nữa, nó sẽ thành công.

Một Cấu Trúc Worker Chịu Trách Nhiệm Gửi Mã từ ThreadPool đến một Luồng

Chúng ta đã để lại một chú thích trong vòng lặp for trong Listing 21-14 liên quan đến việc tạo các luồng. Ở đây, chúng ta sẽ xem xét cách chúng ta thực sự tạo ra các luồng. Thư viện chuẩn cung cấp thread::spawn như một cách để tạo các luồng, và thread::spawn mong đợi nhận được một số mã mà luồng nên chạy ngay khi luồng được tạo. Tuy nhiên, trong trường hợp của chúng ta, chúng ta muốn tạo các luồng và có chúng đợi mã mà chúng ta sẽ gửi sau này. Việc triển khai các luồng của thư viện chuẩn không bao gồm bất kỳ cách nào để làm điều đó; chúng ta phải triển khai nó theo cách thủ công.

Chúng ta sẽ triển khai hành vi này bằng cách giới thiệu một cấu trúc dữ liệu mới giữa ThreadPool và các luồng sẽ quản lý hành vi mới này. Chúng ta sẽ gọi cấu trúc dữ liệu này là Worker, đây là một thuật ngữ phổ biến trong các triển khai pooling. Worker nhận mã cần được chạy và chạy mã đó trong luồng của Worker.

Hãy nghĩ về những người làm việc trong nhà bếp của một nhà hàng: các công nhân đợi cho đến khi đơn đặt hàng đến từ khách hàng, và sau đó họ chịu trách nhiệm lấy các đơn đặt hàng đó và thực hiện chúng.

Thay vì lưu trữ một vector các thể hiện JoinHandle<()> trong thread pool, chúng ta sẽ lưu trữ các thể hiện của cấu trúc Worker. Mỗi Worker sẽ lưu trữ một thể hiện JoinHandle<()> duy nhất. Sau đó, chúng ta sẽ triển khai một phương thức trên Worker sẽ lấy một closure của mã để chạy và gửi nó đến luồng đã chạy để thực thi. Chúng ta cũng sẽ cung cấp cho mỗi Worker một id để chúng ta có thể phân biệt giữa các thể hiện khác nhau của Worker trong pool khi ghi nhật ký hoặc gỡ lỗi.

Đây là quy trình mới sẽ xảy ra khi chúng ta tạo một ThreadPool. Chúng ta sẽ triển khai mã gửi closure đến luồng sau khi chúng ta đã thiết lập Worker theo cách này:

  1. Định nghĩa một cấu trúc Worker chứa một id và một JoinHandle<()>.
  2. Thay đổi ThreadPool để chứa một vector các thể hiện Worker.
  3. Định nghĩa một hàm Worker::new nhận một số id và trả về một thể hiện Worker chứa id và một luồng được tạo ra với một closure rỗng.
  4. Trong ThreadPool::new, sử dụng bộ đếm vòng lặp for để tạo ra một id, tạo một Worker mới với id đó, và lưu trữ worker trong vector.

Nếu bạn sẵn sàng cho một thử thách, hãy thử triển khai các thay đổi này một mình trước khi xem mã trong Listing 21-15.

Sẵn sàng chưa? Đây là Listing 21-15 với một cách để thực hiện các sửa đổi trước đó.

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Chúng ta đã thay đổi tên của trường trên ThreadPool từ threads thành workers vì nó hiện đang chứa các thể hiện Worker thay vì các thể hiện JoinHandle<()>. Chúng ta sử dụng bộ đếm trong vòng lặp for làm đối số cho Worker::new, và chúng ta lưu trữ mỗi Worker mới trong vector có tên workers.

Mã bên ngoài (như máy chủ của chúng ta trong src/main.rs) không cần biết các chi tiết triển khai liên quan đến việc sử dụng cấu trúc Worker trong ThreadPool, vì vậy chúng ta làm cho cấu trúc Worker và hàm new của nó là riêng tư. Hàm Worker::new sử dụng id mà chúng ta cung cấp và lưu trữ một thể hiện JoinHandle<()> được tạo ra bằng cách tạo một luồng mới bằng một closure rỗng.

Lưu ý: Nếu hệ điều hành không thể tạo một luồng vì không có đủ tài nguyên hệ thống, thread::spawn sẽ hoảng sợ. Điều đó sẽ khiến toàn bộ máy chủ của chúng ta hoảng sợ, ngay cả khi việc tạo một số luồng có thể thành công. Để đơn giản, hành vi này là tốt, nhưng trong một triển khai thread pool sản xuất, bạn có thể muốn sử dụng std::thread::Builder và phương thức spawn của nó trả về Result thay thế.

Mã này sẽ biên dịch và sẽ lưu trữ số lượng thể hiện Worker mà chúng ta đã chỉ định làm đối số cho ThreadPool::new. Nhưng chúng ta vẫn không xử lý closure mà chúng ta nhận được trong execute. Hãy xem cách làm điều đó tiếp theo.

Gửi Yêu Cầu đến Các Luồng thông qua Các Kênh

Vấn đề tiếp theo mà chúng ta sẽ giải quyết là các closure được đưa cho thread::spawn hoàn toàn không làm gì cả. Hiện tại, chúng ta nhận được closure mà chúng ta muốn thực thi trong phương thức execute. Nhưng chúng ta cần cung cấp cho thread::spawn một closure để chạy khi chúng ta tạo mỗi Worker trong quá trình tạo ThreadPool.

Chúng ta muốn các cấu trúc Worker mà chúng ta vừa tạo lấy mã để chạy từ một hàng đợi được giữ trong ThreadPool và gửi mã đó đến luồng của nó để chạy.

Các kênh mà chúng ta đã học trong Chương 16—một cách đơn giản để giao tiếp giữa hai luồng—sẽ hoàn hảo cho trường hợp sử dụng này. Chúng ta sẽ sử dụng một kênh để hoạt động như hàng đợi các công việc, và execute sẽ gửi một công việc từ ThreadPool đến các thể hiện Worker, sẽ gửi công việc đến luồng của nó để chạy. Đây là kế hoạch:

  1. ThreadPool sẽ tạo một kênh và giữ người gửi.
  2. Mỗi Worker sẽ giữ người nhận.
  3. Chúng ta sẽ tạo một cấu trúc Job mới sẽ chứa các closure mà chúng ta muốn gửi qua kênh.
  4. Phương thức execute sẽ gửi công việc mà nó muốn thực thi thông qua người gửi.
  5. Trong luồng của nó, Worker sẽ lặp qua người nhận của nó và thực thi các closure của bất kỳ công việc nào nó nhận được.

Hãy bắt đầu bằng cách tạo một kênh trong ThreadPool::new và giữ người gửi trong thể hiện ThreadPool, như được hiển thị trong Listing 21-16. Cấu trúc Job không chứa gì bây giờ nhưng sẽ là loại mục mà chúng ta đang gửi qua kênh.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

Trong ThreadPool::new, chúng ta tạo kênh mới của mình và có pool giữ người gửi. Điều này sẽ biên dịch thành công.

Hãy thử truyền người nhận của kênh vào mỗi Worker khi thread pool tạo kênh. Chúng ta biết rằng chúng ta muốn sử dụng người nhận trong luồng mà các thể hiện Worker tạo ra, vì vậy chúng ta sẽ tham chiếu đến tham số receiver trong closure. Mã trong Listing 21-17 chưa biên dịch hoàn toàn.

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Chúng ta đã thực hiện một số thay đổi nhỏ và đơn giản: chúng ta truyền người nhận vào Worker::new, và sau đó chúng ta sử dụng nó bên trong closure.

Khi chúng ta cố gắng kiểm tra mã này, chúng ta nhận được lỗi này:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

Mã đang cố gắng truyền receiver cho nhiều thể hiện Worker. Điều này sẽ không hoạt động, như bạn sẽ nhớ lại từ Chương 16: việc triển khai kênh mà Rust cung cấp là nhiều producer, một consumer. Điều này có nghĩa là chúng ta không thể chỉ sao chép đầu tiêu thụ của kênh để sửa mã này. Chúng ta cũng không muốn gửi một thông điệp nhiều lần đến nhiều người tiêu dùng; chúng ta muốn một danh sách các thông điệp với nhiều thể hiện Worker sao cho mỗi thông điệp được xử lý một lần.

Ngoài ra, việc lấy một công việc khỏi hàng đợi kênh liên quan đến việc thay đổi receiver, vì vậy các luồng cần một cách an toàn để chia sẻ và sửa đổi receiver; nếu không, chúng ta có thể gặp phải các điều kiện đua (như đã đề cập trong Chương 16).

Nhớ lại các con trỏ thông minh an toàn cho luồng được thảo luận trong Chương 16: để chia sẻ quyền sở hữu trên nhiều luồng và cho phép các luồng thay đổi giá trị, chúng ta cần sử dụng Arc<Mutex<T>>. Loại Arc sẽ cho phép nhiều thể hiện Worker sở hữu người nhận, và Mutex sẽ đảm bảo rằng chỉ một Worker nhận được một công việc từ người nhận tại một thời điểm. Listing 21-18 hiển thị các thay đổi chúng ta cần thực hiện.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Trong ThreadPool::new, chúng ta đặt người nhận trong một Arc và một Mutex. Đối với mỗi Worker mới, chúng ta sao chép Arc để tăng số đếm tham chiếu để các thể hiện Worker có thể chia sẻ quyền sở hữu của người nhận.

Với những thay đổi này, mã biên dịch! Chúng ta đang tiến gần đến đích!

Triển Khai Phương Thức execute

Cuối cùng, hãy triển khai phương thức execute trên ThreadPool. Chúng ta cũng sẽ thay đổi Job từ một cấu trúc thành một bí danh kiểu cho một đối tượng đặc điểm chứa loại closure mà execute nhận được. Như đã thảo luận trong "Tạo Đồng Nghĩa Kiểu với Bí Danh Kiểu" trong Chương 20, bí danh kiểu cho phép chúng ta làm cho các kiểu dài ngắn hơn để dễ sử dụng. Hãy xem Listing 21-19.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

Sau khi tạo một thể hiện Job mới bằng cách sử dụng closure mà chúng ta nhận được trong execute, chúng ta gửi công việc đó qua đầu gửi của kênh. Chúng ta đang gọi unwrap trên send cho trường hợp gửi thất bại. Điều này có thể xảy ra nếu, ví dụ, chúng ta dừng tất cả các luồng của mình từ việc thực thi, nghĩa là đầu nhận đã dừng nhận các thông điệp mới. Tại thời điểm này, chúng ta không thể dừng các luồng của mình từ việc thực thi: các luồng của chúng ta tiếp tục thực thi miễn là pool tồn tại. Lý do chúng ta sử dụng unwrap là vì chúng ta biết trường hợp thất bại sẽ không xảy ra, nhưng trình biên dịch không biết điều đó.

Nhưng chúng ta chưa hoàn thành! Trong Worker, closure của chúng ta đang được truyền cho thread::spawn vẫn chỉ tham chiếu đến đầu nhận của kênh. Thay vào đó, chúng ta cần closure để lặp mãi mãi, yêu cầu đầu nhận của kênh một công việc và chạy công việc khi nó nhận được một. Hãy thực hiện thay đổi được hiển thị trong Listing 21-20 cho Worker::new.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Ở đây, trước tiên chúng ta gọi lock trên receiver để có được mutex, và sau đó chúng ta gọi unwrap để hoảng sợ khi có bất kỳ lỗi nào. Việc có được một khóa có thể không thành công nếu mutex đang ở trạng thái poisoned, điều này có thể xảy ra nếu một số luồng khác hoảng sợ trong khi giữ khóa thay vì giải phóng khóa. Trong tình huống này, việc gọi unwrap để có luồng này hoảng sợ là hành động đúng đắn để thực hiện. Đừng ngại thay đổi unwrap này thành expect với một thông báo lỗi có ý nghĩa đối với bạn.

Nếu chúng ta có được khóa trên mutex, chúng ta gọi recv để nhận một Job từ kênh. Một unwrap cuối cùng vượt qua bất kỳ lỗi nào ở đây cũng vậy, có thể xảy ra nếu luồng giữ người gửi đã tắt, tương tự như cách phương thức send trả về Err nếu người nhận tắt.

Cuộc gọi đến recv chặn, vì vậy nếu chưa có công việc, luồng hiện tại sẽ đợi cho đến khi một công việc có sẵn. Mutex<T> đảm bảo rằng chỉ một luồng Worker tại một thời điểm đang cố gắng yêu cầu một công việc.

Thread pool của chúng ta bây giờ đang ở trạng thái hoạt động! Hãy thử cargo run và thực hiện một số yêu cầu:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

Thành công! Bây giờ chúng ta có một thread pool thực thi các kết nối không đồng bộ. Không bao giờ có nhiều hơn bốn luồng được tạo ra, vì vậy hệ thống của chúng ta sẽ không bị quá tải nếu máy chủ nhận được rất nhiều yêu cầu. Nếu chúng ta thực hiện một yêu cầu đến /sleep, máy chủ sẽ có thể phục vụ các yêu cầu khác bằng cách có một luồng khác chạy chúng.

Lưu ý: Nếu bạn mở /sleep trong nhiều cửa sổ trình duyệt cùng một lúc, chúng có thể tải một cái một lúc trong khoảng thời gian năm giây. Một số trình duyệt web thực thi nhiều phiên bản của cùng một yêu cầu tuần tự vì lý do cache. Hạn chế này không phải do máy chủ web của chúng ta gây ra.

Đây là một thời điểm tốt để tạm dừng và xem xét cách mã trong Listings 21-18, 21-19, và 21-20 sẽ khác nhau nếu chúng ta đang sử dụng futures thay vì closure cho công việc cần thực hiện. Những kiểu nào sẽ thay đổi? Chữ ký phương thức sẽ khác nhau như thế nào, nếu có? Những phần nào của mã sẽ giữ nguyên?

Sau khi học về vòng lặp while let trong Chương 17 và 18, bạn có thể đang tự hỏi tại sao chúng ta không viết mã luồng công nhân như được hiển thị trong Listing 21-21.

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Mã này biên dịch và chạy nhưng không dẫn đến hành vi luồng mong muốn: một yêu cầu chậm vẫn sẽ khiến các yêu cầu khác phải đợi để được xử lý. Lý do hơi tinh tế: cấu trúc Mutex không có phương thức unlock công khai vì quyền sở hữu của khóa dựa trên vòng đời của MutexGuard<T> trong LockResult<MutexGuard<T>> mà phương thức lock trả về. Tại thời điểm biên dịch, trình kiểm tra mượn sau đó có thể thực thi quy tắc rằng một tài nguyên được bảo vệ bởi Mutex không thể được truy cập trừ khi chúng ta giữ khóa. Tuy nhiên, việc triển khai này cũng có thể dẫn đến khóa được giữ lâu hơn dự định nếu chúng ta không chú ý đến vòng đời của MutexGuard<T>.

Mã trong Listing 21-20 sử dụng let job = receiver.lock().unwrap().recv().unwrap(); hoạt động vì với let, bất kỳ giá trị tạm thời nào được sử dụng trong biểu thức ở phía bên phải của dấu bằng sẽ bị bỏ ngay lập tức khi câu lệnh let kết thúc. Tuy nhiên, while let (và if letmatch) không bỏ các giá trị tạm thời cho đến khi kết thúc khối liên kết. Trong Listing 21-21, khóa vẫn được giữ trong suốt thời gian gọi job(), có nghĩa là các thể hiện Worker khác không thể nhận công việc.