CS3984 Computer Systems in Rust



Accessing Shared State

  • Rust’s ownership model prevents accesses to shared state that otherwise would cause data races.

  • Several techniques exist for threads to exchange data

    • traditional shared state (std::sync::Mutex)
    • including data types supporting atomic operations
    • message passing via channel

Mutexes std::sync::Mutex

  • Mutex<T> here does not refer to an instance of synchronization device; rather, it describes the combination of some instance of type T and a lock (=mutex)

  • Locking is mandatory - shared state can only be accessed via .lock() which provides (temporary mutually exclusive) access to the protected object within the scope of a MutexGuard returned by .lock()

  • lock is released when guard is dropped

  • avoids common mistakes in traditional use of mutexes/locks, such access

    • failing to lock
    • ambiguity about which locks protects data
    • or failing to unlock
  • other sources of bugs inherent in this model remain: potential for deadlock, atomicity violations - also same performance impact caused by serialization

std::sync::Arc

  • provides an atomically referenced counted, thread-safe smart pointer that allows multiple threads to safely share an (immutable) objects even when their lifetimes differ

  • multithreaded equivalent to std::rc::Rc

  • does not allow mutable access to what’s being counted; this requires a Mutex (or one of its alternatives)

  • an Arc is necessary for threads whose lifetimes may differ to even safely access a Mutex

Shared Counter Example

use std::thread;
use std::sync::{Arc, Mutex};

fn main() {
    let protected_shared_counter = Arc::new(Mutex::new(0));
    let threads = (0..4).map(|_| {
        let counter = protected_shared_counter.clone();
        thread::spawn(move || {
            for _i in 0..1000000 {
                if let Ok(mut v) = counter.lock() {
                    *v += 1;    // lock is held here until `v` is dropped
                }
            }
        })
    });
    threads.into_iter().for_each(|handle| { let _ = handle.join(); });
    let final_value = protected_shared_counter.lock().unwrap();
    println!("final_value {}", final_value);
}


Scoped Threads

  • To reduce the need for Arc, and to directly allow the sharing of immutably borrowed objects, it is possible to synchronized the lifetimes of threads by spawning them via a shared scope

Scoped Threads Example

use std::sync::Mutex;
fn main() {
    let protected_shared_counter = Mutex::new(0);
    std::thread::scope(|s| {
        for _i in 0..4 {
            let counter = &protected_shared_counter; // ref is moved into thread
            s.spawn(move || {
                for _i in 0..1000000 {
                    if let Ok(mut v) = counter.lock() {
                        *v += 1;    // critical section
                    }
                }
            });
        }
    }); // automatic join at end of scope

    let final_value = protected_shared_counter.lock().unwrap();
    println!("final_value {}", final_value);
}


Scoped Threads Example (Take 2)

use std::sync::Mutex;
fn main() {
    let protected_shared_counter = Mutex::new(0);
    // note: this closure doesn't move, so `protected_shared_counter`
    // is immutably borrowed by all threads.
    std::thread::scope(|s| {
        for _i in 0..4 {
            s.spawn(|| {
                for _i in 0..1000000 {
                    if let Ok(mut v) = protected_shared_counter.lock() {
                        *v += 1;
                    }
                }
            });
        }
    }); // automatic join at end of scope
    let final_value = protected_shared_counter.lock().unwrap();
    println!("final_value {}", final_value);
}


Side note: Beware of Arc::get_mut

  • Arc provide shared ownership, but not mutability.

  • A get_mut method is provided, but it will fail with None if the Arc is in fact shared (has a refcount > 1).

  • So Arc::new(0) does not provide an atomic integer

Arc::get_mut Non-working Example

use std::thread;
use std::sync::Arc;

fn main() {
    let shared_counter = Arc::new(0);
    (0..4).map(|_| {
        let mut counter = shared_counter.clone();
        thread::spawn(move || {
            for _i in 0..1000000 {
                // will fail at runtime since Arc has refcount > 1
                *Arc::get_mut(&mut counter).unwrap() += 1;
            }
        })
    }).into_iter().for_each(|handle| {
        let _rc = handle.join();
    });
}


Atomic Variables

  • An Arc may also wrap types that provide direct atomic operations (AtomicI32, etc.)

  • These types provide methods for atomic read-update-write operations, e.g. fetch_add performs these three operations atomically

    • fetch current value
    • add some amount
    • write new value back
  • If multiple threads perform this operation, their effects will be serialized and remain consistent

  • Support for weaker memory models provided; in fact, must specify the required consistency for each operation

  • Ordering::SeqCst provides sequential consistency of this atomic operation with respect to others. (Common case.)

Atomic Variables Example

use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};

fn main() {
    let shared_counter = Arc::new(AtomicI32::new(0));
    (0..4).map(|_| {
        let counter = shared_counter.clone();
        thread::spawn(move || {
            for _i in 0..1000000 {
                counter.fetch_add(1, Ordering::SeqCst); // atomic op
            }
        })
    }).into_iter().for_each(|handle| { 
        let _ = handle.join(); 
    });
    let final_value = shared_counter.load(Ordering::SeqCst);
    println!("final_value {}", final_value);
}