Channel允许在Rust中创建一个消息传递渠道,它返回一个元组结构体,其中包含发送和接收端。发送端用于向通道发送数据,而接收端则用于从通道接收数据。不能使用可变变量的方式,线程外面修改了可变变量的值,线程里面是拿不到最新的值的。
不可用方式
is_loading在主线程里面,通过参数传递给子线程,等待5秒之后,主线程里面的is_loading修改为false,但是子线程里面获取不到最新的值,是由于所有权和线程安全性的限制所导致的。所以会导致子线程一直运行:
use std::io::{self, Write};
use std::thread;
use std::time::Duration;
fn main() {
let mut is_loading = true;
// creat close package must use move to get is_loading
thread::spawn(move || { sub_loading(&is_loading) });
// sleep 3 second then stop loading
thread::sleep(Duration::from_secs(5));
// set loading stop
is_loading = false;
loop {
thread::sleep(Duration::from_millis(250));
print!("build done");
}
}
fn sub_loading(flag: &bool) {
let loading_chars = vec!['-', '\\', '|', '/'];
let mut index = 0;
// cant get newe value
while *flag {
print!("\rLoading {} {} ", flag, loading_chars[index]);
io::stdout().flush().unwrap();
index = (index + 1) % loading_chars.len();
thread::sleep(Duration::from_millis(250));
}
}
Channel通道说明
每个channel由两部分组成:发送端(Sender)和接收端(Receiver)。 发送端用于向channel发送消息,而接收端则用于接收这些消息。这种机制允许线程之间的安全通信,避免了共享内存的复杂性和潜在的数据竞争问题。 (通过通信来共享内存,而非通过共享内存来通信) Rust的channel为线程间通信提供了一种安全、简单的方式,是构建并发应用的基础工具之一。channel是Rust标准库的一部分,自Rust 1.0版本以来就包含了这个功能。随着Rust语言和标准库的发展,channel的实现和API可能会有所改进,但其基本概念和用法保持一致。
基本步骤如下:
创建: 使用std::sync::mpsc::channel()函数创建一个新的channel,这个函数返回一个包含发送端(Sender)和接收端(Receiver)的元组。
发送: 使用发送端的send方法发送消息。send方法接受一个消息值,如果接收端已经被丢弃,会返回一个错误。
接收: 使用接收端的recv方法接收消息。recv会阻塞当前线程直到一个消息可用,或者channel被关闭。
最简单的一个使用demo:
use std::sync::mpsc;
use std::thread;
fn main() {
// 创建一个通道
let (sender, receiver) = mpsc::channel();
// 在主线程中发送数据
let data = 42;
sender.send(data).unwrap();
// 创建子线程来接收数据
let child_thread = thread::spawn(move || {
let received_data = receiver.recv().unwrap();
println!("Received data in child thread: {}", received_data);
});
child_thread.join().unwrap();
}
在Rust中,如果你想停止一个由thread::spawn
创建的线程,你不能直接停止它,因为Rust没有提供直接的方式来停止线程。但是,你可以通过通信来请求线程优雅地停止运行。
如果想使用一种方式,让主线程控制子线程停止,就可以使用channel通道:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let child_thread = thread::spawn(move || {
loop {
// 检查是否收到停止信号
if let Ok(_) = rx.try_recv() {
println!("线程收到停止信号,正在退出...");
break; // 退出循环,线程结束
}
// 执行线程的工作
println!("sub child thread is ruing");
// 为了演示,让线程休眠一会儿
thread::sleep(Duration::from_secs(1));
}
println!("线程已退出。");
});
// 主线程等6秒,模拟耗时操作
thread::sleep(Duration::from_secs(6));
// 假设6秒后需要停止线程
tx.send(()).unwrap(); // 发送停止信号
// 等待线程退出
child_thread.join().unwrap();
}
运行结果:
注意事项
recv:
方法是阻塞的,即 它会阻塞当前线程, 直到从通道中接收到消息。 线程在调用rx.recv().unwrap()时会阻塞 等待消息的到来。一旦主线程通过tx.send(msg).unwrap();发送了消息,主线程会接收到这个消息并继续执行,之后程序才会正常退出。
try_recv:尝试在不阻塞的情况下返回此接收器上的挂起值。此方法永远不会为了等待数据可用而阻止调用方。相反,它将始终立即返回,并可能选择通道上的挂起数据。这对于在决定阻塞接收器之前进行“乐观检查”非常有用。
recv_timeout:尝试在此接收器上等待一个值,如果相应的通道已挂断或达到最后期限,则返回错误。如果没有可用的数据,并且可能发送更多的数据,此函数将始终阻止当前线程。将消息发送到相应的发件人(或SyncSender)后,此收件人将唤醒并返回该消息。如果相应的Sender已断开连接,或者在该呼叫被阻止时断开了连接,则该呼叫将唤醒并返回Err,以指示在此信道上无法再接收到消息。但是,由于通道是缓冲的,因此在断开连接之前发送的消息仍将被正确接收。
recv_deadline:返回一个迭代器,该迭代器将阻止等待消息,但永远不要惊慌!当频道挂断时,它将返回None。
iter:返回一个迭代器,该迭代器将尝试生成所有挂起的值。如果没有更多挂起的值或通道已挂起,它将返回None。迭代器永远不会死机!或者通过等待值来阻止用户。
关于MPSC讲解
其中mpsc是Multi producer, Single consumer FIFO queue的缩写,即多生产者单消费者先入先出队列。Rust标准库提供的channel是MPSC(多生产者,单消费者)模型,这意味着可以有多个发送端(Sender)向同一个接收端(Receiver)发送消息。这种模式非常适用于工作队列模型,其中多个生产者线程生成任务,而单个消费者线程处理这些任务。
除了MPSC之外,还有如下几种模型:
SPSC(Single Producer Single Consumer): 单生产者单消费者。
SPMC(Single Producer Multiple Consumer): 单生产者多消费者。
MPSC(Multi Producer Single Consumer): 多生产者单消费者, Rust中标准的mpsc模型。
MPMC(Multi Producer Multi Consumer)*: 多生产者多消费者。