pub struct WaitMap<K, V>where
K: PartialEq,{ /* private fields */ }
Expand description
A map of Waker
s associated with keys, allowing tasks to be woken by
their key.
A WaitMap
allows any number of tasks to wait asynchronously and be
woken when a value with a certain key arrives. This can be used to
implement structures like “async mailboxes”, where an async function
requests some data (such as a response) associated with a certain
key (such as a message ID). When the data is received, the key can
be used to provide the task with the desired data, as well as wake
the task for further processing.
§Examples
Waking a single task at a time by calling wake
:
use std::sync::Arc;
use maitake::scheduler;
use maitake_sync::wait_map::{WaitMap, WakeOutcome};
const TASKS: usize = 10;
// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();
// Construct a new `WaitMap`.
let q = Arc::new(WaitMap::new());
// Spawn some tasks that will wait on the queue.
// We'll use the task index (0..10) as the key.
for i in 0..TASKS {
let q = q.clone();
scheduler.spawn(async move {
let val = q.wait(i).await.unwrap();
assert_eq!(val, i + 100);
});
}
// Tick the scheduler once.
let tick = scheduler.tick();
// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");
// We now wake each of the tasks, using the same key (0..10),
// and provide them with a value that is their `key + 100`,
// e.g. 100..110. Only the task that has been woken will be
// notified.
for i in 0..TASKS {
let result = q.wake(&i, i + 100);
assert!(matches!(result, WakeOutcome::Woke));
// Tick the scheduler.
let tick = scheduler.tick();
// Exactly one task should have completed
assert_eq!(tick.completed, 1);
}
// Tick the scheduler.
let tick = scheduler.tick();
// No additional tasks should be completed
assert_eq!(tick.completed, 0);
assert!(!tick.has_remaining);
§Implementation Notes
This type is currently implemented using intrusive doubly-linked list.
The intrusive aspect of this map is important, as it means that it does not allocate memory. Instead, nodes in the linked list are stored in the futures of tasks trying to wait for capacity. This means that it is not necessary to allocate any heap memory for each task waiting to be woken.
However, the intrusive linked list introduces one new danger: because futures can be cancelled, and the linked list nodes live within the futures trying to wait on the queue, we must ensure that the node is unlinked from the list before dropping a cancelled future. Failure to do so would result in the list containing dangling pointers. Therefore, we must use a doubly-linked list, so that nodes can edit both the previous and next node when they have to remove themselves. This is kind of a bummer, as it means we can’t use something nice like this intrusive queue by Dmitry Vyukov, and there are not really practical designs for lock-free doubly-linked lists that don’t rely on some kind of deferred reclamation scheme such as hazard pointers or QSBR.
Instead, we just stick a Mutex
around the linked list, which must be
acquired to pop nodes from it, or for nodes to remove themselves when
futures are cancelled. This is a bit sad, but the critical sections for this
mutex are short enough that we still get pretty good performance despite it.
Implementations§
Source§impl<K, V> WaitMap<K, V>where
K: PartialEq,
impl<K, V> WaitMap<K, V>where
K: PartialEq,
Sourcepub fn wake(&self, key: &K, val: V) -> WakeOutcome<V>
pub fn wake(&self, key: &K, val: V) -> WakeOutcome<V>
Wake a certain task in the queue.
If the queue is empty, a wakeup is stored in the WaitMap
, and the
next call to wait
will complete immediately.
Sourcepub fn close(&self)
pub fn close(&self)
Close the queue, indicating that it may no longer be used.
Once a queue is closed, all wait
calls (current or future) will
return an error.
This method is generally used when implementing higher-level synchronization primitives or resources: when an event makes a resource permanently unavailable, the queue can be closed.
Sourcepub fn wait(&self, key: K) -> Wait<'_, K, V> ⓘ
pub fn wait(&self, key: K) -> Wait<'_, K, V> ⓘ
Wait to be woken up by this queue.
This returns a Wait
future that will complete when the task is
woken by a call to wake
with a matching key
, or when the WaitMap
is dropped.
Note: key
s must be unique. If the given key already exists in the
WaitMap
, the future will resolve to an Error the first time it is polled
Source§impl<K, V> WaitMap<K, V>where
K: PartialEq,
impl<K, V> WaitMap<K, V>where
K: PartialEq,
Sourcepub fn wait_owned(self: &Arc<WaitMap<K, V>>, key: K) -> WaitOwned<K, V> ⓘ
pub fn wait_owned(self: &Arc<WaitMap<K, V>>, key: K) -> WaitOwned<K, V> ⓘ
Wait to be woken up by this queue, returning a future that’s valid
for the 'static
lifetime.
This is identical to the wait
method, except that it takes a
Arc
reference to the WaitMap
, allowing the returned future to
live for the 'static
lifetime.
This returns a WaitOwned
future that will complete when the task is
woken by a call to wake
with a matching key
, or when the WaitMap
is dropped.
Note: key
s must be unique. If the given key already exists in the
WaitMap
, the future will resolve to an Error the first time it is polled