CS3984 Computer Systems in Rust



Concurrency Models

  • Shared-State
    • Multiple threads share access to the same piece of data.
    • Access is protected, usually with mutexes or atomics.
  • Message-passing
    • Data is single-owner, and multiple threads access the data by passing it between themselves.
    • The passing is done through unidirectional channels.

Rust provides a channel data structure implementation through the std::sync::mpsc module.
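To make the two models concrete, here is a minimal sketch that computes the same sum both ways. The function names `shared_state_sum` and `message_passing_sum` are hypothetical, chosen only for this illustration; the shared-state version assumes `Arc<Mutex<u32>>` as the protected data.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Shared-state: every thread locks the same counter and updates it in place.
fn shared_state_sum(workers: u32) -> u32 {
    let total = Arc::new(Mutex::new(0u32));
    let handles: Vec<_> = (1..=workers)
        .map(|n| {
            let total = Arc::clone(&total);
            thread::spawn(move || {
                *total.lock().unwrap() += n;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let sum = *total.lock().unwrap();
    sum
}

// Message-passing: each thread owns its value and hands it to the receiver.
fn message_passing_sum(workers: u32) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    for n in 1..=workers {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(n).unwrap();
        });
    }
    drop(tx); // Drop the original sender so the iterator below can finish.
    rx.iter().sum()
}

fn main() {
    assert_eq!(shared_state_sum(4), 10);
    assert_eq!(message_passing_sum(4), 10);
}
```

In the second version no lock is needed: at any moment each value has exactly one owner, and ownership moves through the channel.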

Channels

A channel can be created with the std::sync::mpsc::channel function:

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();  // 1
    tx.send(5).unwrap();  // 2

    let value = rx.recv().unwrap();  // 3
    assert_eq!(value, 5);
}


  1. tx is a std::sync::mpsc::Sender<i32>, and rx is a std::sync::mpsc::Receiver<i32>.

  2. Data (in the example, values of type i32) can be sent through the sender tx,

  3. …and received through the receiver rx.

Channels with Multiple Threads

Both ends of a channel can be sent across threads.

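fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();

    let sender = std::thread::spawn(move || {
        tx.send(5).unwrap();
    });

    let receiver = std::thread::spawn(move || {
        let value = rx.recv().unwrap();
        assert_eq!(value, 5);
    });

    // Wait for both threads; otherwise main may return before they run.
    sender.join().unwrap();
    receiver.join().unwrap();
}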


Sender

pub fn send(&self, t: T) -> Result<(), SendError<T>>;


  1. Ok(()) is returned if the data was successfully placed on the channel.

    • This does not mean a thread has gotten the data through recv().
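  2. Err(SendError(T)) is returned if the receiving end of the channel no longer exists. The unsent value is handed back inside the error.

  3. The method never blocks the current thread: a channel created with channel() has an unbounded buffer.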

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();

    drop(rx);  // Close the receiving end

    assert_eq!(tx.send(5), Err(std::sync::mpsc::SendError(5)));
}
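The two outcomes of send() can be sketched as follows. The helper `send_or_reclaim` is hypothetical, written only for this illustration; it relies on the fact that SendError<T> is a tuple struct with a public field holding the value that could not be sent.

```rust
use std::sync::mpsc::{self, SendError};

// Hypothetical helper: try to send, and hand the message back on failure.
fn send_or_reclaim(tx: &mpsc::Sender<String>, msg: String) -> Option<String> {
    match tx.send(msg) {
        Ok(()) => None,
        // The receiver is gone; the error returns ownership of the message.
        Err(SendError(returned)) => Some(returned),
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Nobody has called recv() yet, but send() still returns immediately.
    assert_eq!(send_or_reclaim(&tx, String::from("queued")), None);

    drop(rx); // Close the receiving end

    // Now the message cannot be delivered, so we get it back.
    assert_eq!(
        send_or_reclaim(&tx, String::from("lost")),
        Some(String::from("lost"))
    );
}
```

Getting the value back matters for types that are expensive or impossible to recreate, since send() takes its argument by value.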


Receiver

pub fn recv(&self) -> Result<T, RecvError>;


  1. Ok(T) is returned if data was successfully received from the channel.

  2. Err(RecvError) is returned if there is no more data and no more senders exist.

  3. The method blocks the current thread until data is available or no more senders exist.

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();

    tx.send(5).unwrap();
    drop(tx);  // Close the sending end

    assert_eq!(rx.recv(), Ok(5));
    assert_eq!(rx.recv(), Err(std::sync::mpsc::RecvError));
}
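The blocking behaviour can be observed with a sender that waits before sending. This is a minimal sketch; the helper name `recv_after_delay` and the 50 ms delay are assumptions made for the illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical helper: returns the received value and how long recv() took
// to produce it, measured from just before the sender thread was spawned.
fn recv_after_delay(delay: Duration) -> (i32, Duration) {
    let (tx, rx) = mpsc::channel::<i32>();
    let start = Instant::now();

    thread::spawn(move || {
        thread::sleep(delay); // The channel stays empty during this time.
        tx.send(5).unwrap();
    });

    // Blocks here until the sender thread has sent its value.
    let value = rx.recv().unwrap();
    (value, start.elapsed())
}

fn main() {
    let delay = Duration::from_millis(50);
    let (value, waited) = recv_after_delay(delay);
    assert_eq!(value, 5);
    assert!(waited >= delay);
}
```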


Receiver (2)

pub fn try_recv(&self) -> Result<T, TryRecvError>;


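  1. Err(TryRecvError::Disconnected) is returned if the channel is empty and no more senders exist.
  2. Err(TryRecvError::Empty) is returned if there is no data in the channel but a sender still exists.
  3. Unlike recv(), the method never blocks the current thread.
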
fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<i32>();

    assert_eq!(rx.try_recv(), Err(std::sync::mpsc::TryRecvError::Empty));

    tx.send(5).unwrap();
    drop(tx);  // Close the sending end

    assert_eq!(rx.try_recv(), Ok(5));
    assert_eq!(rx.try_recv(), Err(std::sync::mpsc::TryRecvError::Disconnected));
}
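A typical use of try_recv() is a polling loop that does other work while the channel is empty. The sketch below assumes a hypothetical helper `poll_until_disconnected`, with a short sleep standing in for the other work.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: poll until every sender is gone.
// Returns the received values and the number of polls that found nothing.
fn poll_until_disconnected(rx: &mpsc::Receiver<i32>) -> (Vec<i32>, u32) {
    let mut values = Vec::new();
    let mut empty_polls = 0;
    loop {
        match rx.try_recv() {
            Ok(value) => values.push(value),
            Err(TryRecvError::Empty) => {
                empty_polls += 1;
                // Stand-in for useful work done while waiting.
                thread::sleep(Duration::from_millis(10));
            }
            // Channel is empty and all senders were dropped: nothing more will arrive.
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (values, empty_polls)
}

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_millis(20));
        }
        // tx is dropped here, which disconnects the channel.
    });

    let (values, empty_polls) = poll_until_disconnected(&rx);
    assert_eq!(values, vec![0, 1, 2]);
    println!("Polled an empty channel {empty_polls} times");
}
```

The number of empty polls depends on thread scheduling, so only the received values are asserted.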


MPSC: Multiple Producer, Single Consumer

fn main() {
    let (tx_1, rx) = std::sync::mpsc::channel::<String>();
    let tx_2 = tx_1.clone();
    std::thread::spawn(move || {
        for i in 0..5 {
            tx_1.send(format!("Thread 1: {i}")).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(500));
        }
    });
    std::thread::spawn(move || {
        for i in 0..5 {
            tx_2.send(format!("Thread 2: {i}")).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(300));
        }
    });
    for value in rx {
        println!("Received: {value}");
    }
}
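The loop over rx ends only when every Sender, including the original one, has been dropped. A minimal sketch of the same pattern with a variable number of producers follows; the helper `collect_from_workers` and the (worker id, item) message format are assumptions made for the illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: each worker sends (worker id, item index) pairs;
// the single consumer collects everything that was sent.
fn collect_from_workers(workers: u32, per_worker: u32) -> Vec<(u32, u32)> {
    let (tx, rx) = mpsc::channel::<(u32, u32)>();

    for id in 0..workers {
        let tx = tx.clone(); // One Sender per producer thread.
        thread::spawn(move || {
            for i in 0..per_worker {
                tx.send((id, i)).unwrap();
            }
        });
    }

    // Drop the original Sender. If it stayed alive, the iterator below
    // would wait forever for a message that never comes.
    drop(tx);

    let mut received: Vec<_> = rx.iter().collect();
    received.sort(); // Arrival order across threads is not deterministic.
    received
}

fn main() {
    let received = collect_from_workers(3, 2);
    assert_eq!(received.len(), 6);
    println!("{received:?}");
}
```

Messages from one sender arrive in the order they were sent, but messages from different senders can interleave arbitrarily, which is why the sketch sorts before returning.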


impl<T: Send> Send for Sender<T>

What’s wrong with this?

use std::rc::Rc;

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<Rc<i32>>();

    std::thread::spawn(move || {
        let _ = rx.recv().unwrap();
    });

    tx.send(Rc::new(5)).unwrap();
}


impl<T: Send> Send for Sender<T>

Rc<i32> is not Send, so the compiler rejects the code above. If it were accepted, this would be possible:

use std::rc::Rc;

fn main() {
    let (tx, rx) = std::sync::mpsc::channel::<Rc<i32>>();

    std::thread::spawn(move || {
        let _ = rx.recv().unwrap();
    });

    let rc = Rc::new(5);

    let rc_2 = Rc::clone(&rc);  // !! Same underlying reference counter
    tx.send(rc_2).unwrap();

    drop(rc);  // Data race: the other thread may update the non-atomic counter at the same time
}


Use std::sync::Arc instead: its reference count is updated atomically, so Arc<T> is Send whenever T is Send + Sync.
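A minimal sketch of the earlier example rewritten with Arc, which compiles and runs. The helper name `share_through_channel` is hypothetical; it returns the value seen by the receiving thread together with the strong count left once that thread has finished.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

// Hypothetical helper: send a clone of an Arc to another thread.
fn share_through_channel() -> (i32, usize) {
    let (tx, rx) = mpsc::channel::<Arc<i32>>();

    let handle = thread::spawn(move || {
        let received = rx.recv().unwrap();
        *received // Copy the i32 out; `received` is dropped when the closure ends.
    });

    let arc = Arc::new(5);

    // Same underlying counter as `arc`, but incremented atomically.
    tx.send(Arc::clone(&arc)).unwrap();

    let value = handle.join().unwrap();

    // The receiving thread has finished and dropped its clone,
    // so only the local `arc` remains.
    (value, Arc::strong_count(&arc))
}

fn main() {
    let (value, remaining) = share_through_channel();
    assert_eq!(value, 5);
    assert_eq!(remaining, 1);
}
```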