Accessing Shared State
- Rust’s ownership model prevents accesses to shared state that would otherwise cause data races.
- Several techniques exist for threads to exchange data:
  - traditional shared state (std::sync::Mutex), including data types supporting atomic operations
  - message passing via channels
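For contrast with the shared-state techniques covered in the rest of this section, here is a minimal sketch of the message-passing alternative using std::sync::mpsc. The helper name sum_via_channel and its parameters are hypothetical, chosen only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: each worker thread computes a partial count and
// sends it over a channel; the receiving side adds the partial counts up.
fn sum_via_channel(workers: u64, per_worker: u64) -> u64 {
    let (tx, rx) = mpsc::channel();
    for _ in 0..workers {
        let tx = tx.clone(); // each thread owns its own sender
        thread::spawn(move || {
            let partial: u64 = (0..per_worker).map(|_| 1u64).sum();
            tx.send(partial).unwrap();
        });
    }
    // Drop the original sender so that `rx.iter()` ends once every
    // worker has finished and dropped its clone.
    drop(tx);
    rx.iter().sum()
}

fn main() {
    println!("total {}", sum_via_channel(4, 1000));
}
```

No state is shared here: each value is moved from the sending thread to the receiving thread.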
Mutexes: std::sync::Mutex
- Mutex<T> here does not refer to a standalone synchronization device; rather, it describes the combination of some instance of type T and the lock (mutex) that protects it.
- Locking is mandatory: the shared state can only be accessed via .lock(), which provides temporary, mutually exclusive access to the protected object for as long as the MutexGuard returned by .lock() is in scope.
  - The lock is released when the guard is dropped.
- This avoids common mistakes in the traditional use of mutexes/locks, such as
  - failing to lock
  - ambiguity about which lock protects which data
  - failing to unlock
- Other sources of bugs inherent in this model remain: potential for deadlock and atomicity violations. The performance impact caused by serialization also remains.
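The guard-based locking described above can be sketched as follows. This is a minimal illustration, not a recommended pattern; the helper name guard_scope_demo is hypothetical, and try_lock is used only to observe whether the lock is currently held.

```rust
use std::sync::Mutex;

// Hypothetical helper. Returns:
// (lock held while the guard is alive, lock held after the guard is dropped, final value)
fn guard_scope_demo() -> (bool, bool, i32) {
    let protected = Mutex::new(0);
    let held_inside;
    {
        let mut guard = protected.lock().unwrap(); // lock acquired
        *guard += 1; // the data is only reachable through the guard
        // try_lock does not block; it returns Err while the guard is alive
        held_inside = protected.try_lock().is_err();
    } // guard dropped here: lock released
    let held_after = protected.try_lock().is_err();
    let value = *protected.lock().unwrap();
    (held_inside, held_after, value)
}

fn main() {
    println!("{:?}", guard_scope_demo());
}
```

Because the integer lives inside the Mutex, there is no way to reach it without first obtaining a guard.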
std::sync::Arc
- Provides an atomically reference-counted, thread-safe smart pointer that allows multiple threads to safely share an (immutable) object even when their lifetimes differ.
- Multithreaded equivalent of std::rc::Rc.
- Does not allow mutable access to the object being counted; this requires a Mutex (or one of its alternatives).
- An Arc is necessary for threads whose lifetimes may differ to even safely access a Mutex.
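A minimal sketch of sharing an immutable object through an Arc, assuming a small vector as the shared data. The helper name shared_sum is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: three threads read the same immutable vector.
// Returns (reference count after all threads finished, sum of their results).
fn shared_sum() -> (usize, i32) {
    let data = Arc::new(vec![1, 2, 3, 4]);
    let handles: Vec<_> = (0..3)
        .map(|_| {
            let data = Arc::clone(&data); // bumps the refcount, no deep copy
            thread::spawn(move || data.iter().sum::<i32>())
        })
        .collect();
    let total: i32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    // Each thread dropped its clone when it finished, so only `data` remains.
    (Arc::strong_count(&data), total)
}

fn main() {
    println!("{:?}", shared_sum());
}
```

Each clone is moved into its thread, so the vector stays alive until the last owner is dropped, regardless of which thread finishes first.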
Shared Counter Example
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let protected_shared_counter = Arc::new(Mutex::new(0));
    // collect() forces all threads to be spawned before any is joined;
    // a lazy iterator would spawn and join them one at a time.
    let threads: Vec<_> = (0..4)
        .map(|_| {
            let counter = protected_shared_counter.clone();
            thread::spawn(move || {
                for _i in 0..1000000 {
                    if let Ok(mut v) = counter.lock() {
                        *v += 1; // lock is held here until `v` is dropped
                    }
                }
            })
        })
        .collect();
    threads.into_iter().for_each(|handle| {
        let _ = handle.join();
    });
    let final_value = protected_shared_counter.lock().unwrap();
    println!("final_value {}", final_value);
}
Scoped Threads
- To reduce the need for Arc, and to directly allow the sharing of immutably borrowed objects, it is possible to synchronize the lifetimes of threads by spawning them within a shared scope (std::thread::scope).
Scoped Threads Example
use std::sync::Mutex;

fn main() {
    let protected_shared_counter = Mutex::new(0);
    std::thread::scope(|s| {
        for _i in 0..4 {
            let counter = &protected_shared_counter; // ref is moved into thread
            s.spawn(move || {
                for _i in 0..1000000 {
                    if let Ok(mut v) = counter.lock() {
                        *v += 1; // critical section
                    }
                }
            });
        }
    }); // automatic join at end of scope
    let final_value = protected_shared_counter.lock().unwrap();
    println!("final_value {}", final_value);
}
Scoped Threads Example (Take 2)
use std::sync::Mutex;

fn main() {
    let protected_shared_counter = Mutex::new(0);
    // note: this closure doesn't move, so `protected_shared_counter`
    // is immutably borrowed by all threads.
    std::thread::scope(|s| {
        for _i in 0..4 {
            s.spawn(|| {
                for _i in 0..1000000 {
                    if let Ok(mut v) = protected_shared_counter.lock() {
                        *v += 1;
                    }
                }
            });
        }
    }); // automatic join at end of scope
    let final_value = protected_shared_counter.lock().unwrap();
    println!("final_value {}", final_value);
}
Side note: Beware of Arc::get_mut
- Arc provides shared ownership, but not mutability.
- A get_mut method is provided, but it returns None if the Arc is in fact shared (has a refcount > 1).
- So Arc::new(0) does not provide an atomic integer.
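The behavior of get_mut can be observed without any threads. A minimal sketch; the helper name get_mut_demo is hypothetical.

```rust
use std::sync::Arc;

// Hypothetical helper. Returns:
// (get_mut succeeded while unique, get_mut succeeded while shared, final value)
fn get_mut_demo() -> (bool, bool, i32) {
    let mut value = Arc::new(0);
    // Only one owner exists, so get_mut yields Some(&mut i32).
    let unique_ok = match Arc::get_mut(&mut value) {
        Some(v) => {
            *v += 1;
            true
        }
        None => false,
    };
    let other = Arc::clone(&value); // refcount is now 2
    // The Arc is shared, so get_mut yields None.
    let shared_ok = Arc::get_mut(&mut value).is_some();
    drop(other);
    (unique_ok, shared_ok, *value)
}

fn main() {
    println!("{:?}", get_mut_demo());
}
```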
Arc::get_mut Non-working Example
use std::sync::Arc;
use std::thread;

fn main() {
    let shared_counter = Arc::new(0);
    let threads: Vec<_> = (0..4)
        .map(|_| {
            let mut counter = shared_counter.clone();
            thread::spawn(move || {
                for _i in 0..1000000 {
                    // will fail at runtime since Arc has refcount > 1:
                    // get_mut returns None and unwrap panics in this thread
                    *Arc::get_mut(&mut counter).unwrap() += 1;
                }
            })
        })
        .collect();
    threads.into_iter().for_each(|handle| {
        let _rc = handle.join(); // Err, since the thread panicked
    });
}
Atomic Variables
- An Arc may also wrap types that provide direct atomic operations (AtomicI32, etc.).
- These types provide methods for atomic read-modify-write operations, e.g. fetch_add performs these three operations atomically:
  - fetch current value
  - add some amount
  - write new value back
- If multiple threads perform this operation, their effects will be serialized and remain consistent.
- Weaker memory orderings are supported; in fact, the required ordering must be specified for each operation.
- Ordering::SeqCst provides sequential consistency of this atomic operation with respect to other sequentially consistent operations. (Common case.)
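As a sketch of a weaker ordering, a pure counter can use Ordering::Relaxed, assuming nothing else is published through the atomic and the final value is read only after all threads have been joined. The helper name relaxed_count is hypothetical, and scoped threads are used here instead of an Arc only to keep the sketch short.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

// Hypothetical helper: `threads` threads each increment the counter
// `per_thread` times using relaxed read-modify-write operations.
fn relaxed_count(threads: u64, per_thread: u64) -> u64 {
    let counter = AtomicU64::new(0);
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                for _ in 0..per_thread {
                    // Still atomic: Relaxed only drops ordering guarantees
                    // relative to other memory accesses.
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    }); // all threads are joined here, before the final load
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("final_value {}", relaxed_count(4, 1000));
}
```

If the atomic were used to signal that other data is ready, a stronger ordering (Acquire/Release or SeqCst) would be needed.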
Atomic Variables Example
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let shared_counter = Arc::new(AtomicI32::new(0));
    // collect() spawns all threads before any of them is joined
    let threads: Vec<_> = (0..4)
        .map(|_| {
            let counter = shared_counter.clone();
            thread::spawn(move || {
                for _i in 0..1000000 {
                    counter.fetch_add(1, Ordering::SeqCst); // atomic op
                }
            })
        })
        .collect();
    threads.into_iter().for_each(|handle| {
        let _ = handle.join();
    });
    let final_value = shared_counter.load(Ordering::SeqCst);
    println!("final_value {}", final_value);
}