summaryrefslogtreecommitdiff
path: root/src/header.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/header.rs')
-rw-r--r--src/header.rs162
1 files changed, 162 insertions, 0 deletions
diff --git a/src/header.rs b/src/header.rs
new file mode 100644
index 0000000..8a3a0b9
--- /dev/null
+++ b/src/header.rs
@@ -0,0 +1,162 @@
+use core::cell::UnsafeCell;
+use core::fmt;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::task::Waker;
+
+use crate::raw::TaskVTable;
+use crate::state::*;
+use crate::utils::abort_on_panic;
+
+/// The header of a task.
+///
+/// This header is stored in memory at the beginning of the heap-allocated task.
+pub(crate) struct Header {
+ /// Current state of the task.
+ ///
+ /// Contains flags representing the current state and the reference count.
+ pub(crate) state: AtomicUsize,
+
+ /// The task that is blocked on the `Task` handle.
+ ///
+ /// This waker needs to be woken up once the task completes or is closed.
+ pub(crate) awaiter: UnsafeCell<Option<Waker>>,
+
+ /// The virtual table.
+ ///
+ /// In addition to the actual waker virtual table, it also contains pointers to several other
+ /// methods necessary for bookkeeping the heap-allocated task.
+ pub(crate) vtable: &'static TaskVTable,
+}
+
+impl Header {
+ /// Notifies the awaiter blocked on this task.
+ ///
+ /// If the awaiter is the same as the current waker, it will not be notified.
+ #[inline]
+ pub(crate) fn notify(&self, current: Option<&Waker>) {
+ if let Some(w) = self.take(current) {
+ abort_on_panic(|| w.wake());
+ }
+ }
+
+ /// Takes the awaiter blocked on this task.
+ ///
+ /// If there is no awaiter or if it is the same as the current waker, returns `None`.
+ #[inline]
+ pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> {
+ // Set the bit indicating that the task is notifying its awaiter.
+ let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
+
+ // If the task was not notifying or registering an awaiter...
+ if state & (NOTIFYING | REGISTERING) == 0 {
+ // Take the waker out.
+ let waker = unsafe { (*self.awaiter.get()).take() };
+
+ // Unset the bit indicating that the task is notifying its awaiter.
+ self.state
+ .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
+
+ // Finally, notify the waker if it's different from the current waker.
+ if let Some(w) = waker {
+ match current {
+ None => return Some(w),
+ Some(c) if !w.will_wake(c) => return Some(w),
+ Some(_) => abort_on_panic(|| drop(w)),
+ }
+ }
+ }
+
+ None
+ }
+
+ /// Registers a new awaiter blocked on this task.
+ ///
+ /// This method is called when `Task` is polled and it has not yet completed.
+ #[inline]
+ pub(crate) fn register(&self, waker: &Waker) {
+ // Load the state and synchronize with it.
+ let mut state = self.state.fetch_or(0, Ordering::Acquire);
+
+ loop {
+ // There can't be two concurrent registrations because `Task` can only be polled
+ // by a unique pinned reference.
+ debug_assert!(state & REGISTERING == 0);
+
+ // If we're in the notifying state at this moment, just wake and return without
+ // registering.
+ if state & NOTIFYING != 0 {
+ abort_on_panic(|| waker.wake_by_ref());
+ return;
+ }
+
+ // Mark the state to let other threads know we're registering a new awaiter.
+ match self.state.compare_exchange_weak(
+ state,
+ state | REGISTERING,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ state |= REGISTERING;
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+
+ // Put the waker into the awaiter field.
+ unsafe {
+ abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
+ }
+
+ // This variable will contain the newly registered waker if a notification comes in before
+ // we complete registration.
+ let mut waker = None;
+
+ loop {
+ // If there was a notification, take the waker out of the awaiter field.
+ if state & NOTIFYING != 0 {
+ if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
+ abort_on_panic(|| waker = Some(w));
+ }
+ }
+
+ // The new state is not being notified nor registered, but there might or might not be
+ // an awaiter depending on whether there was a concurrent notification.
+ let new = if waker.is_none() {
+ (state & !NOTIFYING & !REGISTERING) | AWAITER
+ } else {
+ state & !NOTIFYING & !REGISTERING & !AWAITER
+ };
+
+ match self
+ .state
+ .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
+ {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ }
+
+ // If there was a notification during registration, wake the awaiter now.
+ if let Some(w) = waker {
+ abort_on_panic(|| w.wake());
+ }
+ }
+}
+
+impl fmt::Debug for Header {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let state = self.state.load(Ordering::SeqCst);
+
+ f.debug_struct("Header")
+ .field("scheduled", &(state & SCHEDULED != 0))
+ .field("running", &(state & RUNNING != 0))
+ .field("completed", &(state & COMPLETED != 0))
+ .field("closed", &(state & CLOSED != 0))
+ .field("awaiter", &(state & AWAITER != 0))
+ .field("task", &(state & TASK != 0))
+ .field("ref_count", &(state / REFERENCE))
+ .finish()
+ }
+}