Concurrency Models
- Shared-state
  - Multiple threads share access to the same piece of data.
  - Access is protected, usually with mutexes and atomics.
- Message-passing
  - Data is single-owner, and multiple threads access the data by passing it between themselves.
  - The passing is done through unidirectional channels.
Rust provides a channel implementation in the std::sync::mpsc module.
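To make the contrast concrete, here is a minimal sketch that computes the same sum under both models. The function names sum_shared_state and sum_message_passing are made up for this illustration; the sketch assumes nothing beyond the standard library.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Shared-state: every worker locks the same counter before updating it.
fn sum_shared_state(workers: u32) -> u32 {
    let total = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (1..=workers)
        .map(|n| {
            let total = Arc::clone(&total);
            thread::spawn(move || {
                *total.lock().unwrap() += n;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let result = *total.lock().unwrap();
    result
}

// Message-passing: every worker sends its value; only the receiver adds.
fn sum_message_passing(workers: u32) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    for n in 1..=workers {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(n).unwrap();
        });
    }
    drop(tx); // only the workers' clones remain, so the iteration below can end
    let result: u32 = rx.iter().sum();
    result
}

fn main() {
    assert_eq!(sum_shared_state(4), 10);
    assert_eq!(sum_message_passing(4), 10);
}
```

In the first function the data stays in one place and the threads take turns; in the second, each value has exactly one owner at any time and moves to the receiver.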
Channels
A channel can be created with the std::sync::mpsc::channel function:
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>(); // 1
    tx.send(5).unwrap(); // 2
    let value = rx.recv().unwrap(); // 3
    assert_eq!(value, 5);
}
- (1) tx is a std::sync::mpsc::Sender<i32>, and rx is a std::sync::mpsc::Receiver<i32>.
- (2) Data (in the example, values of type i32) can be sent through the sender tx,
- (3) …and received through the receiver rx.
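The single-owner idea also shows up in the types: send takes its argument by value, so a non-Copy value is moved into the channel. A small sketch, where round_trip is a hypothetical helper written for this example:

```rust
use std::sync::mpsc;

// Sends a String through a channel and returns what the receiver gets.
fn round_trip(message: String) -> String {
    let (tx, rx) = mpsc::channel::<String>();
    tx.send(message).unwrap(); // `message` is moved into the channel here
    // println!("{message}"); // would not compile: value used after move
    rx.recv().unwrap() // the caller becomes the new owner
}

fn main() {
    let received = round_trip(String::from("hello"));
    assert_eq!(received, "hello");
}
```

Uncommenting the println! line should produce a "borrow of moved value" error, which is the compiler enforcing that the sender no longer owns the data.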
Channels with Multiple Threads
Both ends of a channel can be sent across threads.
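fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();
    let sender = std::thread::spawn(move || {
        tx.send(5).unwrap();
    });
    let receiver = std::thread::spawn(move || {
        let value = rx.recv().unwrap();
        assert_eq!(value, 5);
    });
    // Wait for both threads; otherwise main may exit before they run
    sender.join().unwrap();
    receiver.join().unwrap();
}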
Sender
pub fn send(&self, t: T) -> Result<(), SendError<T>>;
- Ok(()) is returned if the data was successfully placed on the channel.
  - This does not mean a thread has gotten the data through recv().
- Err(SendError(t)) is returned if the receiving end of the channel no longer exists; the unsent value t is handed back.
- The method never blocks the current thread.
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();
    drop(rx); // Close the receiving end
    assert_eq!(tx.send(5), Err(std::sync::mpsc::SendError(5)));
}
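Because send never blocks on a channel created with channel(), values simply queue up until someone receives them. The sketch below illustrates this with a hypothetical helper, buffer_then_drain, which sends everything before receiving anything:

```rust
use std::sync::mpsc;

// Queues `count` values with no receiver running, then drains them.
fn buffer_then_drain(count: i32) -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();
    for i in 0..count {
        tx.send(i).unwrap(); // returns immediately; the value is queued
    }
    drop(tx); // close the sending end so the iterator below terminates
    let drained: Vec<i32> = rx.iter().collect();
    drained
}

fn main() {
    assert_eq!(buffer_then_drain(3), vec![0, 1, 2]);
}
```

The values come out in the order they were sent. The flip side is that a slow receiver lets the queue grow without bound.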
Receiver
pub fn recv(&self) -> Result<T, RecvError>;
- Ok(T) is returned if data was successfully received from the channel.
- Err(RecvError) is returned if there is no more data and no more senders exist.
- The method blocks the current thread until data is available or no more senders exist.
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();
    tx.send(5).unwrap();
    drop(tx); // Close the sending end
    assert_eq!(rx.recv(), Ok(5));
    assert_eq!(rx.recv(), Err(std::sync::mpsc::RecvError));
}
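The blocking behaviour can be observed by timing a recv call against a producer that sleeps before sending. This is only a sketch; recv_after_delay is a name invented for the example.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Returns the received value and how long the caller waited for it.
fn recv_after_delay(delay: Duration) -> (i32, Duration) {
    let (tx, rx) = mpsc::channel::<i32>();
    let start = Instant::now();
    let producer = thread::spawn(move || {
        thread::sleep(delay);
        tx.send(5).unwrap();
    });
    let value = rx.recv().unwrap(); // blocks until the producer sends
    let waited = start.elapsed();
    producer.join().unwrap();
    (value, waited)
}

fn main() {
    let delay = Duration::from_millis(50);
    let (value, waited) = recv_after_delay(delay);
    assert_eq!(value, 5);
    assert!(waited >= delay);
}
```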
Receiver (2)
pub fn try_recv(&self) -> Result<T, TryRecvError>;
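- Unlike recv(), the method never blocks the current thread.
- Err(TryRecvError::Disconnected) is returned if the channel is empty and no more senders exist.
- Err(TryRecvError::Empty) is returned if there is currently no data in the channel but a sender still exists.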
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();
    assert_eq!(rx.try_recv(), Err(std::sync::mpsc::TryRecvError::Empty));
    tx.send(5).unwrap();
    drop(tx); // Close the sending end
    assert_eq!(rx.try_recv(), Ok(5));
    assert_eq!(rx.try_recv(), Err(std::sync::mpsc::TryRecvError::Disconnected));
}
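A typical use of try_recv is a polling loop that does other work while the channel is empty. The following is one possible shape of such a loop, not the only one; poll_until_disconnected is a hypothetical helper and the sleep stands in for real work.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

// Polls the channel until the producer is gone and the queue is drained.
fn poll_until_disconnected() -> Vec<i32> {
    let (tx, rx) = mpsc::channel::<i32>();
    thread::spawn(move || {
        for i in 0..3 {
            thread::sleep(Duration::from_millis(20));
            tx.send(i).unwrap();
        }
        // tx is dropped here, which disconnects the channel
    });

    let mut received = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(value) => received.push(value),
            Err(TryRecvError::Empty) => {
                // Nothing yet: do something else instead of blocking
                thread::sleep(Duration::from_millis(5));
            }
            Err(TryRecvError::Disconnected) => break,
        }
    }
    received
}

fn main() {
    assert_eq!(poll_until_disconnected(), vec![0, 1, 2]);
}
```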
MPSC: Multiple Producer, Single Consumer
fn main() {
    let (tx_1, rx) = std::sync::mpsc::channel::<String>();
    let tx_2 = tx_1.clone();
    std::thread::spawn(move || {
        for i in 0..5 {
            tx_1.send(format!("Thread 1: {i}")).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(500));
        }
    });
    std::thread::spawn(move || {
        for i in 0..5 {
            tx_2.send(format!("Thread 2: {i}")).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(300));
        }
    });
    for value in rx {
        println!("Received: {value}");
    }
}
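The for loop over rx ends only once every Sender has been dropped. In the example above both senders are moved into threads, so this happens automatically. When senders are cloned in a loop, the original must be dropped by hand, as in this sketch (collect_from_producers is an illustrative name, not part of the standard library):

```rust
use std::sync::mpsc;
use std::thread;

// Spawns `producers` threads that each send `per_producer` (id, index) pairs.
fn collect_from_producers(producers: usize, per_producer: usize) -> Vec<(usize, usize)> {
    let (tx, rx) = mpsc::channel::<(usize, usize)>();
    for id in 0..producers {
        let tx = tx.clone(); // one Sender per producer thread
        thread::spawn(move || {
            for i in 0..per_producer {
                tx.send((id, i)).unwrap();
            }
        });
    }
    drop(tx); // without this, the loop below would wait forever

    let mut received = Vec::new();
    for message in rx {
        received.push(message);
    }
    received
}

fn main() {
    let received = collect_from_producers(3, 4);
    assert_eq!(received.len(), 12);
}
```

Messages from different producers may interleave in any order, but the messages of a single producer arrive in the order that producer sent them.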
impl<T: Send> Send for Sender<T>
What’s wrong with this?
use std::rc::Rc;
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<Rc<i32>>();
    std::thread::spawn(move || {
        let _ = rx.recv().unwrap();
    });
    tx.send(Rc::new(5)).unwrap();
}
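Sender<T> and Receiver<T> are Send only if T is Send. Rc<i32> is not Send, so moving rx into the spawned thread does not compile. If it did compile, the following would be possible: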
use std::rc::Rc;
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<Rc<i32>>();
    std::thread::spawn(move || {
        let _ = rx.recv().unwrap();
    });
    let rc = Rc::new(5);
    let rc_2 = Rc::clone(&rc); // !! Same underlying reference counter
    tx.send(rc_2).unwrap();
    drop(rc); // Data race: both threads update the non-atomic counter
}
Use std::sync::Arc instead; its reference counts are updated atomically.
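For comparison, here is the same program rewritten with Arc, which should compile because Arc<i32> is Send. The wrapper function share_via_arc and the join at the end are additions made for this sketch.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

// Sends one Arc clone to another thread and returns the value it saw there.
fn share_via_arc(value: i32) -> i32 {
    let (tx, rx) = mpsc::channel::<Arc<i32>>();
    let receiver = thread::spawn(move || {
        let received = rx.recv().unwrap();
        *received // copy the i32 out; `received` is dropped on this thread
    });
    let arc = Arc::new(value);
    let arc_2 = Arc::clone(&arc); // same underlying counter, updated atomically
    tx.send(arc_2).unwrap();
    drop(arc); // safe even if the other thread drops its clone concurrently
    receiver.join().unwrap()
}

fn main() {
    assert_eq!(share_via_arc(5), 5);
}
```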