Accessing Shared State
-
Rust’s ownership model prevents accesses to shared state that otherwise would cause data races.
-
Several techniques exist for threads to exchange data
-
traditional shared state (
std::sync::Mutex) - including data types supporting atomic operations
- message passing via channel
-
traditional shared state (
Mutexes std::sync::Mutex
-
Mutex<T>here does not refer to an instance of synchronization device; rather, it describes the combination of some instance of typeTand a lock (=mutex) -
Locking is mandatory - shared state can only be accessed via
.lock()which provides (temporary mutually exclusive) access to the protected object within the scope of aMutexGuardreturned by.lock() -
lock is released when guard is dropped
-
avoids common mistakes in traditional use of mutexes/locks, such access
- failing to lock
- ambiguity about which locks protects data
- or failing to unlock
-
other sources of bugs inherent in this model remain: potential for deadlock, atomicity violations - also same performance impact caused by serialization
std::sync::Arc
-
provides an atomically referenced counted, thread-safe smart pointer that allows multiple threads to safely share an (immutable) objects even when their lifetimes differ
-
multithreaded equivalent to
std::rc::Rc -
does not allow mutable access to what’s being counted; this requires a
Mutex(or one of its alternatives) -
an
Arcis necessary for threads whose lifetimes may differ to even safely access aMutex
Shared Counter Example
use std::thread;
use std::sync::{Arc, Mutex};
fn main() {
let protected_shared_counter = Arc::new(Mutex::new(0));
let threads = (0..4).map(|_| {
let counter = protected_shared_counter.clone();
thread::spawn(move || {
for _i in 0..1000000 {
if let Ok(mut v) = counter.lock() {
*v += 1; // lock is held here until `v` is dropped
}
}
})
});
threads.into_iter().for_each(|handle| { let _ = handle.join(); });
let final_value = protected_shared_counter.lock().unwrap();
println!("final_value {}", final_value);
}
Scoped Threads
-
To reduce the need for
Arc, and to directly allow the sharing of immutably borrowed objects, it is possible to synchronized the lifetimes of threads by spawning them via a sharedscope
Scoped Threads Example
use std::sync::Mutex;
fn main() {
let protected_shared_counter = Mutex::new(0);
std::thread::scope(|s| {
for _i in 0..4 {
let counter = &protected_shared_counter; // ref is moved into thread
s.spawn(move || {
for _i in 0..1000000 {
if let Ok(mut v) = counter.lock() {
*v += 1; // critical section
}
}
});
}
}); // automatic join at end of scope
let final_value = protected_shared_counter.lock().unwrap();
println!("final_value {}", final_value);
}
Scoped Threads Example (Take 2)
use std::sync::Mutex;
fn main() {
let protected_shared_counter = Mutex::new(0);
// note: this closure doesn't move, so `protected_shared_counter`
// is immutably borrowed by all threads.
std::thread::scope(|s| {
for _i in 0..4 {
s.spawn(|| {
for _i in 0..1000000 {
if let Ok(mut v) = protected_shared_counter.lock() {
*v += 1;
}
}
});
}
}); // automatic join at end of scope
let final_value = protected_shared_counter.lock().unwrap();
println!("final_value {}", final_value);
}
Side note: Beware of Arc::get_mut
-
Arcprovide shared ownership, but not mutability. -
A
get_mutmethod is provided, but it will fail withNoneif theArcis in fact shared (has a refcount > 1). -
So
Arc::new(0)does not provide an atomic integer
Arc::get_mut Non-working Example
use std::thread;
use std::sync::Arc;
fn main() {
let shared_counter = Arc::new(0);
(0..4).map(|_| {
let mut counter = shared_counter.clone();
thread::spawn(move || {
for _i in 0..1000000 {
// will fail at runtime since Arc has refcount > 1
*Arc::get_mut(&mut counter).unwrap() += 1;
}
})
}).into_iter().for_each(|handle| {
let _rc = handle.join();
});
}
Atomic Variables
-
An
Arcmay also wrap types that provide direct atomic operations (AtomicI32, etc.) -
These types provide methods for atomic read-update-write operations, e.g.
fetch_addperforms these three operations atomically- fetch current value
- add some amount
- write new value back
-
If multiple threads perform this operation, their effects will be serialized and remain consistent
-
Support for weaker memory models provided; in fact, must specify the required consistency for each operation
-
Ordering::SeqCstprovides sequential consistency of this atomic operation with respect to others. (Common case.)
Atomic Variables Example
use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
fn main() {
let shared_counter = Arc::new(AtomicI32::new(0));
(0..4).map(|_| {
let counter = shared_counter.clone();
thread::spawn(move || {
for _i in 0..1000000 {
counter.fetch_add(1, Ordering::SeqCst); // atomic op
}
})
}).into_iter().for_each(|handle| {
let _ = handle.join();
});
let final_value = shared_counter.load(Ordering::SeqCst);
println!("final_value {}", final_value);
}