Rust学习笔记

Rust编程语言入门教程课程笔记

参考教材: The Rust Programming Language (by Steve Klabnik and Carol Nichols, with contributions from the Rust Community)

Lecture 20: Final Project: Building a Multithreaded Web Server

src/main.rs

use std::fs;
use std::{
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
};
use std::thread;
use std::time::Duration;
use hello::ThreadPool;

/// Starts the demo web server on `127.0.0.1:7878`.
///
/// Connections are handed to a fixed-size thread pool rather than handled
/// inline (one slow request would block everyone) or on a fresh thread each
/// (an unbounded number of threads makes the server easy to overwhelm).
fn main() {
    // Bind the listening socket; failing to bind is fatal for this demo.
    let server = TcpListener::bind("127.0.0.1:7878").unwrap();
    // Four workers put an upper bound on how many requests run concurrently.
    let workers = ThreadPool::new(4);

    // Accept exactly two connections and then stop, so that the shutdown
    // path of the pool can be observed when `workers` goes out of scope.
    server.incoming().take(2).for_each(|incoming| {
        let connection = incoming.unwrap();
        // The job takes ownership of the connection and runs on a pool thread.
        workers.execute(move || handle_connection(connection));
    });

    println!("Shutting down.");
}

/// Reads one HTTP request from `stream` and answers it with a canned page.
///
/// Routing:
/// - `GET /`       -> `200 OK` with the contents of `hello.html`
/// - `GET /sleep`  -> the same page after a 5 second pause (simulates a slow request)
/// - anything else -> `404 NOT FOUND` with the contents of `404.html`
///
/// I/O failures (client hung up, page file missing, ...) are reported on
/// stderr instead of panicking: this function runs as a thread-pool job, and
/// a panic there would take the worker thread down with it.
fn handle_connection(mut stream: TcpStream) {
    if let Err(err) = serve_request(&mut stream) {
        eprintln!("Failed to handle connection: {}", err);
    }
}

/// Does the actual work of `handle_connection`, propagating any I/O error.
fn serve_request(stream: &mut TcpStream) -> std::io::Result<()> {
    // Request format:
    //   Method Request-URI HTTP-Version CRLF
    //   headers CRLF
    //   message-body
    // e.g. the first line sent by a browser is "GET / HTTP/1.1".
    const GET_ROOT: &[u8] = b"GET / HTTP/1.1\r\n";
    const GET_SLEEP: &[u8] = b"GET /sleep HTTP/1.1\r\n";

    let mut buffer = [0; 1024];
    let bytes_read = stream.read(&mut buffer)?;
    // Only look at the bytes that were actually received.
    let request = &buffer[..bytes_read];

    let (status_line, filename) = if request.starts_with(GET_ROOT) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if request.starts_with(GET_SLEEP) {
        // Simulate a slow request.
        std::thread::sleep(std::time::Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    // Response format:
    //   HTTP-Version Status-Code Reason-Phrase CRLF
    //   headers CRLF
    //   message-body
    let contents = fs::read_to_string(filename)?;
    // Content-Length tells the client where the body ends.
    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    // `write` may stop after a partial write; `write_all` keeps going until
    // the whole response has been handed to the socket.
    stream.write_all(response.as_bytes())?;
    stream.flush()
}

src/lib.rs

use std::thread;
use std::sync::{mpsc, Arc, Mutex};

/// A boxed closure that a worker runs exactly once.
///
/// `Box<dyn FnOnce()>` can be called directly, so no helper trait is needed
/// to move the closure out of its box.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// What the pool sends to its workers over the channel.
enum Message {
    /// Run this job.
    NewJob(Job),
    /// Leave the receive loop so the thread can be joined.
    Terminate,
}

/// One pool thread together with the id used in log output.
struct Worker {
    id: usize,
    // `Option` so that `Drop` can `take()` the handle: `join` consumes it,
    // and `drop` only has `&mut self`.
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    /// Spawns a thread that keeps pulling messages from `receiver` until it
    /// is told to terminate or the channel is disconnected.
    ///
    /// The receiver is shared as `Arc<Mutex<..>>` because an mpsc channel has
    /// a single consumer end: every worker needs access to the same one, and
    /// only one of them may take a message at a time.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The mutex guard is a temporary that is dropped at the end of
            // this statement, so the lock is NOT held while the job runs and
            // other workers can pick up messages in the meantime.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(Message::NewJob(job)) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Ok(Message::Terminate) => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
                Err(_) => {
                    // The sending half is gone, so no message can ever arrive.
                    println!("Worker {} disconnected; shutting down.", id);
                    break;
                }
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

/// A fixed-size pool of worker threads that execute submitted closures.
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

impl ThreadPool {
    /// Create a new ThreadPool
    ///
    /// The size is the number of threads in the pool
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        // One receiver, shared by all workers.
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    /// Queues `f` to be run by one of the pool's worker threads.
    ///
    /// The bounds mirror those of `std::thread::spawn`: the closure runs once
    /// (`FnOnce`), on another thread (`Send`), at an unknown later time
    /// (`'static`).
    ///
    /// # Panics
    ///
    /// Panics if every worker thread has already stopped, because then nobody
    /// is left to receive the job.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender
            .send(Message::NewJob(job))
            .expect("all workers have stopped; the job cannot be queued");
    }
}

impl Drop for ThreadPool {
    /// Shuts the pool down gracefully: first every worker is told to
    /// terminate, then every worker thread is joined.
    ///
    /// The two steps use separate loops on purpose. Messages are not
    /// addressed to a particular worker, so sending one `Terminate` and
    /// immediately joining worker N could block forever if a different worker
    /// picked that message up.
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");

        for _ in &self.workers {
            // A send error means the receiver is gone, i.e. every worker has
            // already exited. There is nobody left to notify, and panicking
            // inside `drop` must be avoided.
            if self.sender.send(Message::Terminate).is_err() {
                break;
            }
        }

        println!("Shutting down all workers.");

        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                // `join` returns `Err` when the worker thread panicked.
                if thread.join().is_ok() {
                    println!("Worker {} shut down successfully.", worker.id);
                } else {
                    println!("Worker {} failed to shut down.", worker.id);
                }
            }
        }
    }
}

hello.html

<!DOCTYPE html>
<html lang="en">

<head>
<meta charset="UTF-8">
<title>Hello World</title>
</head>

<body>
<h1>Hello World</h1>
<!-- The server sends this file to the client verbatim, so server-side
     script (e.g. PHP) would never be executed; use plain text instead. -->
<p>Hello World</p>
</body>

</html>

404.html

<!DOCTYPE html>
<html lang="en">

<head>
<meta charset="UTF-8">
<title>404</title>
</head>

<body>
<!-- Error page: handle_connection sends this file with a "404 NOT FOUND"
     status for every request other than "GET /" and "GET /sleep". -->
<h1>Oops!</h1>
<p>Page not found</p>
</body>

</html>