Concurrency and Parallelism
Concurrency: Two or more tasks executing in overlapping time periods.
Parallelism: Two or more tasks executing at the same time.
Operating systems: Threads…
Programming languages: Functions, chunks of code…
Concurrency and Parallelism (2)
Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.
Operating systems: Context switching between multiple threads on a single CPU core
Programming languages: Executing multiple tasks on a single thread
Asynchronous Rust
The async/await keywords provide support for concurrent programming.
fn main() {
// let f: fn sync_func() -> i32
let f = sync_func;
// let f: fn async_func() -> impl Future<Output = i32>
let f = async_func;
}
fn sync_func() -> i32 {
println!("Function called!");
42
}
async fn async_func() -> i32 {
println!("Function called!");
42
}
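One consequence worth spelling out: calling an async fn only builds the future, and the body does not run until that future is polled. The sketch below is one way to observe this by hand. NoopWake and async_func_flagged are hypothetical names introduced for illustration, not part of the slides.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Variant of `async_func` that records when its body actually runs.
async fn async_func_flagged(called: &Cell<bool>) -> i32 {
    called.set(true);
    42
}

// Returns (body ran before poll, body ran after poll, output of the poll).
fn demo_lazy() -> (bool, bool, Option<i32>) {
    let called = Cell::new(false);
    let fut = async_func_flagged(&called); // only builds the future
    let ran_before_poll = called.get();

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let output = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    };
    (ran_before_poll, called.get(), output)
}

fn main() {
    println!("{:?}", demo_lazy());
}
```

The first flag is false because nothing has polled the future yet; the single poll then runs the body to completion.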
Asynchronous Rust (2)
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
- Futures have to be polled by an executor to check for completion.
- Simple executor:
  - Poll a future. If it is Ready, it is done!
  - If it is Pending, poll another future.
  - Repeat until all futures are Ready.
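The simple executor described above can be sketched roughly as follows. CountDown and NoopWake are hypothetical stand-ins: CountDown returns Pending a fixed number of times before it is Ready, and the waker does nothing because this executor never waits to be woken.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing: this executor just keeps polling.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future: Pending `remaining` times, then Ready(id).
struct CountDown {
    id: u32,
    remaining: u32,
}

impl Future for CountDown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            Poll::Ready(self.id)
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

// Polls each future in turn until all are Ready; returns ids in completion order.
fn run_all(futures: Vec<CountDown>) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut queue: VecDeque<Pin<Box<CountDown>>> =
        futures.into_iter().map(Box::pin).collect();
    let mut finished = Vec::new();

    while let Some(mut fut) = queue.pop_front() {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(id) => finished.push(id),
            Poll::Pending => queue.push_back(fut), // not done: poll another one
        }
    }
    finished
}

fn main() {
    let order = run_all(vec![
        CountDown { id: 1, remaining: 2 },
        CountDown { id: 2, remaining: 0 },
        CountDown { id: 3, remaining: 1 },
    ]);
    println!("{:?}", order);
}
```

Note that the loop never sleeps: it re-polls Pending futures as fast as it can, whether or not they can make progress.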
Asynchronous Rust (3)
Polling in a loop is a bad idea: the executor burns CPU re-polling futures that cannot make progress yet.
Idea: Allow futures to signal to the executor when the future should be polled again.
/// The context of an asynchronous task.
/// Currently, Context only serves to provide access to a &Waker which can be used
/// to wake the current task.
pub struct Context<'a> { /* private fields */ }
impl<'a> Context<'a> {
pub const fn waker(&self) -> &'a Waker;
}
/// A Waker is a handle for waking up a task by notifying its executor
/// that it is ready to be run.
pub struct Waker { /* private fields */ }
impl Waker {
pub fn wake(self);
pub fn wake_by_ref(&self);
}
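To make the Context and Waker API above concrete, here is a small sketch of a future that asks to be polled again through cx.waker(). CountingWake and YieldOnce are hypothetical types for illustration; a real executor's waker would reschedule the task rather than count calls.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that only counts how many times it was woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Hypothetical future: Pending on the first poll, Ready on the second.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Signal that this task should be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Returns (first poll was Pending, wake calls seen, second poll was Ready).
fn demo_wake() -> (bool, usize, bool) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = pin!(YieldOnce { yielded: false });
    let first_pending = fut.as_mut().poll(&mut cx).is_pending();
    let wakes = counter.0.load(Ordering::SeqCst);
    let second_ready = fut.as_mut().poll(&mut cx).is_ready();
    (first_pending, wakes, second_ready)
}

fn main() {
    println!("{:?}", demo_wake());
}
```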
Asynchronous Rust (4)
Executor:
- Get a future off a pending queue.
- Poll the future. If it is Ready, it is done!
- If it is Pending, get another future off the pending queue.
- Block while there are no futures on the pending queue.
Future:
- If work is available, do work and return Ready.
- If work is not available, schedule for wake to be called when it is, and return Pending.
Simple way to create a waker: have wake submit the future back to the queue (eg. through an Arc<Sender<Future>>).
Simple way to schedule a wake when work is available: Set a timer, or use OS notification mechanisms like epoll or kqueue.
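Putting the pieces together, a queue-based executor along these lines might look like the sketch below. It is an illustration under stated assumptions, not a production design: Task, Sleep, sleep, run and demo are hypothetical names, the queue is a std sync_channel, and the timer is a helper thread rather than epoll or kqueue.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// A task owns its future and a handle to the executor's pending queue.
struct Task {
    future: Mutex<Option<BoxFuture>>,
    queue: SyncSender<Arc<Task>>,
}

// Waking a task submits it back to the pending queue.
impl Wake for Task {
    fn wake(self: Arc<Self>) {
        let queue = self.queue.clone();
        let _ = queue.send(self);
    }
}

// Hypothetical timer future: a helper thread sleeps, then calls `wake`.
struct Sleep {
    duration: Duration,
    done: Arc<AtomicBool>,
    started: bool,
}

fn sleep(duration: Duration) -> Sleep {
    Sleep { duration, done: Arc::new(AtomicBool::new(false)), started: false }
}

impl Future for Sleep {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready(()); // work is available
        }
        if !self.started {
            self.started = true;
            let (done, waker, duration) =
                (self.done.clone(), cx.waker().clone(), self.duration);
            thread::spawn(move || {
                thread::sleep(duration);
                done.store(true, Ordering::SeqCst);
                waker.wake(); // ask the executor to poll this task again
            });
        }
        Poll::Pending
    }
}

// Runs all futures to completion; `recv` blocks while the queue is empty.
fn run(futures: Vec<BoxFuture>) {
    let (tx, rx) = sync_channel::<Arc<Task>>(64);
    for future in futures {
        let task = Arc::new(Task { future: Mutex::new(Some(future)), queue: tx.clone() });
        tx.send(task).unwrap();
    }
    drop(tx); // the loop ends once every task (and its sender) is gone
    while let Ok(task) = rx.recv() {
        let mut slot = task.future.lock().unwrap();
        if let Some(mut future) = slot.take() {
            let waker = Waker::from(task.clone());
            let mut cx = Context::from_waker(&waker);
            if future.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(future); // keep it until it is woken again
            }
        }
    }
}

// Two tasks sleeping for different durations; returns the completion order.
fn demo() -> Vec<&'static str> {
    let log: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
    let (slow_log, fast_log) = (log.clone(), log.clone());
    let slow: BoxFuture = Box::pin(async move {
        sleep(Duration::from_millis(100)).await;
        slow_log.lock().unwrap().push("slow");
    });
    let fast: BoxFuture = Box::pin(async move {
        sleep(Duration::from_millis(10)).await;
        fast_log.lock().unwrap().push("fast");
    });
    run(vec![slow, fast]);
    let order = log.lock().unwrap().clone();
    order
}

fn main() {
    println!("{:?}", demo());
}
```

Unlike the polling loop, the executor thread sleeps inside recv until a waker puts a task back on the queue.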
Pin
What is Pin<&mut Self> and why is it required?
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
fn func(&self) {}
// is actually
fn func(self: &Self) {}
fn func_mut(&mut self) {}
// is actually
fn func_mut(self: &mut Self) {}
fn func_box(self: Box<Self>) {}
fn func_rc(self: Rc<Self>) {}
fn func_arc(self: Arc<Self>) {}
fn func_pin(self: Pin<&Self>) {}
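The receiver forms listed above can be exercised with a small example. Counter and its method names are hypothetical; the point is only that the type of self decides what the caller must hold in order to call the method.

```rust
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;

struct Counter {
    n: u32,
}

impl Counter {
    // Same as `&self`.
    fn by_ref(self: &Self) -> u32 { self.n }
    // Consumes the Box.
    fn by_box(self: Box<Self>) -> u32 { self.n }
    // Consumes one Rc handle and reports how many handles exist.
    fn by_rc(self: Rc<Self>) -> usize { Rc::strong_count(&self) }
    // Consumes one Arc handle.
    fn by_arc(self: Arc<Self>) -> u32 { self.n }
    // Only callable through a pinned shared reference.
    fn by_pin(self: Pin<&Self>) -> u32 { self.n }
}

fn demo() -> (u32, u32, usize, u32, u32) {
    let plain = Counter { n: 1 };
    let boxed = Box::new(Counter { n: 2 });
    let rc = Rc::new(Counter { n: 3 });
    let arc = Arc::new(Counter { n: 4 });
    let pinned = Counter { n: 5 };
    (
        plain.by_ref(),
        boxed.by_box(),
        rc.clone().by_rc(), // `rc` and the clone are both alive: count is 2
        arc.by_arc(),
        Pin::new(&pinned).by_pin(), // Counter is Unpin, so Pin::new is allowed
    )
}

fn main() {
    println!("{:?}", demo());
}
```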
Async Desugar: Two Futures
async fn func() {
sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(2)).await;
}
conceptually desugars to:
struct Generated {
fut_a: Timer,
fut_b: Timer,
state: State,
}
enum State {
FutOne,
FutTwo,
}
impl Future for Generated {
...
Async Desugar: Two Futures
impl Future for Generated {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state {
State::FutOne => match self.fut_a.poll(cx) {
Poll::Ready(()) => self.state = State::FutTwo,
Poll::Pending => return Poll::Pending,
},
State::FutTwo => match self.fut_b.poll(cx) {
Poll::Ready(()) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
},
}
}
}
}
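The conceptual desugaring above does not compile as written, because fields cannot be polled directly through Pin<&mut Self>. A runnable approximation, assuming the inner futures are Unpin, is sketched below. CountDown is a hypothetical stand-in for Timer that returns Pending a fixed number of times, and NoopWake is a do-nothing waker.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for Timer: Pending `self.0` times, then Ready.
struct CountDown(u32);

impl Future for CountDown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

struct Generated {
    fut_a: CountDown,
    fut_b: CountDown,
    state: State,
}

enum State {
    FutOne,
    FutTwo,
}

impl Future for Generated {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Allowed only because every field is Unpin, so Generated is Unpin.
        let this = self.get_mut();
        loop {
            match this.state {
                State::FutOne => match Pin::new(&mut this.fut_a).poll(cx) {
                    Poll::Ready(()) => this.state = State::FutTwo,
                    Poll::Pending => return Poll::Pending,
                },
                State::FutTwo => return Pin::new(&mut this.fut_b).poll(cx),
            }
        }
    }
}

// Counts how many polls it takes for the combined future to finish.
fn polls_until_ready(a: u32, b: u32) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Generated { fut_a: CountDown(a), fut_b: CountDown(b), state: State::FutOne };
    let mut polls = 1;
    while Pin::new(&mut fut).poll(&mut cx).is_pending() {
        polls += 1;
    }
    polls
}

fn main() {
    println!("{}", polls_until_ready(1, 2));
}
```

The loop inside poll is what lets the state machine move from the first future straight on to the second within a single poll.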
Async Desugar: Function Arguments
What about function arguments?
async fn get_string() -> String {
String::from("hello!")
}
async fn print(s: String) -> String {
println!("{}", s);
s
}
async fn func() {
let s = get_string().await;
let s = print(s).await;
println!("{}", s);
}
Async Desugar: Function Arguments
struct GetString;
struct Print {
s: String,
}
struct Generated {
state: State,
}
enum State {
FutOne(GetString),
FutTwo(Print),
}
Async Desugar: Function Arguments
impl Future for Generated {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state {
State::FutOne(fut) => match fut.poll(cx) {
Poll::Ready(s) => self.state = State::FutTwo(Print { s }),
Poll::Pending => return Poll::Pending,
},
State::FutTwo(fut) => match fut.poll(cx) {
Poll::Ready(s) => {
println!("{s}");
return Poll::Ready(());
}
Poll::Pending => return Poll::Pending,
},
}
}
}
}
Async Desugar: Local Variables
Modification 1:
async fn get_string() -> String {
String::from("hello!")
}
async fn print(s: String) -> String {
println!("{}", s);
s
}
async fn func() {
let s = get_string().await;
print(String::from("crab")).await;
println!("{}", s);
}
Async Desugar: Local Variables
struct GetString;
struct Print {
s: String,
}
struct Generated {
s: Option<String>,
state: State,
}
enum State {
FutOne(GetString),
FutTwo(Print),
}
Async Desugar: Local Variables
impl Future for Generated {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state {
State::FutOne(fut) => match fut.poll(cx) {
Poll::Ready(s) => {
self.s = Some(s);
self.state = State::FutTwo(Print { s: String::from("crab") });
}
Poll::Pending => return Poll::Pending,
},
State::FutTwo(fut) => match fut.poll(cx) {
Poll::Ready(_) => {
println!("{}", self.s.take().unwrap());
return Poll::Ready(());
}
Poll::Pending => return Poll::Pending,
},
}
}
}
}
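As with the earlier slides, the desugaring above is conceptual. A runnable approximation of the local-variable case is sketched below, assuming hand-written GetString and Print futures that complete on their first poll. Two deliberate departures from the slide, made so the result can be checked: Generated returns the final string instead of (), and NoopWake and run_generated are hypothetical helpers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

struct GetString;

impl Future for GetString {
    type Output = String;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<String> {
        Poll::Ready(String::from("hello!"))
    }
}

struct Print {
    s: String,
}

impl Future for Print {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<String> {
        let s = std::mem::take(&mut self.s); // move the argument out
        println!("{}", s);
        Poll::Ready(s)
    }
}

struct Generated {
    s: Option<String>, // the local variable `s`, kept alive across the await
    state: State,
}

enum State {
    FutOne(GetString),
    FutTwo(Print),
}

impl Future for Generated {
    // Assumption for testability: return the value that `func` prints last.
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let this = self.get_mut(); // fine here: all fields are Unpin
        loop {
            match &mut this.state {
                State::FutOne(fut) => match Pin::new(fut).poll(cx) {
                    Poll::Ready(s) => {
                        this.s = Some(s);
                        this.state = State::FutTwo(Print { s: String::from("crab") });
                    }
                    Poll::Pending => return Poll::Pending,
                },
                State::FutTwo(fut) => match Pin::new(fut).poll(cx) {
                    Poll::Ready(_) => {
                        let s = this.s.take().unwrap();
                        println!("{}", s);
                        return Poll::Ready(s);
                    }
                    Poll::Pending => return Poll::Pending,
                },
            }
        }
    }
}

// Polls the state machine until it completes and returns its output.
fn run_generated() -> String {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Generated { s: None, state: State::FutOne(GetString) };
    loop {
        if let Poll::Ready(s) = Pin::new(&mut fut).poll(&mut cx) {
            return s;
        }
    }
}

fn main() {
    run_generated();
}
```

The local variable s has to live in the struct rather than on the stack because poll returns between the two awaits.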
Async Desugar: References
Modification 2:
async fn get_string() -> String {
String::from("hello!")
}
async fn print(s: &str) {
println!("{}", s);
}
async fn func() {
let s = get_string().await;
print(&s).await;
println!("{}", s);
}
Async Desugar: References
struct GetString;
struct Print<'a> {
s: &'a str,
}
struct Generated {
s: Option<String>,
state: State<'??????>,
}
enum State<'a> {
FutOne(GetString),
FutTwo(Print<'a>),
}
The Print future stored in self.state needs a reference to the String in Generated.
Solution: Unsafe code and self-referential structs.
Self-Referential Structs
#[derive(Debug)]
struct Test {
s: String,
p: *const String,
}
impl Test {
fn new(s: String) -> Self {
Test { s, p: std::ptr::null() }
}
fn init(&mut self) {
let ptr: *const String = &self.s;
self.p = ptr;
}
fn s(&self) -> &str {
&self.s
}
fn p(&self) -> &str {
assert!(!self.p.is_null());
unsafe { &*(self.p) }
}
}
Self-Referential Structs
fn main() {
let mut test_a = Test::new(String::from("test_a"));
test_a.init();
let mut test_b = Test::new(String::from("test_b"));
test_b.init();
println!("test_a: s: {}, p: {}", test_a.s(), test_a.p());
println!("test_b: s: {}, p: {}", test_b.s(), test_b.p());
}
outputs
test_a: s: test_a, p: test_a
test_b: s: test_b, p: test_b
It works!
Self-Referential Structs
fn main() {
let mut test_a = Test::new(String::from("test_a"));
test_a.init();
let mut test_b = Test::new(String::from("test_b"));
test_b.init();
std::mem::swap(&mut test_a, &mut test_b);
println!("test_a: s: {}, p: {}", test_a.s(), test_a.p());
println!("test_b: s: {}, p: {}", test_b.s(), test_b.p());
}
outputs
test_a: s: test_b, p: test_a
test_b: s: test_a, p: test_b
Uh oh…
Self-Referential Structs
fn main() {
let mut test_a = Test::new(String::from("test_a"));
test_a.init();
let mut test_b = Test::new(String::from("test_b"));
test_b.init();
std::mem::swap(&mut test_a, &mut test_b);
drop(test_a);
println!("test_b: s: {}, p: {}", test_b.s(), test_b.p());
}
outputs
test_b: s: test_a, p: R�ԣ
Oh no.
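The same problem can be observed without dereferencing a dangling pointer, by comparing addresses instead. The sketch below reuses the Test struct from the slides and adds a hypothetical points_to_self helper; it shows that after the swap each p points into the other value.

```rust
struct Test {
    s: String,
    p: *const String,
}

impl Test {
    fn new(s: String) -> Self {
        Test { s, p: std::ptr::null() }
    }

    fn init(&mut self) {
        let ptr: *const String = &self.s;
        self.p = ptr;
    }

    // Hypothetical helper: true while `p` still points at this value's own `s`.
    fn points_to_self(&self) -> bool {
        std::ptr::eq(self.p, &self.s)
    }
}

// Returns (both valid before swap, any valid after swap, pointers crossed).
fn demo() -> (bool, bool, bool) {
    let mut test_a = Test::new(String::from("test_a"));
    test_a.init();
    let mut test_b = Test::new(String::from("test_b"));
    test_b.init();
    let valid_before = test_a.points_to_self() && test_b.points_to_self();

    // Moves the bytes of each struct, including the raw pointer values.
    std::mem::swap(&mut test_a, &mut test_b);

    let valid_after = test_a.points_to_self() || test_b.points_to_self();
    let crossed =
        std::ptr::eq(test_a.p, &test_b.s) && std::ptr::eq(test_b.p, &test_a.s);
    (valid_before, valid_after, crossed)
}

fn main() {
    println!("{:?}", demo());
}
```

Once the pointers are crossed, dropping either value leaves the other with a pointer to a String whose buffer has been freed, which is the garbage seen in the last output.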
Pin (2)
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
s: String,
p: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(s: String) -> Self {
Test {
s, p: std::ptr::null(),
_marker: PhantomPinned
}
}
}
impl Test {
fn init(self: Pin<&mut Self>) {
let ptr: *const String = &self.s;
let this = unsafe {
self.get_unchecked_mut()
};
this.p = ptr;
}
fn s(self: Pin<&Self>) -> &str {
&self.get_ref().s
}
fn p(self: Pin<&Self>) -> &str {
assert!(!self.p.is_null());
unsafe { &*(self.p) }
}
}
Pin (2)
fn main() {
let test_a = Test::new(String::from("test_a"));
let mut test_a = std::pin::pin!(test_a);
test_a.as_mut().init();
let test_b = Test::new(String::from("test_b"));
let mut test_b = std::pin::pin!(test_b);
test_b.as_mut().init();
println!("test_a: s: {}, p: {}", test_a.as_ref().s(), test_a.as_ref().p());
println!("test_b: s: {}, p: {}", test_b.as_ref().s(), test_b.as_ref().p());
}
outputs
test_a: s: test_a, p: test_a
test_b: s: test_b, p: test_b
Pin (2)
fn main() {
let test_a = Test::new(String::from("test_a"));
let mut test_a = std::pin::pin!(test_a);
test_a.as_mut().init();
let test_b = Test::new(String::from("test_b"));
let mut test_b = std::pin::pin!(test_b);
test_b.as_mut().init();
std::mem::swap(test_a.get_mut(), test_b.get_mut());
println!("test_a: s: {}, p: {}", test_a.as_ref().s(), test_a.as_ref().p());
println!("test_b: s: {}, p: {}", test_b.as_ref().s(), test_b.as_ref().p());
}
Pin (2)
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:45:27
   |
45 |     std::mem::swap(test_a.get_mut(), test_b.get_mut());
   |                           ^^^^^^^ within `Test`, the trait `Unpin` is not implemented for `PhantomPinned`, which is required by `Test: Unpin`
   |
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/main.rs:45:45
   |
45 |     std::mem::swap(test_a.get_mut(), test_b.get_mut());
   |                                             ^^^^^^^ within `Test`, the trait `Unpin` is not implemented for `PhantomPinned`, which is required by `Test: Unpin`
Pin (2)
impl Test {
fn new(s: String) -> Pin<Box<Self>> {
let t = Test {
s,
p: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::new(t);
let ptr: *const String = &boxed.s;
boxed.p = ptr;
Box::into_pin(boxed)
}
}
Pinning Test to the heap ensures the data will be pinned for the lifetime of the containing Box.
Pin (2)
fn main() {
let mut test_a = Test::new(String::from("test_a"));
let mut test_b = Test::new(String::from("test_b"));
println!("test_a: s: {}, p: {}", test_a.as_ref().s(), test_a.as_ref().p());
println!("test_b: s: {}, p: {}", test_b.as_ref().s(), test_b.as_ref().p());
}
outputs
test_a: s: test_a, p: test_a
test_b: s: test_b, p: test_b
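With heap pinning, swapping the two handles is harmless, because only the Pin<Box<Test>> pointers move and the Test values stay where they are. A minimal sketch, reusing the constructor from the slides and adding a hypothetical points_to_self helper:

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

struct Test {
    s: String,
    p: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(s: String) -> Pin<Box<Self>> {
        let mut boxed = Box::new(Test { s, p: std::ptr::null(), _marker: PhantomPinned });
        let ptr: *const String = &boxed.s;
        boxed.p = ptr;
        Box::into_pin(boxed)
    }

    fn s(self: Pin<&Self>) -> &str {
        &self.get_ref().s
    }

    // Hypothetical helper: true while `p` still points at this value's own `s`.
    fn points_to_self(self: Pin<&Self>) -> bool {
        std::ptr::eq(self.p, &self.s)
    }
}

// Returns (test_a's string, test_b's string, self-pointers still valid).
fn demo() -> (String, String, bool) {
    let mut test_a = Test::new(String::from("test_a"));
    let mut test_b = Test::new(String::from("test_b"));

    // Swaps the two Pin<Box<Test>> handles; the heap allocations do not move.
    std::mem::swap(&mut test_a, &mut test_b);

    let still_valid =
        test_a.as_ref().points_to_self() && test_b.as_ref().points_to_self();
    (
        test_a.as_ref().s().to_string(),
        test_b.as_ref().s().to_string(),
        still_valid,
    )
}

fn main() {
    println!("{:?}", demo());
}
```

The names now refer to the other value, but each value's internal pointer is still correct.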
What is Pin?
- The Pin type allows you to ensure a given value T will not be moved or invalidated until it is dropped.
- What does moving mean?
  - Semantically: transfer of ownership (eg. through assignment or argument passing)
  - Mechanically: bitwise copy of bytes from one memory location to another (basically Copy)
- What does invalidating mean?
  - The memory location of a currently used value of type T is repurposed
- What does dropped mean?
  - The value T’s implementation of Drop::drop is called
What does Pin do?
#[repr(transparent)]
pub struct Pin<Ptr> { /* private fields */ }
- Ptr is a pointer to a type T, for example &T, &mut T, or Box<T>
- The guarantee of Pin is that as long as you have a Pin<&T>, T will not be moved or invalidated.
We can move a Pin, so if we could do Pin<T> then moving the pin also moves T, which defeats the whole purpose of pinning!
How can Pin be used?
struct Test {
x: i32,
}
impl Test {
fn method(self: Pin<&Self>) {}
}
fn method(x: Pin<&mut i32>) {}
- Calling Test::method or method requires pinning Test and x respectively first.
- This is a restriction on the caller to ensure the callee has the guarantees of Pin.
How can Pin be used? (2)
fn method(x: Pin<&mut i32>) {
let mut y = 5;
let rx: &mut i32 = x.get_mut();
std::mem::swap(rx, &mut y);
}
Why does this work? Isn’t getting the reference rx defeating the Pin guarantees?
We can destroy, invalidate, or move the i32 value that is pinned without unsafe code :0
If we check the documentation for i32, we see that it implements Unpin.
Unpin
pub auto trait Unpin { }
- Unpin is an auto trait.
- Unpin says that T does not need the pinning guarantees of Pin, so we can get a mutable reference to, mutate, or destroy the T behind a Pin as we see fit.
How do I opt out of Unpin?
If you have types that rely on being pinned, you need to opt out with the PhantomPinned marker type:
use std::{marker::PhantomPinned, pin::Pin};
#[derive(Default)]
struct Test {
_pin: PhantomPinned,
}
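The difference between an Unpin type and one that opts out can be sketched as follows. overwrite, read and assert_unpin are hypothetical helper names; the commented-out line is the one expected to be rejected by the compiler.

```rust
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};

// i32 is Unpin, so a pinned reference can be turned back into `&mut i32`.
fn overwrite(x: Pin<&mut i32>) {
    let rx: &mut i32 = x.get_mut();
    *rx = 5;
}

#[derive(Default)]
struct Test {
    x: i32,
    _pin: PhantomPinned,
}

// Test is not Unpin: safe code can still read through the pin,
// but `t.get_mut()` would not compile here.
fn read(t: Pin<&mut Test>) -> i32 {
    t.x
}

// Hypothetical compile-time check: only accepts Unpin types.
fn assert_unpin<T: Unpin>() {}

fn demo() -> (i32, i32) {
    assert_unpin::<i32>();
    // assert_unpin::<Test>(); // error[E0277]: `PhantomPinned` cannot be unpinned

    let mut n = 1;
    overwrite(Pin::new(&mut n)); // Pin::new is only available for Unpin targets

    let t = pin!(Test::default()); // a !Unpin value needs pin! (or Box::pin)
    (n, read(t))
}

fn main() {
    println!("{:?}", demo());
}
```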
Unpin (2)
use std::{marker::PhantomPinned, pin::Pin};
#[derive(Default)]
struct Test {
x: i32,
_pin: PhantomPinned,
}
fn method(x: Pin<&mut Test>) {
let mut y = Test::default();
let rx: &mut Test = x.get_mut();
std::mem::swap(rx, &mut y);
}
error[E0277]: `PhantomPinned` cannot be unpinned
  --> src/lib.rs:12:27