Learning Rust: Polling

Adrian Macal · Published in Level Up Coding · 9 min read · Mar 28, 2024

Are Rust’s futures a black box for you? Let’s open this box and see what’s inside!

The async/await paradigm is everywhere. You are likely accustomed to writing a lot of it without considering what is behind it. In this story, full of examples, we are going to explore how async/await works. It will be a progressive journey from very basic snippets to more advanced ones.

Let’s start from a baseline. Have you ever wondered how to call your async block without using Tokio or any other crate that makes your main method also async?

use std::future::Future;
use std::ptr::null;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Both callbacks intentionally do nothing: nobody needs to be notified.
fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

// Arguments, in order: clone, wake, wake_by_ref, drop.
static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub fn main() {
    let task = async { 13 };

    let waker = RawWaker::new(null(), &VTABLE);
    // Fine here because the vtable functions never touch the null data pointer.
    let waker = unsafe { Waker::from_raw(waker) };

    let mut cx = Context::from_waker(&waker);
    let mut task = Box::pin(task);

    match task.as_mut().poll(&mut cx) {
        Poll::Ready(value) => println!("ready {value:?}"),
        Poll::Pending => println!("pending"),
    }
}

The snippet does exactly that: it runs a trivial async block that returns the number 13. The call isn’t straightforward, though. Each async block evaluates to a future, and the Future trait has exactly one method, poll.

To poll a future, we need to pass a context, which carries a waker: a handle the future uses to signal that it can make progress and should be polled again. We constructed a very naive waker that does nothing. It simply exists.

Having a context, we can pin the future and call its poll function. The function returns an enum with two variants:

  • Pending, indicating that we are not ready to obtain the final value.
  • Ready, holding the final future value.

Can you guess what the code will print out?

ready 13
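A side note on the waker before we move on. The one above ignores every notification. Here is a minimal sketch of a waker that at least counts them, built on the safe std::task::Wake trait instead of a raw vtable. CountingWaker, WakeThenReady and run are names made up for this illustration; they are not part of the original snippet.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that only counts how many times it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical future: the first poll asks to be polled again and
/// returns Pending, the second poll returns Ready(13).
struct WakeThenReady {
    polled: bool,
}

impl Future for WakeThenReady {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.polled {
            Poll::Ready(13)
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // "please poll me again"
            Poll::Pending
        }
    }
}

/// Polls the future twice; returns the wake count seen after the
/// first poll and the result of the second poll.
fn run() -> (usize, Poll<i32>) {
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut task = Box::pin(WakeThenReady { polled: false });
    let first = task.as_mut().poll(&mut cx);
    assert!(first.is_pending());
    let wakes = counter.wakes.load(Ordering::SeqCst);

    (wakes, task.as_mut().poll(&mut cx))
}

fn main() {
    let (wakes, result) = run();
    println!("woken {wakes} time(s), then {result:?}");
}
```

A real executor would use such a notification to decide when the next poll is worth doing. The remaining snippets in this story keep the do-nothing waker and simply poll on a schedule.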

What if we introduced a delay in our async function? Perhaps a timer waiting for one second. It is expected that an immediate call to the poll function won’t return a value. The value would be returned not earlier than one second later.

use std::future::Future;
use std::ptr::null;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
use async_std::task;

fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub fn main() {
    let task = async {
        task::sleep(Duration::from_secs(1)).await;
        13
    };

    let waker = RawWaker::new(null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(waker) };

    let mut elapsed = 0;
    let mut cx = Context::from_waker(&waker);
    let mut task = Box::pin(task);

    loop {
        match task.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{elapsed:>4} ready {value:?}"),
            Poll::Pending => println!("{elapsed:>4} pending"),
        }

        std::thread::sleep(Duration::from_millis(300));
        elapsed += 300;
    }
}

We introduced a loop that polls the future every 300 ms. How many polls are needed to obtain the Ready variant with the number?

   0 pending
 300 pending
 600 pending
 900 pending
1200 ready 13
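Polling on a fixed 300 ms schedule works here, but an executor would normally sleep until the waker tells it that another poll makes sense. Below is a rough sketch of that idea using only the standard library. It rests on a few assumptions: Delay is a hypothetical timer backed by a helper thread (not how async_std implements sleep), ThreadWaker unparks the polling thread, and block_on is a toy loop rather than a real executor. For brevity, Delay keeps only the waker from the first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Hypothetical waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical timer: a helper thread sleeps, sets a flag and wakes the task.
struct Delay {
    duration: Duration,
    done: Arc<AtomicBool>,
    started: bool,
}

impl Delay {
    fn new(duration: Duration) -> Self {
        Self { duration, done: Arc::new(AtomicBool::new(false)), started: false }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.done.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if !self.started {
            self.started = true;
            let (done, waker, duration) =
                (self.done.clone(), cx.waker().clone(), self.duration);
            thread::spawn(move || {
                thread::sleep(duration);
                done.store(true, Ordering::SeqCst);
                waker.wake(); // tell the polling thread to try again
            });
        }
        Poll::Pending
    }
}

/// Runs a future to completion; returns its value and the number of polls.
fn block_on<F: Future>(future: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    let mut polls = 0;

    loop {
        polls += 1;
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return (value, polls);
        }
        thread::park(); // sleep until the waker unparks this thread
    }
}

fn main() {
    let start = Instant::now();
    let (value, polls) = block_on(async {
        Delay::new(Duration::from_millis(100)).await;
        13
    });
    println!("ready {value} after {polls} polls and {:?}", start.elapsed());
}
```

With this shape the future is typically polled twice instead of five times: once to start the timer and once after the wake-up. A parked thread may also wake up spuriously, so the exact number of polls is not guaranteed.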

Each async block may create new objects on the stack or the heap, which may live between await calls. Do we know how long they live?

use async_std::task;
use std::cell::Cell;
use std::future::Future;
use std::ptr::null;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub struct Resource {
    value: i32,
    elapsed: Rc<Cell<i32>>,
}

impl Resource {
    pub fn result(&self) -> i32 {
        self.value
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        println!("{:>4} dropped", self.elapsed.get());
    }
}

pub fn main() {
    let elapsed = Rc::new(Cell::new(0));
    let copied = elapsed.clone();

    let task = async {
        let resource = Resource { value: 13, elapsed: copied };
        task::sleep(Duration::from_secs(1)).await;
        resource.result()
    };

    let waker = RawWaker::new(null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(waker) };

    let mut cx = Context::from_waker(&waker);
    let mut task = Box::pin(task);

    loop {
        match task.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending", elapsed.get()),
        }

        std::thread::sleep(Duration::from_millis(300));
        elapsed.set(elapsed.get() + 300);
    }
}

In this example, we introduced a resource that produces the final value to be returned. We implemented the Drop trait for it to track its lifetime. Can you guess when it will be dropped?

   0 pending
 300 pending
 600 pending
 900 pending
1200 dropped
1200 ready 13

Yes, inside the async block, just before poll returns the Ready value.
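The resource lives that long only because it is still in scope after the await. The following sketch compares two async blocks using nothing but the standard library: one keeps the resource across the await, the other closes its scope first. YieldOnce is a hypothetical stand-in for the timer (Pending once, then Ready), and the events are collected in a shared log instead of being printed with timestamps; both are assumptions of this illustration.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<&'static str>>>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a timer: Pending on the first poll, Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct Resource {
    value: i32,
    log: Log,
}

impl Drop for Resource {
    fn drop(&mut self) {
        self.log.borrow_mut().push("dropped");
    }
}

/// Polls the future until it is ready, logging every poll result.
fn drive(future: impl Future<Output = i32>, log: &Log) -> Vec<&'static str> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(_) => {
                log.borrow_mut().push("ready");
                break;
            }
            Poll::Pending => log.borrow_mut().push("pending"),
        }
    }

    let events = log.borrow().clone();
    events
}

fn held_across_await() -> Vec<&'static str> {
    let log = Log::default();
    let copied = log.clone();
    let task = async move {
        let resource = Resource { value: 13, log: copied };
        YieldOnce(false).await;
        resource.value // the resource is still alive here
    };
    drive(task, &log)
}

fn dropped_before_await() -> Vec<&'static str> {
    let log = Log::default();
    let copied = log.clone();
    let task = async move {
        let value = {
            let resource = Resource { value: 13, log: copied };
            resource.value
        }; // the resource goes out of scope before the await
        YieldOnce(false).await;
        value
    };
    drive(task, &log)
}

fn main() {
    println!("held across await:    {:?}", held_across_await());
    println!("dropped before await: {:?}", dropped_before_await());
}
```

The first variant logs pending, dropped, ready; the second logs dropped, pending, ready. The usual scoping rules still apply inside an async block; the await merely stretches them over several polls.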

What if the future never has a chance to report its readiness? Let’s simulate this by polling only once and checking whether drop will still be called.

use async_std::task;
use std::cell::Cell;
use std::future::Future;
use std::ptr::null;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub struct Resource {
    value: i32,
    elapsed: Rc<Cell<i32>>,
}

impl Resource {
    pub fn result(&self) -> i32 {
        self.value
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        println!("{:>4} dropped", self.elapsed.get());
    }
}

pub fn main() {
    let elapsed = Rc::new(Cell::new(0));
    let copied = elapsed.clone();

    let task = async {
        let resource = Resource { value: 13, elapsed: copied };
        task::sleep(Duration::from_secs(1)).await;
        resource.result()
    };

    let waker = RawWaker::new(null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(waker) };

    let mut cx = Context::from_waker(&waker);
    let mut task = Box::pin(task);

    match task.as_mut().poll(&mut cx) {
        Poll::Ready(value) => println!("{:>4} ready {value:?}", elapsed.get()),
        Poll::Pending => println!("{:>4} pending", elapsed.get()),
    }

    std::thread::sleep(Duration::from_millis(300));
    elapsed.set(elapsed.get() + 300);

    println!("{:>4} completed", elapsed.get())
}

The code is the same as the previous snippet, except that it polls the future only once instead of looping. Can you guess when drop will be called?

   0 pending
 300 completed
 300 dropped

You are right! It was called at the end of the main function scope, exactly when the task variable was dropped, which in turn dropped the abandoned resource. This shows that the future itself stores the variables of the async block that are alive across an await.
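One way to see that the future stores such variables is to look at its size. The sketch below is only an illustration: the function names are invented for it, std::future::ready stands in for a real timer, and the exact sizes are compiler details that may differ between versions. The only thing relied upon is that a buffer used after the await has to be kept somewhere inside the future.

```rust
use std::future::ready;
use std::mem::size_of_val;

/// Size of a future that keeps a 1 KiB buffer alive across an await.
fn size_with_buffer(fill: u8) -> usize {
    let future = async move {
        let buffer = [fill; 1024];
        ready(()).await;
        // The buffer is used after the await, so it must be stored in the future.
        buffer.iter().map(|byte| u32::from(*byte)).sum::<u32>()
    };
    size_of_val(&future)
}

/// Size of a future that has nothing to keep across the await.
fn size_without_buffer() -> usize {
    let future = async {
        ready(()).await;
        0u32
    };
    size_of_val(&future)
}

fn main() {
    println!("with buffer:    {} bytes", size_with_buffer(1));
    println!("without buffer: {} bytes", size_without_buffer());
}
```

Neither future is ever polled here. The size is known as soon as the async block is written, because the compiler decides up front which variables have to survive each await.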

Is abandoning a started task a real-world scenario at all? It is exactly what will happen when you use the select! macro. Let’s analyze the following code:

use async_std::task;
use std::cell::Cell;
use std::future::Future;
use std::ptr::null;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub struct Resource {
    value: i32,
    elapsed: Rc<Cell<i32>>,
}

impl Resource {
    pub fn result(&self) -> i32 {
        self.value
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        println!("{:>4} dropped {}", self.elapsed.get(), self.value);
    }
}

pub fn main() {
    let elapsed = Rc::new(Cell::new(0));

    let copied13 = elapsed.clone();
    let task13 = async {
        let resource = Resource { value: 13, elapsed: copied13 };
        task::sleep(Duration::from_secs(1)).await;
        resource.result()
    };

    let copied19 = elapsed.clone();
    let task19 = async {
        let resource = Resource { value: 19, elapsed: copied19 };
        task::sleep(Duration::from_secs(2)).await;
        resource.result()
    };

    let waker = RawWaker::new(null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(waker) };

    let mut cx = Context::from_waker(&waker);
    let mut task13 = Box::pin(task13);
    let mut task19 = Box::pin(task19);

    loop {
        match task13.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending 13", elapsed.get()),
        }

        match task19.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending 19", elapsed.get()),
        }

        std::thread::sleep(Duration::from_millis(300));
        elapsed.set(elapsed.get() + 300);
    }

    println!("{:>4} completed", elapsed.get())
}

This time we have two async blocks: one returns 13 after one second, the other returns 19 after two seconds. We poll both of them. As soon as one of them reports readiness, we stop looping, and the other one is abandoned. Can you guess which one will be abandoned?

   0 pending 13
   0 pending 19
 300 pending 13
 300 pending 19
 600 pending 13
 600 pending 19
 900 pending 13
 900 pending 19
1200 dropped 13
1200 ready 13
1200 completed
1200 dropped 19

The abandoned future is dropped in the end, together with everything it held at its await point, and the rest of its body never runs. Is your async code ready to be interrupted at an await line?
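The hand-rolled loop can be folded into a future of its own. What follows is a deliberately simplified sketch of a select-like combinator, not the implementation of any real select! macro. Select2, Countdown, ticker and run are hypothetical names, and Countdown replaces the timer by counting polls, so the example needs only the standard library.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<String>>>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a timer: Ready after `self.0` Pending polls.
struct Countdown(u32);

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

struct Resource {
    value: i32,
    log: Log,
}

impl Drop for Resource {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("dropped {}", self.value));
    }
}

/// Hypothetical select-like future: resolves with whichever side is ready first.
struct Select2<A, B> {
    a: Pin<Box<A>>,
    b: Pin<Box<B>>,
}

impl<A: Future, B: Future<Output = A::Output>> Future for Select2<A, B> {
    type Output = A::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<A::Output> {
        if let Poll::Ready(value) = self.a.as_mut().poll(cx) {
            return Poll::Ready(value);
        }
        self.b.as_mut().poll(cx)
    }
}

fn ticker(value: i32, polls: u32, log: Log) -> impl Future<Output = i32> {
    async move {
        let resource = Resource { value, log };
        Countdown(polls).await;
        resource.value
    }
}

fn run() -> Vec<String> {
    let log = Log::default();
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut select = Box::pin(Select2 {
        a: Box::pin(ticker(13, 1, log.clone())),
        b: Box::pin(ticker(19, 2, log.clone())),
    });

    let winner = loop {
        if let Poll::Ready(value) = select.as_mut().poll(&mut cx) {
            break value;
        }
    };
    log.borrow_mut().push(format!("ready {winner}"));

    drop(select); // the losing future is abandoned at its await
    let events = log.borrow().clone();
    events
}

fn main() {
    for event in run() {
        println!("{event}");
    }
}
```

Dropping the combinator drops the losing future at whatever await it had reached. That is why the resource holding 19 is released only after the winner has been reported.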

We used an async block to generate a future for us. Did you know you could write a future on your own? It’s just a trait to implement. Let’s try it out.

use async_std::task;
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub struct Resource {
    value: i32,
    elapsed: Rc<Cell<i32>>,
}

impl Resource {
    pub fn result(&self) -> i32 {
        self.value
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        println!("{:>4} dropped {}", self.elapsed.get(), self.value);
    }
}

pub struct Ticker {
    timer: Pin<Box<dyn Future<Output = ()>>>,
    resource: Resource,
}

impl Ticker {
    pub fn new(value: i32, elapsed: Rc<Cell<i32>>) -> Self {
        Self {
            timer: Box::pin(task::sleep(Duration::from_secs(1))),
            resource: Resource {
                value: value,
                elapsed: elapsed,
            },
        }
    }
}

impl Future for Ticker {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.timer.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(()) => Poll::Ready(self.resource.value),
        }
    }
}

pub fn main() {
    let elapsed = Rc::new(Cell::new(0));
    let task13 = Ticker::new(13, elapsed.clone());
    let task19 = Ticker::new(19, elapsed.clone());

    let waker = RawWaker::new(null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(waker) };

    let mut cx = Context::from_waker(&waker);
    let mut task13 = Box::pin(task13);
    let mut task19 = Box::pin(task19);

    loop {
        match task13.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending 13", elapsed.get()),
        }

        match task19.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending 19", elapsed.get()),
        }

        std::thread::sleep(Duration::from_millis(300));
        elapsed.set(elapsed.get() + 300);
    }

    println!("{:>4} completed", elapsed.get())
}

The code introduces a struct called Ticker, which implements Future by hand. It stores the pinned, boxed future returned by the sleep call and polls it from its own poll method. Do you think the code works as the previous snippet did?

   0 pending 13
   0 pending 19
 300 pending 13
 300 pending 19
 600 pending 13
 600 pending 19
 900 pending 13
 900 pending 19
1200 ready 13
1200 completed
1200 dropped 19
1200 dropped 13

Very similar, but NO. It didn’t work as the previous snippet did. Notice that both resources were dropped only when their futures were dropped, at the end of main, whereas we expected the resource of the ready future to be dropped before poll returned its value.

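What actually happened, and what can we do to fix it? The Resource is a field of Ticker, so it is created in the constructor, before the first poll, and it lives until the Ticker itself is dropped. An async block behaves differently: it creates the resource on the first poll and drops it as soon as the block finishes. To match that, the instance should be created when the first poll is called and released when the timer completes.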

use async_std::task;
use std::cell::Cell;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

fn wake(_data: *const ()) {}
fn noop(_data: *const ()) {}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(|data| RawWaker::new(data, &VTABLE), wake, wake, noop);

pub struct Resource {
    value: i32,
    elapsed: Rc<Cell<i32>>,
}

impl Resource {
    pub fn result(&self) -> i32 {
        self.value
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        println!("{:>4} dropped {}", self.elapsed.get(), self.value);
    }
}

pub enum State {
    Init {
        value: i32,
        elapsed: Rc<Cell<i32>>,
    },
    Waiting {
        resource: Resource,
        timer: Pin<Box<dyn Future<Output = ()>>>,
    },
    Completed {},
}

pub struct Ticker {
    state: State,
}

impl Ticker {
    pub fn new(value: i32, elapsed: Rc<Cell<i32>>) -> Self {
        Self {
            state: State::Init {
                value: value,
                elapsed: elapsed,
            },
        }
    }
}

impl Future for Ticker {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let self_mut = self.get_mut();

        loop {
            // Take the current state out, leaving Completed behind for the moment.
            let mut state = State::Completed {};
            mem::swap(&mut self_mut.state, &mut state);

            let (poll, state) = match state {
                // First poll: create the resource and the timer, then loop again.
                State::Init { value, elapsed } => {
                    let state = State::Waiting {
                        timer: Box::pin(task::sleep(Duration::from_secs(1))),
                        resource: Resource {
                            value: value,
                            elapsed: elapsed,
                        },
                    };

                    (None, state)
                }
                // On Ready the resource is not moved into the next state,
                // so it is dropped before poll returns.
                State::Waiting {
                    resource,
                    mut timer,
                } => match timer.as_mut().poll(cx) {
                    Poll::Pending => (Some(Poll::Pending), State::Waiting { resource, timer }),
                    Poll::Ready(()) => (Some(Poll::Ready(resource.value)), State::Completed {}),
                },
                State::Completed {} => panic!("Future cannot be polled again."),
            };

            self_mut.state = state;

            if let Some(poll) = poll {
                return poll;
            }
        }
    }
}

pub fn main() {
    let elapsed = Rc::new(Cell::new(0));
    let task13 = Ticker::new(13, elapsed.clone());
    let task19 = Ticker::new(19, elapsed.clone());

    let waker = RawWaker::new(null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(waker) };

    let mut cx = Context::from_waker(&waker);
    let mut task13 = Box::pin(task13);
    let mut task19 = Box::pin(task19);

    loop {
        match task13.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending 13", elapsed.get()),
        }

        match task19.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break println!("{:>4} ready {value:?}", elapsed.get()),
            Poll::Pending => println!("{:>4} pending 19", elapsed.get()),
        }

        std::thread::sleep(Duration::from_millis(300));
        elapsed.set(elapsed.get() + 300);
    }

    println!("{:>4} completed", elapsed.get())
}

The code defines a state machine that keeps the initially received variables, waits for the timer, and marks the completion of the future. We start from the initial state assigned in the constructor. In each poll iteration, we inspect the current state and decide on the result to return, if any, and on the next state. With these extra few lines, we managed to reproduce the expected behavior.

   0 pending 13
   0 pending 19
 300 pending 13
 300 pending 19
 600 pending 13
 600 pending 19
 900 pending 13
 900 pending 19
1200 dropped 13
1200 ready 13
1200 completed
1200 dropped 19
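If only the early drop matters, a full state machine is not the only option. The sketch below is an alternative with a different trade-off, not the approach taken in this story: it keeps the resource in an Option and takes it out when the timer completes. The resource is still created eagerly in the constructor, so this fixes only the moment of the drop. Countdown is a hypothetical poll-counting stand-in for the timer, and the events go to a shared log instead of standard output.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Log = Rc<RefCell<Vec<String>>>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a timer: Ready after `self.0` Pending polls.
struct Countdown(u32);

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            Poll::Pending
        }
    }
}

struct Resource {
    value: i32,
    log: Log,
}

impl Drop for Resource {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("dropped {}", self.value));
    }
}

struct Ticker {
    timer: Countdown,
    resource: Option<Resource>, // None once the future has completed
}

impl Future for Ticker {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        let this = &mut *self; // allowed because Ticker is Unpin
        match Pin::new(&mut this.timer).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(()) => {
                let resource = this.resource.take().expect("polled after completion");
                Poll::Ready(resource.value)
                // `resource` is dropped here, before poll returns
            }
        }
    }
}

fn run() -> Vec<String> {
    let log = Log::default();
    let new_ticker = |value: i32, polls: u32| Ticker {
        timer: Countdown(polls),
        resource: Some(Resource { value, log: log.clone() }),
    };
    let mut task13 = Box::pin(new_ticker(13, 1));
    let mut task19 = Box::pin(new_ticker(19, 2));

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let winner = loop {
        if let Poll::Ready(value) = task13.as_mut().poll(&mut cx) {
            break value;
        }
        if let Poll::Ready(value) = task19.as_mut().poll(&mut cx) {
            break value;
        }
    };
    log.borrow_mut().push(format!("ready {winner}"));
    log.borrow_mut().push("completed".to_string());

    drop((task13, task19)); // the abandoned ticker releases its resource here
    let events = log.borrow().clone();
    events
}

fn main() {
    for event in run() {
        println!("{event}");
    }
}
```

The order of events matches the state machine version: the winner’s resource goes away before its value is reported, and the abandoned one goes away last.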

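You might wonder why we panic. There’s a rule for futures that should not be broken: once a future has returned Ready, it should not be polled again. The Future trait leaves the behavior of such a poll up to the implementation, and panicking is one of the permitted outcomes.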
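When a caller cannot easily guarantee that, the future can be wrapped in a guard. Here is a small sketch of such a wrapper; Fused and run are names invented for this example, and the idea is similar in spirit to the fuse() adapter from the futures crate. After the inner future completes, the wrapper drops it and answers every later poll with Pending.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical wrapper that makes polling after completion harmless.
struct Fused<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> Fused<F> {
    fn new(future: F) -> Self {
        Self { inner: Some(Box::pin(future)) }
    }
}

impl<F: Future> Future for Fused<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let result = match self.inner.as_mut() {
            Some(inner) => inner.as_mut().poll(cx),
            // Already completed: report Pending and never ask to be woken.
            None => return Poll::Pending,
        };
        if result.is_ready() {
            self.inner = None; // drop the finished future right away
        }
        result
    }
}

/// Polls a fused `async { 13 }` twice and returns both results.
fn run() -> Vec<Poll<i32>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);

    let mut task = Box::pin(Fused::new(async { 13 }));
    let first = task.as_mut().poll(&mut cx);
    let second = task.as_mut().poll(&mut cx);
    vec![first, second]
}

fn main() {
    println!("{:?}", run());
}
```

Without the wrapper, the second poll of a finished async block would panic, just like our Ticker does.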

I hope this short story helped demystify a bit how futures work. Perhaps next time, before you type your next async/await code, you will consider that what you’ve written won’t be executed linearly and that it may never run to the end.

Software Developer, Data Engineer with solid knowledge of Business Intelligence. Passionate about programming.